nodejs的流stream

buffer和stream

说道stream就不能不涉及到buffer,提到这2个东西首先得说一个都熟悉的场景-在线听歌或者在线看电影

在线看电影的时候,当你的网络带宽不足看高清视频或者突然下降到视频播放的比特率以下时,可能碰到缓冲中30%,50%最后100%

况,其实buffer缓冲区是为了流的消费者的体验服务的,比如当一首歌的音频流需要256kbps的速度播放,而你的带宽只有200kbps时,流可能会在开始播放之前30秒就预加载并放入缓冲区,然后你就一边听直到最后流下载完了你也听完了,缓冲区能提前加载你还没听到的部分,也有可能保存你听过的部分,这样你可以点击任意时间处可以从那个进度开始播放。说完这些,应该知道了流最大的好处就是你可以一边下载一边使用,看电影的时候不用下完了才看,可以边下边看

下载完电影再看就是同步的感觉,一边下载一边看就是一种异步的味道,仔细品位,facebook的bigpipe其实不也是利用了这样的一种思路去解决前端性能优化的问题吗。

而恰恰在nodejs中对于stream的应用使用的最多的方法就是pipe管道了

为什么使用nodejs中的stream

var http = require('http');  
var fs = require('fs');

var server = http.createServer(function (req, res) {  
    fs.readFile(__dirname + '/data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8000);  

以上的代码中针对每个请求都会将data.txt文件读取到内存后才返回给用户,如果data.txt文件很大而且并发又高的情况下,就会吃很多内存

然而req和res参数都是实现了流的对象,因此代码可以修改为

var http = require('http');  
var fs = require('fs');

var server = http.createServer(function (req, res) {  
    var stream = fs.createReadStream(__dirname + '/data.txt');
    stream.pipe(res);
});
server.listen(8000);  

用fs.createReadStream替换fs.readFile,这里pipe方法会在fs.createReadStream方法中自动监听dataend事件,每次在data.txt中读取一点文件数据就返回给用户

使用pipe方法还有另外个好处,就是当用户网速比较慢的情况下,他会一点点接收到数据,但nodejs不会缓冲过多的数据到内存中

nodejs中的stream

nodejs中一共有5中stream: readable, writable, transform, duplex, 和 "classic".

所有的stream用pipe来配对输入和输出,即可读的流src向可写的流dest输出

src.pipe(dest)

pipe方法返回的又是dest流对象,因此可以链式调用

a.pipe(b).pipe(c).pipe(d)

#等价于

a.pipe(b);  
b.pipe(c);  
c.pipe(d);  

readable streams 可读流

readable的流通过pipe方法可以输出到writable, transform, 和 duplex 流

创建一个可读流

var Readable = require('stream').Readable;

var rs = new Readable;  
rs.push('beep ');  
rs.push('boop\n');  
rs.push(null);//告诉流的消费者输出已经结束

rs.pipe(process.stdout);  

writable stream 可写流

writable的流只能被写如但不能作为源读取

创建一个可写流

定义一个._write(chunk, enc, next)函数,然后你就可以往里面写入流

var Writable = require('stream').Writable;  
var ws = Writable();  
ws._write = function (chunk, enc, next) {  
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);  

第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的数据,某些情况下如果给next函数传入一个error对象,就会触发stream的error事件,如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换.如果readable stream写入的数据有object对象,那么需要这样创建Writable({ objectMode: true })

向一个可写流中write数据

调用write方法

process.stdout.write('beep boop\n');  

调用end方法告诉可写流你的写入结束,同时可以在end方法中以参数的形式追加数据到末尾

var fs = require('fs');  
var ws = fs.createWriteStream('message.txt');

ws.write('beep ');

setTimeout(function () {  
    ws.end('boop\n');
}, 1000);

classic streams

Classic stream是比较老的接口了,最早出现在node 0.4版本中,当一个流被注册了"data" 事件的回调函数,那么流就会工作在老版本模式下,即会使用老的API

Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那
么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable的值是否为true确定流是否有数据可读

var Stream = require('stream');  
var stream = new Stream;  
stream.readable = true;

var c = 64;  
var iv = setInterval(function () {  
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit('end');
    }
    else stream.emit('data', String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);  

示例打印出字母A到J

如果想要从classic的可读流从读取,只需要注册data和end的事件监听,例如用classic经典的api来从标准输入中读取

process.stdin.on('data', function (buf) {  
    console.log(buf);
});
process.stdin.on('end', function () {  
    console.log('__END__');
});
$ (echo beep; sleep 1; echo boop) | node classic1.js 
<Buffer 62 65 65 70 0a>  
<Buffer 62 6f 6f 70 0a>  
__END__  

如果你使用这种方式读取数据,那么会失去使用新的stream2 api接口带来的好处。比如你在往一个 延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内 存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听"data" 和"end"事件了,也不用担心读写不平衡的问题了

transform流

transform是一个对读入数据过滤然后输出的流

duplex流

duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream

a.pipe(b).pipe(a)  

参考文档

作者:shaynegui
喜欢打德州,玩dota,听电音,web前端脑残粉
我的专栏 GitHub