多线程使用PipedStream 通讯
Java 提供了四个相关的管道流,我们可以使用其在多线程进行数据传递,其分别是
类名 | 作用 | 备注 |
---|---|---|
PipedInputStream | 字节管道输入流 | 字节流 |
PipedOutputStream | 字节管道输出流 | 字节流 |
PipedReader | 字符管道读取 | 字符流 |
PipedWriter | 字符管道写入 | 字符流 |
其分为两类:字节流和字符流,基本步骤为:线程A写入数据到输出流/写入,线程B读取数据从输入流/字符读取,从而实现线程通讯,下面我们先看下基于字节流的实现方法.
写数据到输出流
package com.zhoutao.demo.thread.piped;import java.io.IOException;import java.io.PipedOutputStream;import java.util.concurrent.TimeUnit;public class WriteData { private int count = 0; public void writeMethod(PipedOutputStream pipedOutputStream) throws InterruptedException, IOException { while (true) { // 每隔1s向输出流写入数字字符串 pipedOutputStream.write(String.valueOf(count++).getBytes()); TimeUnit.SECONDS.sleep(1); } }}
读数据从输入流
package com.zhoutao.demo.thread.piped;import java.io.IOException;import java.io.PipedInputStream;public class ReadData { public void readMethod(PipedInputStream inputStream) throws IOException { byte[] bytes = new byte[20]; int read; // 当流中不存在数据时候,read方法会进入阻塞状态 while ((read = inputStream.read(bytes)) != -1) { String newData = new String(bytes, 0, read); System.out.println("Get Data = " + newData); } }}
启动测试
package com.zhoutao.demo.thread.piped;import java.io.IOException;import java.io.PipedInputStream;import java.io.PipedOutputStream;public class PipesStreamDemo { public static void main(String[] args) throws IOException { // 创建读写对象 WriteData writeData = new WriteData(); ReadData readData = new ReadData(); // 创建管道输入输出流 PipedInputStream pipedInputStream = new PipedInputStream(); PipedOutputStream pipedOutputStream = new PipedOutputStream(); // 重点:连接管道流 pipedOutputStream.connect(pipedInputStream); // 创建对应的线程并启动 ThreadRead threadRead = new ThreadRead(readData, pipedInputStream); ThreadWrite threadWrite = new ThreadWrite(writeData, pipedOutputStream); threadRead.start(); threadWrite.start(); // 观察控制台输出的数据 } static class ThreadRead extends Thread { private ReadData readData; private PipedInputStream inputStream; public ThreadRead(ReadData readData, PipedInputStream inputStream) { this.readData = readData; this.inputStream = inputStream; } @Override public void run() { try { readData.readMethod(inputStream); } catch (IOException e) { e.printStackTrace(); } } } static class ThreadWrite extends Thread { private WriteData writeData; private PipedOutputStream pipedOutputStream; public ThreadWrite(WriteData writeData, PipedOutputStream pipedOutputStream) { this.writeData = writeData; this.pipedOutputStream = pipedOutputStream; } @Override public void run() { try { writeData.writeMethod(pipedOutputStream); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }}
测试数据
Get Data = 0Get Data = 1Get Data = 2Get Data = 3Get Data = 4Get Data = 5Get Data = 6Get Data = 7