资讯专栏INFORMATION COLUMN

[转]nodejs Stream使用手册

luffyZh / 604人阅读

摘要:方法也可以接收一个参数表示数据请求着请求的数据大小,但是可读流可以根据需要忽略这个参数。读取数据大部分情况下我们只要简单的使用方法将可读流的数据重定向到另外形式的流,但是在某些情况下也许直接从可读流中读取数据更有用。

介绍
本文介绍了使用 node.js streams 开发程序的基本方法。

"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."

Doug McIlroy. October 11, 1964

最早接触Stream是从早期的unix开始的
数十年的实践证明Stream 思想可以很简单的开发出一些庞大的系统。在unix里,Stream是通过
|实现的;在node中,作为内置的stream模块,很多核心模块和三方模块都使用到。和unix一样,
node Stream主要的操作也是.pipe(),使用者可以使用反压力机制来控制读和写的平衡。
Stream 可以为开发者提供可以重复使用统一的接口,通过抽象的Stream接口来控制Stream之间的读写平衡。

为什么使用Stream
node中的I/O是异步的,因此对磁盘和网络的读写需要通过回调函数来读取数据,下面是一个文件下载服务器

的简单代码:

var http = require("http");
var fs = require("fs");

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + "/data.txt", function (err, data) {
        res.end(data);
    });
});
server.listen(8000);

这些代码可以实现需要的功能,但是服务在发送文件数据之前需要缓存整个文件数据到内存,如果"data.txt"文件很
大并且并发量很大的话,会浪费很多内存。因为用户需要等到整个文件缓存到内存才能接受的文件数据,这样导致
用户体验相当不好。不过还好(req, res)两个参数都是Stream,这样我们可以用fs.createReadStream()代替fs.readFile():

var http = require("http");
var fs = require("fs");

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + "/data.txt");
    stream.pipe(res);
});
server.listen(8000);

.pipe()方法监听fs.createReadStream()"data""end"事件,这样"data.txt"文件就不需要缓存整
个文件,当客户端连接完成之后马上可以发送一个数据块到客户端。使用.pipe()另一个好处是可以解决当客户
端延迟非常大时导致的读写不平衡问题。如果想压缩文件再发送,可以使用三方模块实现:

var http = require("http");
var fs = require("fs");
var oppressor = require("oppressor");

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + "/data.txt");
    stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

这样文件就会对支持gzipdeflate的浏览器进行压缩。oppressor 模块会处理所有的content-encoding

Stream使开发程序变得简单。

基础概念
有五种基本的Stream: readable, writable, transform, duplex, and"classic”.

pipe

所有类型的Stream收是使用 .pipe() 来创建一个输入输出对,接收一个可读流src并将其数据输出到可写流dst,如下:

src.pipe(dst)    

.pipe(dst)方法为返回dst流,这样就可以接连使用多个.pipe(),如下:

a.pipe(b).pipe(c).pipe(d)

功能与下面的代码相同:

a.pipe(b);
b.pipe(c);
c.pipe(d);

这样的用法十分类似于unix命令下面用法:

a | b | c | d
readable streams

通过调用Readable streams的 .pipe()方法可以把Readable streams的数据写入一个

Writable , Transform, 或者Duplex stream。

readableStream.pipe(dst)
创建 readable stream

这里我们创建一个readable stream!

var Readable = require("stream").Readable;

var rs = new Readable;
rs.push("beep ");
rs.push("boop
");
rs.push(null);

rs.pipe(process.stdout);


$ node read0.js
beep boop

rs.push(null) 通知数据接收者数据已经发送完毕.
注意到我们在将所有数据内容压入可读流之前并没有调用rs.pipe(process.stdout);,但是我们压入的所有数据
内容还是完全的输出了,这是因为可读流在接收者没有读取数据之前,会缓存所有压入的数据。但是在很多情况下,更好的方法是只有数据接收着请求数据的时候,才压入数据到可读流而不是缓存整个数据。下面我们重写 一下
._read()函数:

var Readable = require("stream").Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > "z".charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);

$ node read1.js
abcdefghijklmnopqrstuvwxyz

上面的代码通过重写_read()方法实现了只有在数据接受者请求数据才向可读流中压入数据。_read()方法也可以接收一个size参数表示数据请求着请求的数据大小,但是可读流可以根据需要忽略这个参数。注意我们也可以用util.inherits()继承可读流。为了说明只有在数据接受者请求数据时_read()方法才被调用,我们在向可读流压入数据时做一个延时,如下:

var Readable = require("stream").Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= "z".charCodeAt(0)) return rs.push(null);

    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on("exit", function () {
    console.error("
_read() called " + (c - 97) + " times");
});
process.stdout.on("error", process.exit);

用下面的命令运行程序我们发现_read()方法只调用了5次:

$ node read2.js | head -c5
abcde
_read() called 5 times

使用计时器的原因是系统需要时间来发送信号来通知程序关闭管道。使用process.stdout.on("error", fn) 是为了处理系统因为header命令关闭管道而发送SIGPIPE信号,因为这样会导致process.stdout触发EPIPE事件。如果想创建一个的可以压入任意形式数据的可读流,只要在创建流的时候设置参数objectModetrue即可,例如:Readable({ objectMode: true })

读取readable stream数据

大部分情况下我们只要简单的使用pipe方法将可读流的数据重定向到另外形式的流,但是在某些情况下也许直接从可读流中读取数据更有用。如下所示:

process.stdin.on("readable", function () {
    var buf = process.stdin.read();
    console.dir(buf);
});



$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 



null

当可读流中有数据可读取时,流会触发"readable" 事件,这样就可以调用.read()方法来读取相关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就可以结束.read() 的调用, 等待下一次"readable" 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:

process.stdin.on("readable", function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
});

如下运行程序发现,输出结果并不完全!

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 



这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0) 可以达到这个目的。

process.stdin.on("readable", function () {
    var buf = process.stdin.read(3);
    console.dir(buf);
    process.stdin.read(0);
});

这次运行结果如下:

$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 



我们可以使用 .unshift() 将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面的代码,会按行输出标准输入的内容:

var offset = 0;

process.stdin.on("readable", function () {
    var buf = process.stdin.read();
    if (!buf) return;
    for (; offset < buf.length; offset++) {
        if (buf[offset] === 0x0a) {
            console.dir(buf.slice(0, offset).toString());
            buf = buf.slice(offset + 1);
            offset = 0;
            process.stdin.unshift(buf);
            return;
        }
    }
    process.stdin.unshift(buf);
});

$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js 
"hearties"
"heartiest"
"heartily"
"heartiness"
"heartiness"s"
"heartland"
"heartland"s"
"heartlands"
"heartless"
"heartlessly"

当然,有很多模块可以实现这个功能,如:split

writable streams

重写 ._write(chunk, enc, next) 方法就可以接受一个readable stream的数据。

var Writable = require("stream").Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

$ (echo beep; sleep 1; echo boop) | node write0.js 


第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream

Writable({ objectMode: true })

写数据到 writable stream

调用writable stream的.write(data)方法即可完成数据写入。

process.stdout.write("beep boop
");

调用.end()方法通知writable stream 数据已经写入完成。

var fs = require("fs");
var ws = fs.createWriteStream("message.txt");

ws.write("beep ");

setTimeout(function () {
    ws.end("boop
");
}, 1000);

$ node writing1.js 
$ cat message.txt
beep boop

如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark,这样如果缓冲区里的数据超过opts.highWaterMark.write(data)方法会返回false。当缓冲区可写的时候,writable stream会触发"drain" 事件。

classic streams

Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好处的。当一个流被注册了"data"事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。

classic readable streams

Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:

var Stream = require("stream");
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit("end");
    }
    else stream.emit("data", String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);


$ node classic0.js
ABCDEFGHIJ

如果要从classic readable stream中读取数据,注册"data""end"两个事件的回调函数即可,代码如下:

process.stdin.on("data", function (buf) {
    console.log(buf);
});
process.stdin.on("end", function () {
    console.log("__END__");
});


$ (echo beep; sleep 1; echo boop) | node classic1.js 


__END__

需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个
延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内
存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听"data"
"end"事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听"data""end" 事件,如下面的代码:

var through = require("through");
process.stdin.pipe(through(write, end));

function write (buf) {
    console.log(buf);
}
function end () {
    console.log("__END__");
}

$ (echo beep; sleep 1; echo boop) | node through.js 


__END__

或者也可以使用concat-stream来缓存整个流的内容:

var concat = require("concat-stream");
process.stdin.pipe(concat(function (body) {
    console.log(JSON.parse(body));
}));


$ echo "{"beep":"boop"}" | node concat.js 
{ beep: "boop" }

当然如果你非要自己监听"data""end"事件,那么你可以在写数据的流写的 时候使用.pause()方法暂停Classic readable streams继续触发"data" 事件。等到写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取数据。

classic writable streams

Classic writable streams 非常简单。只有 .write(buf), .end(buf).destroy()三个方法。.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。

transform

transform是一个对读入数据过滤然输出的流。

duplex

duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:

a.pipe(b).pipe(a)

read more

core stream documentation

You can use the readable-stream

本文翻译来自 https://github.com/substack/stream-handb...

本文转载来自 http://www.open-open.com/lib/view/open13...

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/79078.html

相关文章

  • Node.js 中的缓冲区(Buffer)究竟是什么?

    摘要:在创建时大小已经被确定且是无法调整的,在内存分配这块是由层面提供而不是具体后面会讲解。在这里不知道你是否认为这是很简单的但是上面提到的一些关键词二进制流缓冲区,这些又都是什么呢下面尝试做一些简单的介绍。 showImg(https://segmentfault.com/img/remote/1460000019894717?w=1280&h=850); 多数人都拥有自己不了解的能力和机...

    scwang90 评论0 收藏0
  • Node.js 中度体验

    摘要:创建简单应用使用指令来载入模块创建服务器使用方法创建服务器,并使用方法绑定端口。全局安装将安装包放在下。的核心就是事件触发与事件监听器功能的封装。通常我们用于从一个流中获取数据并将数据传递到另外一个流中。压缩文件为文件压缩完成。 创建简单应用 使用 require 指令来载入 http 模块 var http = require(http); 创建服务器 使用 http.create...

    CastlePeaK 评论0 收藏0
  • 【开源】小程序And公众号商城,外加后台,功能齐全!

    摘要:前言一个集微信公众号商城小程序商城商城后台的一个开源项目,后台是基于开发的,是一个简洁而强大的开源微信公众平台开发框架,微信功能插件化开发多公众号管理配置简单。微信小程序项目下载整个包之后,进行根目录文件夹。 前言 一个集微信公众号商城/小程序商城/商城后台的一个开源项目,后台是基于WeiPHP5.0开发的,WeiPHP是一个简洁而强大的开源微信公众平台开发框架,微信功能插件化开发,多...

    VishKozus 评论0 收藏0
  • 【开源】小程序And公众号商城,外加后台,功能齐全!

    摘要:前言一个集微信公众号商城小程序商城商城后台的一个开源项目,后台是基于开发的,是一个简洁而强大的开源微信公众平台开发框架,微信功能插件化开发多公众号管理配置简单。微信小程序项目下载整个包之后,进行根目录文件夹。 前言 一个集微信公众号商城/小程序商城/商城后台的一个开源项目,后台是基于WeiPHP5.0开发的,WeiPHP是一个简洁而强大的开源微信公众平台开发框架,微信功能插件化开发,多...

    linkFly 评论0 收藏0
  • 深入浅出nodeJS - 4 - (玩进程、测试、产品化)

    摘要:进程间通信的目的是为了让不同的进程能够互相访问资源,并进程协调工作。这个过程的示意图如下端口共同监听集群稳定之路进程事件自动重启负载均衡状态共享模块工作原理事件二测试单元测试性能测试三产品化项目工程化部署流程性能日志监控报警稳定性异构共存 内容 9.玩转进程10.测试11.产品化 一、玩转进程 node的单线程只不过是js层面的单线程,是基于V8引擎的单线程,因为,V8的缘故,前后...

    henry14 评论0 收藏0

发表评论

0条评论

luffyZh

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<