Java_io体系之PipedWriter、PipedReader简介、走进源码及示例——14-程序员宅基地

技术标签: java  

Java_io体系之PipedWriter、PipedReader简介、走进源码及示例——14


——管道字符输出流、必须建立在管道输入流之上、所以先介绍管道字符输出流。可以先看示例或者总结、总结写的有点Q、不喜可无视、有误的地方指出则不胜感激。


一:PipedWriter


1、类功能简介:


管道字符输出流、用于将当前线程的指定字符写入到与此线程对应的管道字符输入流中去、所以PipedReader(pr)、PipedWriter(pw)必须配套使用、缺一不可。管道字符输出流的本质就是调用pr中的方法将字符或者字符数组写入到pr中、这一点是与众不同的地方。所以pw中的方法很少也很简单、主要就是负责将传入的pr与本身绑定、配对使用、然后就是调用绑定的pr的写入方法、将字符或者字符数组写入到pr的缓存字符数组中。


2、PipedWriter API简介:


A:关键字

    private PipedReader sink;	与此PipedWriter绑定的PipedReader

    
    private boolean closed = false;		标示此流是否关闭。

B:构造方法

	PipedWriter(PipedReader snk)	根据传入的PipedReader构造pw、并将pr与此pw绑定
    
    PipedWriter()	创建一个pw、在使用之前必须与一个pr绑定



C:一般方法

	synchronized void connect(PipedReader snk)		将此pw与一个pr绑定
	
	void close()	关闭此流。
	
	synchronized void connect(PipedReader snk)		将此pw与一个pr绑定
	
	synchronized void flush()	flush此流、唤醒pr中所有等待的方法。
	
	void write(int c)	将一个整数写入到与此pw绑定的pr的缓存字符数组buf中去
	
	void write(char cbuf[], int off, int len)	将cbuf的一部分写入pr的buf中去


3、源码分析


package com.chy.io.original.code;

import java.io.IOException;

public class PipedWriter extends Writer {
	
	//与此PipedWriter绑定的PipedReader
    private PipedReader sink;

    //标示此流是否关闭。
    private boolean closed = false;

    /**
     * 根据传入的PipedReader构造pw、并将pr与此pw绑定
     */
    public PipedWriter(PipedReader snk)  throws IOException {
    	connect(snk);
    }
    
    /**
     * 创建一个pw、在使用之前必须与一个pr绑定
     */
    public PipedWriter() {
    }
    
    /**
     * 将此pw与一个pr绑定
     */
    public synchronized void connect(PipedReader snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
		    throw new IOException("Already connected");
		} else if (snk.closedByReader || closed) {
	            throw new IOException("Pipe closed");
	    }
	        
		sink = snk;
		snk.in = -1;
		snk.out = 0;
        snk.connected = true;
    }

    /**
     * 将一个整数写入到与此pw绑定的pr的缓存字符数组buf中去
     */
    public void write(int c)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        sink.receive(c);
    }

    /**
     * 将cbuf的一部分写入pr的buf中去
     */
    public void write(char cbuf[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if ((off | len | (off + len) | (cbuf.length - (off + len))) < 0) {
        	throw new IndexOutOfBoundsException();
		}
		sink.receive(cbuf, off, len);
    }

    /**
     * flush此流、唤醒pr中所有等待的方法。
     */
    public synchronized void flush() throws IOException {
		if (sink != null) {
	            if (sink.closedByReader || closed) {
	                throw new IOException("Pipe closed");
	            }            
	            synchronized (sink) {
	                sink.notifyAll();
	            }
		}
    }

    /**
     * 关闭此流。
     */
    public void close()  throws IOException {
        closed = true;
		if (sink != null) {
		    sink.receivedLast();
		}
    }
}

4、实例演示:


因为PipedWriter必须与PipedReader结合使用、所以将两者的示例放在一起。

二:PipedReader


1、类功能简介:


管道字符输入流、用于读取对应绑定的管道字符输出流写入其内置字符缓存数组buffer中的字符、借此来实现线程之间的通信、pr中专门有两个方法供pw调用、receive(char c)、receive(char[] b, int off, intlen)、使得pw可以将字符或者字符数组写入pr的buffer中、

2、PipedReader API简介:


A:关键字

	boolean closedByWriter = false;		标记PipedWriter是否关闭
	
    boolean closedByReader = false;		标记PipedReader是否关闭
    
    boolean connected = false;			标记PipedWriter与标记PipedReader是否关闭的连接是否关闭

    Thread readSide; 	拥有PipedReader的线程
    
    Thread writeSide;	拥有PipedWriter的线程

    private static final int DEFAULT_PIPE_SIZE = 1024;		用于循环存放PipedWriter写入的字符数组的默认大小

    char buffer[];		用于循环存放PipedWriter写入的字符数组

    int in = -1;	buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。此为初始状态、即buf中没有字符

    int out = 0;	buf中下一个被读取的字符的下标


B:构造方法

	PipedReader(PipedWriter src)	使用默认的buf的大小和传入的pw构造pr
	
	PipedReader(PipedWriter src, int pipeSize)		使用指定的buf的大小和传入的pw构造pr
	
	PipedReader()		使用默认大小构造pr
	
	PipedReader(int pipeSize)		使用指定大小构造pr


C:一般方法

	void close()	清空buf中数据、关闭此流。
	
	void connect(PipedWriter src)	调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
	
	synchronized boolean ready()	查看此流是否可读
	
	synchronized int read()		从buf中读取一个字符、以整数形式返回
	
	synchronized int read(char cbuf[], int off, int len)	将buf中读取一部分字符到cbuf中。
	
	synchronized void receive(int c)	pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
	
	synchronized void receive(char c[], int off, int len)	将c中一部分字符写入到buf中。
	
	synchronized void receivedLast()	提醒所有等待的线程、已经接收到了最后一个字符。


3、源码分析


package com.chy.io.original.code;

import java.io.IOException;

public class PipedReader extends Reader {
    boolean closedByWriter = false;
    boolean closedByReader = false;
    boolean connected = false;

    Thread readSide;
    Thread writeSide;

   /** 
    * 用于循环存放PipedWriter写入的字符数组的默认大小
    */ 
    private static final int DEFAULT_PIPE_SIZE = 1024;

    /**
     * 用于循环存放PipedWriter写入的字符数组
     */
    char buffer[];

    /**
     * buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。
     * in为-1时、说明buf中没有可读取字符、in=out时已经存满了。
     */
    int in = -1;

    /**
     * buf中下一个被读取的字符的下标
     */
    int out = 0;

    /**
     * 使用默认的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src) throws IOException {
    	this(src, DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src, int pipeSize) throws IOException {
		initPipe(pipeSize);
		connect(src);
    }


    /**
     * 使用默认大小构造pr
     */
    public PipedReader() {
    	initPipe(DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定大小构造pr
     */
    public PipedReader(int pipeSize) {
    	initPipe(pipeSize);
    }

    //初始化buf大小
    private void initPipe(int pipeSize) {
		if (pipeSize <= 0) {
		    throw new IllegalArgumentException("Pipe size <= 0");
		}
		buffer = new char[pipeSize];
    }

    /**
     * 调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
     */
    public void connect(PipedWriter src) throws IOException {
    	src.connect(this);
    }
    
    /**
     * pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
     */
    synchronized void receive(int c) throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
        	throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }

		writeSide = Thread.currentThread();
		while (in == out) {
		    if ((readSide != null) && !readSide.isAlive()) {
		    	throw new IOException("Pipe broken");
		    }
		    //buf中写入的被读取完、唤醒所有此对象监控的线程其他方法、如果一秒钟之后还是满值、则再次唤醒其他方法、直到buf中被读取。
		    notifyAll();	
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
		    	throw new java.io.InterruptedIOException();
		    }
		}
		//buf中存放第一个字符时、将字符在buf中存放位置的下标in初始化为0、读取的下标也初始化为0、准备接受写入的第一个字符。
		if (in < 0) {
		    in = 0;
		    out = 0;
		}
		buffer[in++] = (char) c;
		//如果buf中放满了、则再从头开始存放。
		if (in >= buffer.length) {
		    in = 0;
		}
    }

    /**
     * 将c中一部分字符写入到buf中。
     */
    synchronized void receive(char c[], int off, int len)  throws IOException {
		while (--len >= 0) {
		    receive(c[off++]);
		}
    }

    /**
     * 提醒所有等待的线程、已经接收到了最后一个字符、PipedWriter已关闭。用于PipedWriter的close()方法.
     */
    synchronized void receivedLast() {
		closedByWriter = true;
		notifyAll();
    }

    /**
     * 从buf中读取一个字符、以整数形式返回
     */
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
		int trials = 2;
		while (in < 0) {
		    if (closedByWriter) { 
			/* closed by writer, return EOF */
			return -1;
		    }
		    if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
			throw new IOException("Pipe broken");
		    }
	            /* might be a writer waiting */
		    notifyAll();
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
			throw new java.io.InterruptedIOException();
		    }
	 	}
		int ret = buffer[out++];
		if (out >= buffer.length) {
		    out = 0;
		}
		if (in == out) {
	            /* now empty */
		    in = -1;		
		}
		return ret;
    }

    /**
     * 将buf中读取一部分字符到cbuf中。
     */
    public synchronized int read(char cbuf[], int off, int len)  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        if ((off < 0) || (off > cbuf.length) || (len < 0) ||
            ((off + len) > cbuf.length) || ((off + len) < 0)) {
		    throw new IndexOutOfBoundsException();
		} else if (len == 0) {
		    return 0;
		}

        /* possibly wait on the first character */
		int c = read();		
		if (c < 0) {
		    return -1;
		}
		cbuf[off] =  (char)c;
		int rlen = 1;
		while ((in >= 0) && (--len > 0)) {
		    cbuf[off + rlen] = buffer[out++];
		    rlen++;
		    //如果读取的下一个字符下标大于buffer的size、则重置out、从新开始从第一个开始读取。
		    if (out >= buffer.length) {
		    	out = 0;
		    }
		    //如果下一个写入字符的下标与下一个被读取的下标相同、则清空buf
		    if (in == out) {
	                /* now empty */
		    	in = -1;	
		    }
		}
		return rlen;
    }

    /**
     * 查看此流是否可读、看各个线程是否关闭、以及buffer中是否有可供读取的字符。
     */
    public synchronized boolean ready() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
	    throw new IOException("Pipe closed");
	} else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }
        if (in < 0) {
            return false;
        } else {
            return true;
        }
    }
 
    /**
     * 清空buf中数据、关闭此流。
     */
    public void close()  throws IOException {
		in = -1;
		closedByReader = true;
    }
}


4、实例演示:


用于发送字符的线程:CharSenderThread

package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedWriter;

@SuppressWarnings("all")
public class CharSenderThread implements Runnable {
	private PipedWriter pw = new PipedWriter();
	
	public PipedWriter getPipedWriter(){
		return pw;
	}
	@Override
	public void run() {
		//sendOneChar();
		//sendShortMessage();
		sendLongMessage();
	}

	private void sendOneChar(){
		try {
			pw.write("a".charAt(0));
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendShortMessage() {
		try {
			pw.write("this is a short message from CharSenderThread !".toCharArray());
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendLongMessage(){
		try {
			char[] b = new char[1028];
			//生成一个长度为1028的字符数组、前1020个是1、后8个是2。
			for(int i=0; i<1020; i++){
				b[i] = 'a';
			}
			for (int i = 1020; i <1028; i++) {
				b[i] = 'b';
			}
			pw.write(b);
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

用于接收字符的线程: CharReceiveThread

package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedReader;

@SuppressWarnings("all")
public class CharReceiverThread extends Thread {
	
	private PipedReader pr = new PipedReader();
	
	public PipedReader getPipedReader(){
		return pr;
	}
	@Override
	public void run() {
		//receiveOneChar();
		//receiveShortMessage();
		receiverLongMessage();
	}
	
	private void receiveOneChar(){
		try {
			int n = pr.read();
			System.out.println(n);
			pr.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiveShortMessage() {
		try {
			char[] b = new char[1024];
			int n = pr.read(b);
			System.out.println(new String(b, 0, n));
			pr.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiverLongMessage(){
		try {
			char[] b = new char[2048];
			int count = 0;
			while(true){
				count = pr.read(b); 
				for (int i = 0; i < count; i++) {
					System.out.print(b[i]);
				}
				if(count == -1)
					break;
			}
			pr.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
}

启动类:PipedWriterAndPipedReaderTest

package com.chy.io.original.test;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

import com.chy.io.original.thread.CharReceiverThread;
import com.chy.io.original.thread.CharSenderThread;

public class PipedWriterAndPipedReaderTest {
	public static void main(String[] args) throws IOException{
		CharSenderThread cst = new CharSenderThread();
		CharReceiverThread crt = new CharReceiverThread();
		PipedWriter pw = cst.getPipedWriter();
		PipedReader pr = crt.getPipedReader();
		
		pw.connect(pr);
		
		/**
		 * 想想为什么下面这样写会报Piped not connect异常 ?
		 */
		//new Thread(new CharSenderThread()).start();
		//new CharReceiverThread().start();
		
		new Thread(cst).start();
		crt.start();
	}
}

两个线程中分别有三个方法、可以对应的每次放开一对方法来测试、还有这里最后一个读取1028个字符的方法用了死循环来读取、可以试试当不用死循环来读取会有什么不一样的效果?初始化字符的时候要用char = 'a' 而不是cahr = "a"、可自己想原因。。。

总结:


PipedReader、PipedWriter两者的结合如鸳鸯一般、离开哪一方都不能继续存在、同时又如连理枝一般、PipedWriter先通过connect(PipedReader sink)来确定关系、并初始化PipedReader状态、告诉PipedReader只能属于这个PipedWriter、connect =true、当想赠与PipedReader字符时、就直接调用receive(char c) 、receive(char[] b, int off, int len)来将字符或者字符数组放入pr的存折buffer中。站在PipedReader角度上、看上哪个PipedWriter时就暗示pw、将主动权交给pw、调用pw的connect将自己给他去登记。当想要花(将字符读取到程序中)字符了就从buffer中拿、但是自己又没有本事挣字符、所以当buffer中没有字符时、自己就等着、并且跟pw讲没有字符了、pw就会向存折(buffer)中存字符、当然、pw不会一直不断往里存、当存折是空的时候也不会主动存、怕花冒、就等着pr要、要才存。过到最后两个只通过buffer来知道对方的存在与否、每次从buffer中存或者取字符时都会看看对方是否安康、若安好则继续生活、若一方不在、则另一方也不愿独存!


更多IO内容:java_io 体系之目录


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/iteye_563/article/details/82552845

智能推荐

攻防世界_难度8_happy_puzzle_攻防世界困难模式攻略图文-程序员宅基地

文章浏览阅读645次。这个肯定是末尾的IDAT了,因为IDAT必须要满了才会开始一下个IDAT,这个明显就是末尾的IDAT了。,对应下面的create_head()代码。,对应下面的create_tail()代码。不要考虑爆破,我已经试了一下,太多情况了。题目来源:UNCTF。_攻防世界困难模式攻略图文

达梦数据库的导出(备份)、导入_达梦数据库导入导出-程序员宅基地

文章浏览阅读2.9k次,点赞3次,收藏10次。偶尔会用到,记录、分享。1. 数据库导出1.1 切换到dmdba用户su - dmdba1.2 进入达梦数据库安装路径的bin目录,执行导库操作  导出语句:./dexp cwy_init/[email protected]:5236 file=cwy_init.dmp log=cwy_init_exp.log 注释:   cwy_init/init_123..._达梦数据库导入导出

js引入kindeditor富文本编辑器的使用_kindeditor.js-程序员宅基地

文章浏览阅读1.9k次。1. 在官网上下载KindEditor文件,可以删掉不需要要到的jsp,asp,asp.net和php文件夹。接着把文件夹放到项目文件目录下。2. 修改html文件,在页面引入js文件:<script type="text/javascript" src="./kindeditor/kindeditor-all.js"></script><script type="text/javascript" src="./kindeditor/lang/zh-CN.js"_kindeditor.js

STM32学习过程记录11——基于STM32G431CBU6硬件SPI+DMA的高效WS2812B控制方法-程序员宅基地

文章浏览阅读2.3k次,点赞6次,收藏14次。SPI的详情简介不必赘述。假设我们通过SPI发送0xAA,我们的数据线就会变为10101010,通过修改不同的内容,即可修改SPI中0和1的持续时间。比如0xF0即为前半周期为高电平,后半周期为低电平的状态。在SPI的通信模式中,CPHA配置会影响该实验,下图展示了不同采样位置的SPI时序图[1]。CPOL = 0,CPHA = 1:CLK空闲状态 = 低电平,数据在下降沿采样,并在上升沿移出CPOL = 0,CPHA = 0:CLK空闲状态 = 低电平,数据在上升沿采样,并在下降沿移出。_stm32g431cbu6

计算机网络-数据链路层_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输-程序员宅基地

文章浏览阅读1.2k次,点赞2次,收藏8次。数据链路层习题自测问题1.数据链路(即逻辑链路)与链路(即物理链路)有何区别?“电路接通了”与”数据链路接通了”的区别何在?2.数据链路层中的链路控制包括哪些功能?试讨论数据链路层做成可靠的链路层有哪些优点和缺点。3.网络适配器的作用是什么?网络适配器工作在哪一层?4.数据链路层的三个基本问题(帧定界、透明传输和差错检测)为什么都必须加以解决?5.如果在数据链路层不进行帧定界,会发生什么问题?6.PPP协议的主要特点是什么?为什么PPP不使用帧的编号?PPP适用于什么情况?为什么PPP协议不_接收方收到链路层数据后,使用crc检验后,余数为0,说明链路层的传输时可靠传输

软件测试工程师移民加拿大_无证移民,未受过软件工程师的教育(第1部分)-程序员宅基地

文章浏览阅读587次。软件测试工程师移民加拿大 无证移民,未受过软件工程师的教育(第1部分) (Undocumented Immigrant With No Education to Software Engineer(Part 1))Before I start, I want you to please bear with me on the way I write, I have very little gen...

随便推点

Thinkpad X250 secure boot failed 启动失败问题解决_安装完系统提示secureboot failure-程序员宅基地

文章浏览阅读304次。Thinkpad X250笔记本电脑,装的是FreeBSD,进入BIOS修改虚拟化配置(其后可能是误设置了安全开机),保存退出后系统无法启动,显示:secure boot failed ,把自己惊出一身冷汗,因为这台笔记本刚好还没开始做备份.....根据错误提示,到bios里面去找相关配置,在Security里面找到了Secure Boot选项,发现果然被设置为Enabled,将其修改为Disabled ,再开机,终于正常启动了。_安装完系统提示secureboot failure

C++如何做字符串分割(5种方法)_c++ 字符串分割-程序员宅基地

文章浏览阅读10w+次,点赞93次,收藏352次。1、用strtok函数进行字符串分割原型: char *strtok(char *str, const char *delim);功能:分解字符串为一组字符串。参数说明:str为要分解的字符串,delim为分隔符字符串。返回值:从str开头开始的一个个被分割的串。当没有被分割的串时则返回NULL。其它:strtok函数线程不安全,可以使用strtok_r替代。示例://借助strtok实现split#include <string.h>#include <stdio.h&_c++ 字符串分割

2013第四届蓝桥杯 C/C++本科A组 真题答案解析_2013年第四届c a组蓝桥杯省赛真题解答-程序员宅基地

文章浏览阅读2.3k次。1 .高斯日记 大数学家高斯有个好习惯:无论如何都要记日记。他的日记有个与众不同的地方,他从不注明年月日,而是用一个整数代替,比如:4210后来人们知道,那个整数就是日期,它表示那一天是高斯出生后的第几天。这或许也是个好习惯,它时时刻刻提醒着主人:日子又过去一天,还有多少时光可以用于浪费呢?高斯出生于:1777年4月30日。在高斯发现的一个重要定理的日记_2013年第四届c a组蓝桥杯省赛真题解答

基于供需算法优化的核极限学习机(KELM)分类算法-程序员宅基地

文章浏览阅读851次,点赞17次,收藏22次。摘要:本文利用供需算法对核极限学习机(KELM)进行优化,并用于分类。

metasploitable2渗透测试_metasploitable2怎么进入-程序员宅基地

文章浏览阅读1.1k次。一、系统弱密码登录1、在kali上执行命令行telnet 192.168.26.1292、Login和password都输入msfadmin3、登录成功,进入系统4、测试如下:二、MySQL弱密码登录:1、在kali上执行mysql –h 192.168.26.129 –u root2、登录成功,进入MySQL系统3、测试效果:三、PostgreSQL弱密码登录1、在Kali上执行psql -h 192.168.26.129 –U post..._metasploitable2怎么进入

Python学习之路:从入门到精通的指南_python人工智能开发从入门到精通pdf-程序员宅基地

文章浏览阅读257次。本文将为初学者提供Python学习的详细指南,从Python的历史、基础语法和数据类型到面向对象编程、模块和库的使用。通过本文,您将能够掌握Python编程的核心概念,为今后的编程学习和实践打下坚实基础。_python人工智能开发从入门到精通pdf