一、什么是Stream(流)
流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的API。使用这些API可以很容易地来构建实现流接口的对象。例如, HTTP 请求 和 process.stdout 就都是流的实例。 流可以是可读的、可写的,或是可读写的。注意,所有的流都是 EventEmitter 的实例。
二、流的类型
Node.js 中有四种基本的流类型:
- Readable - 可读的流 (例如 fs.createReadStream())。
- Writable - 可写的流 (例如 fs.createWriteStream())。
- Duplex - 可读写的流(双工流) (例如 net.Socket)。
- Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate())。
var Writable = Stream.Writable //可写的流
var Duplex = Stream.Duplex //可读写的流
var Transform = Stream.Transform //在读写过程中可以修改和变换数据的 Duplex 流
Node.js中关于流的操作被封装到了Stream模块中,这个模块也被多个核心模块所引用。例如在fs.createReadStream()和fs.createWriteStream()的源码实现里,都调用了Stream模块提供的抽象接口来实现对流数据的操作。
三、为什么使用Stream?
我们通过两个例子,了解一下为什么要使用Stream。
Exp1:
console.log(content)
console.log(content.toString())
})
但如果文件内容较大,譬如在500M时,执行上述代码的输出为:
Error: toString Failed
at Buffer.toString (buffer.js:382:11)
报错的原因是content这个Buffer对象的长度过大,导致toString方法失败。
fs.createReadStream创建一个可读流,连接了源头(上游,文件)和消耗方(下游,标准输出)。
执行上面代码时,流会逐次调用fs.read(ReadStream这个类的源码里有一个_read方法,这个_read方法在内部调用了fs.read来实现对文件的读取),将文件中的内容分批取出传给下游。
在下游看来,它收到的是一个先后到达的数据序列。
如果不需要一次操作全部内容,它可以处理完一个数据便丢掉。
在流看来,任一时刻它都只存储了文件中的一部分数据,只是内容在变化而已。
这种情况就像是用水管去取池子中的水。
每当用掉一点水,水管便会从池子中再取出一点。
无论水池有多大,都只存储了与水管容积等量的水。
Exp2:
下面是一个在线看视频的例子,假定我们通过HTTP请求返回视频内容给用户
fs.readFile(videoPath,(err,data) => {
res.end(data);
});
}).listen(8080);
但这样有两个明显的问题
用流可以将视频文件一点一点读到内存中,再一点一点返回给用户,读一部分,写一部分。(利用了 HTTP 协议的 Transfer-Encoding: chunked 分段传输特性),用户体验得到优化,同时对内存的开销明显下降。
fs.createReadStream(videoPath).pipe(res);
}).listen(8080);
通过上述两个例子,我们知道,在大数据情况下必须使用流式处理。
四、可读流(Readable Stream)
可读流(Readable streams)是对提供数据的源头(source)的抽象。
常见的可读流:
- HTTP responses,on the client
- HTTP requests,on the server
- fs read streams
- TCP sockets //sockets是一个双工流,即可读可写的流
- process.stdin //标准输入
所有的 Readable Stream 都实现了 stream.Readable 类定义的接口。
可读流的两种模式(flowing 和 paused)
- 在 flowing 模式下,可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用(所有的流都是 EventEmitter 的实例)。
- 在 paused 模式下,必须显式调用 stream.read()方法来从流中读取数据片段。
创建流的Readable流,默认是非流动模式(paused模式),默认不会读取数据。所有初始工作模式为paused的Readable流,可以通过下面三种途径切换为flowing模式:
fs.createReadStream(path[,options])源码实现
this.flowing = null; // 默认为非流动模式
// 建一个buffer存放读出来的数据
this.buffer = Buffer.alloc(this.highWaterMark);
this.open();
// {newListener:[fn]}
// 次方法默认同步调用的
this.on('newListener',(type) => { // 等待着 它监听data事件
if (type === 'data') {//当监听到data事件时,把流设置为流动模式
this.flowing = true;
this.read();// 开始读取 客户已经监听了data事件
}
})
}
pause(){//将流从flowing模式切换为paused模式
this.flowing = false;
}
resume(){//将流从paused模式切换为flowing模式
this.flowing =true;
this.read();//将流从paused模式切换为flowing模式后,继续读取文件内容
}
read(){ // 默认第一次调用read方法时还没有获取fd,文件的打开是异步的,所以不能直接读
if(typeof this.fd !== 'number'){ //如果fd不是number类型,证明文件还没有打开,此时需要监听一次open事件,因为文件一打开,就会触发open事件,这个在this.open()里写了
return this.once('open',() => this.read()); // 等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法
}
// 当获取到fd时 开始读取文件了
// 第一次应该读2个 第二次应该读2个
// 第二次pos的值是4 end是4
// 读取文件里一共4有个数为123 4,我们读取里面的123 4
let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;//规定每次读取多少个字节
fs.read(this.fd,this.buffer,howMuchToRead,this.pos,(error,byteRead) => { // byteRead为真实的读到了几个字节的内容
// 读取完毕
this.pos += byteRead; // 读出来两个,pos位置就往后移两位
// this.buffer默认就是三个
let b = this.encoding ? this.buffer.slice(0,byteRead).toString(this.encoding) : this.buffer.slice(0,byteRead);//对读出来的内容进行编码
this.emit('data',b);//触发data事件,将读到的内容输出给用户
if ((byteRead === this.highWaterMark)&&this.flowing){
return this.read(); // 继续读
}
// 这里就是没有更多的逻辑了
if (byteRead < this.highWaterMark){
// 没有更多了
this.emit('end'); // 读取完毕
this.destroy(); // 销毁即可
}
});
}
// 打开文件用的
destroy() {
if (typeof this.fd != 'number') { return this.emit('close'); } //如果文件还没打开,直接触发close事件
fs.close(this.fd,() => {
// 如果文件打开过了 那就关闭文件并且触发close事件
this.emit('close');
});
}
open() {
fs.open(this.path,this.flags,fd) => { //fd是文件描述符,它标识的就是当前this.path这个文件,从3开始(number类型)
if (err) {
if (this.autoClose) { // 如果需要自动关闭我再去销毁fd
this.destroy(); // 销毁(关闭文件,触发关闭事件)
}
this.emit('error',err); // 如果有错误触发error事件
return;
}
this.fd = fd; // 保存文件描述符
this.emit('open',this.fd); // 文件被打开了,触发文件被打开的方法
});
}
pipe(dest){//管道流的实现 pipe()方法是ReadStream下的方法,它里面的参数是WritableStream
this.on('data',(data)=>{
let flag = dest.write(data);
if(!flag){//这个flag就是每次调用ws.write()后返回的读状态值
this.pause();// 已经不能继续写了,等他写完了再恢复
}
});
dest.on('drain',()=>{//当读取缓存区清空后
console.log('写一下停一下')
this.resume();//继续往dest写入数据
});
}
}
module.exports = ReadStream;//导出可读流
使用fs.createReadStream()
四、可写流(Writable Stream)
可写流是对数据流向设备的抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者 TCP、HTTP 等网络响应。
常见的可写流:
- HTTP requests,on the client
- HTTP responses,on the server
- fs write streams
- zlib streams
- crypto streams
- TCP sockets
- child process stdin
- process.stdout,process.stderr
所有 Writable 流都实现了 stream.Writable 类定义的接口。
可写流的使用
调用可写流实例的 write() 方法就可以把数据写入可写流
rs.on('data',chunk => {
ws.write(chunk); // 写入数据
});
监听了可读流的data事件就会使可读流进入流动模式,我们在回调事件里调用了可写流的 write() 方法,这样数据就被写入了可写流抽象的设备destPath中。
write() 方法有三个参数
- chunk {String| Buffer},表示要写入的数据
- encoding 当写入的数据是字符串的时候可以设置编码
- callback 数据被写入之后的回调函数
drain事件
如果调用 stream.write(chunk)方法返回false,表示当前缓存区已满,流将在适当的时机(缓存区清空后)触发drain事件。
rs.on('data',chunk => {
let flag = ws.write(chunk); // 写入数据
if (!flag) { // 如果缓存区已满暂停读取
rs.pause();
}
});
ws.on('drain',() => {
rs.resume(); // 缓存区已清空 继续读取写入
});
fs.createWriteStream(path[,options])源码实现
// 第一次写入是真的往文件里写
this.writing = false; // 默认第一次就不是正在写入
// 用简单的数组来模拟一下缓存
this.cache = [];
// 维护一个变量,表示缓存的长度
this.len = 0;
// 是否触发drain事件
this.needDrain = false;
}
clearBuffer() {
let buffer = this.cache.shift();
if (buffer) { // 如果缓存里有
this._write(buffer.chunk,buffer.encoding,() => this.clearBuffer());
} else {// 如果缓存里没有了
if (this.needDrain) { // 需要触发drain事件
this.writing = false; // 告诉下次直接写就可以了 不需要写到内存中了
this.needDrain = false;
this.emit('drain');
}
}
}
_write(chunk,encoding,clearBuffer) { // 因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作
if (typeof this.fd != 'number') {
return this.once('open',() => this._write(chunk,clearBuffer));
}
fs.write(this.fd,chunk,chunk.length,byteWritten) => {
this.pos += byteWritten;
this.len -= byteWritten; // 每次写入后就要在内存中减少一下
clearBuffer(); // 第一次就写完了
})
}
write(chunk,encoding = this.encoding) { // 客户调用的是write方法去写入内容
// 要判断 chunk必须是buffer或者字符串 为了统一,如果传递的是字符串也要转成buffer
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk,encoding);
this.len += chunk.length; // 维护缓存的长度 3
let ret = this.len < this.highWaterMark;
if (!ret) {
this.needDrain = true; // 表示需要触发drain事件
}
if (this.writing) { // 表示正在写入,应该放到内存中
this.cache.push({
chunk,});
} else { // 第一次
this.writing = true;
this._write(chunk,() => this.clearBuffer()); // 专门实现写的方法
}
return ret; // 能不能继续写了,false表示下次的写的时候就要占用更多内存了
}
destroy() {
if (typeof this.fd != 'number') {
this.emit('close');
} else {
fs.close(this.fd,() => {
this.emit('close');
});
}
}
open() {
fs.open(this.path,this.mode,fd) => {
if (err) {
this.emit('error',err);
if (this.autoClose) {
this.destroy(); // 如果自动关闭就销毁文件描述符
}
return;
}
this.fd = fd;
this.emit('open',this.fd);
});
}
}
module.exports = WriteStream;
使用fs.createWriteStream()
总结
stream(流)分为可读流(flowing mode和paused mode)、可写流、可读写流,Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。它们底层都调用了stream模块并进行封装。
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对编程之家的支持。
原文链接:https://www.f2er.com/nodejs/31971.html