资讯专栏INFORMATION COLUMN

node那点事(一) -- Readable streams(可读流)

rickchen / 948人阅读

摘要:流的类型中有四种基本的流类型可读的流例如可写的流例如可读写的流例如在读写过程中可以修改和变换数据的流例如可读流可读流有两种模式流动模式可读流自动读取数据,通过接口的事件尽快将数据提供给应用。

流的简介

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。

Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。

流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。

为什么要用流

这里我们举一个简单的例子:

我们打算读取一个文件,使用 fs.readFileSync 同步读取一个文件,程序会被阻塞,所有的数据都会被读取到内存中。

换用 fs.readFile 读取文件,程序不会被阻塞,但是所有的数据依旧会被一次性全部被读取到内存中。

当处理大文件压缩、归档、媒体文件和巨大的日志文件的时候,内存使用就成了问题,现在大家一般家用机内存大多数都是8G、16G,软件包还在日益增大,在这种情况下,流的优势就体现出来了。

流被设计为异步的方式,在内存中只开启一个固定的空间,将文件化整为零,以流动的方式进行传输操作,解决了以上问题。

流的类型

Node.js 中有四种基本的流类型:

Readable - 可读的流 (例如 fs.createReadStream()).

Writable - 可写的流 (例如 fs.createWriteStream()).

Duplex - 可读写的流 (例如 net.Socket).

Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

可读流(Readable Stream)

可读流有两种模式:

1、流动模式(flowing):可读流自动读取数据,通过EventEmitter接口的事件尽快将数据提供给应用。

2、暂停模式(paused):必须显式调用stream.read()方法来从流中读取数据片段。

可以通过三种途径切换到流动模式:

监听 "data" 事件

调用 stream.resume() 方法

调用 stream.pipe() 方法将数据发送到 Writable

流动模式切换到暂停模式的api有:

如果不存在管道目标,调用stream.pause()方法

如果存在管道目标,调用 stream.unpipe()并取消"data"事件监听

可读流事件:"data","readable","error","close","end"

我们可以想象下家用热水器的模型,热水器的水箱(buffer缓存区)里面存着热水(数据),在我们用热水的时候,开启水龙头,自来水会不断的进入水箱,再从水箱由水龙头流出来供我们使用。这就是进入了“flowing”模式。当我们关闭水龙头时候,水箱则会暂停进水,水龙头也会暂停出水,这是就进入了“paused”模式。

flowing模式
const fs = require("fs")
const path = require("path")
const rs = fs.createReadStream(path.join(__dirname, "./1.txt"))

rs.setEncoding("utf8")

rs.on("data", (data) => {
    console.log(data)
})
paused模式
const fs = require("fs")
const path = require("path")
const rs = fs.createReadStream(path.join(__dirname, "./1.txt"))

rs.setEncoding("utf8")

rs.on("readable", () => {
    let d = rs.read(1)
    console.log(d)
})
实现原理 流动模式原理

我们来实现一个简单的流动模式下的可读流介绍其原理,由NODEJS官方文档可知,流继承自EventEmitter模块,然后我们定义一些默认参数、缓存区、模式:

let EventEmitter = require("events");
let fs = require("fs");

class ReadStream extends EventEmitter {
    constructor(path,options) {
        super();
        this.path = path;
        this.flags = options.flags || "r";
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null
        
        this.buffer = Buffer.alloc(this.highWaterMark);//定义缓存区大小
        
        this.pos = this.start; // pos 读取的位置 可变 start不变的
        
        this.flowing = null; // null就是暂停模式
    }
}

module.exports = ReadStream;

接着在我们需要定义一个打开文件的方法用于打开文件。还有一个一个destroy方法,用于在文件操作出错或者读完之后关闭文件。

open(){
    fs.open(this.path,this.flags,(err,fd)=>{
        if(err){
            this.emit("error",err);
            if(this.autoClose){ // 是否自动关闭
                this.destroy();
            }
            return;
        }
        this.fd = fd; // 保存文件描述符
        this.emit("open"); // 文件打开了
    });
}
 destroy(){
    // 先判断有没有fd 有关闭文件 触发close事件
    if(typeof this.fd ==="number"){
        fs.close(this.fd,()=>{
            this.emit("close");
        });
        return;
    }
    this.emit("close"); // 销毁
}

接着要在构造函数中调用open方法,当用户绑定data监听时,修改可读流的模式:

constructor(path,options){
    super();
    this.path = path;
    this.flags = options.flags || "r";
    this.autoClose = options.autoClose || true;
    this.highWaterMark = options.highWaterMark|| 64*1024;
    this.start = options.start||0;
    this.end = options.end;
    this.encoding = options.encoding || null
    this.flowing = null; 
    this.buffer = Buffer.alloc(this.highWaterMark);
    this.pos = this.start;
    
    this.open();//打开文件 fd
    this.on("newListener",(eventName,callback)=>{
        if(eventName === "data"){
            // 相当于用户监听了data事件
            this.flowing  = true;
            // 监听了 就去读
            this.read(); // 去读内容了
        }
    })
}

接下来我们实现最总要的read方法,首先要保证文件已经打开,接着镀组文件进入缓存,触发data事件传入数据,如果处于流动模式,继续读取直到读完文件。

read(){
    // 此时文件还没打开呢
    if(typeof this.fd !== "number"){
        // 当文件真正打开的时候 会触发open事件,触发事件后再执行read,此时fd肯定有了
        return this.once("open",()=>this.read())
    }
    // 此时有fd了
    // 应该填highWaterMark?
    // 想读4个 写的是3  每次读3个
    // 123 4
    let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;
    fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
        // 读到了多少个 累加
        if(bytesRead>0){
            this.pos+= bytesRead;
            let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead);
            this.emit("data",data);
            // 当读取的位置 大于了末尾 就是读取完毕了
            if(this.pos > this.end){
                this.emit("end");
                this.destroy();
            }
            if(this.flowing) { // 流动模式继续触发
                this.read(); 
            }
        }else{
            this.emit("end");
            this.destroy();
        }
    });
}

剩下的pause和resume方法,很简单

resume() {
    this.flowing = true;
    this.read();
}
pause() {
    this.flowing = false;
}

简单的流实现完成了,看一下完整代码

let EventEmitter = require("events");
let fs = require("fs");

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.flags = options.flags || "r";
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null

        this.open();

        this.flowing = null; // null就是暂停模式

        this.buffer = Buffer.alloc(this.highWaterMark);

        this.pos = this.start; 
        this.on("newListener", (eventName,callback) => {
            if (eventName === "data") {
                this.flowing  = true;
                this.read(); 
            }
        })
    }
    
    read(){
        if (typeof this.fd !== "number") {
            return this.once("open", () => this.read())
        }
        let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos+1) : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err,bytesRead) => {
            if (bytesRead > 0) {
                this.pos += bytesRead;
                let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead);
                this.emit("data", data);
                if(this.pos > this.end){
                    this.emit("end");
                    this.destroy();
                }
                if(this.flowing) { 
                    this.read(); 
                }
            }else{
                this.emit("end");
                this.destroy();
            }
        });
    }
    
    resume() {
        this.flowing = true;
        this.read();
    }
    
    pause() {
        this.flowing = false;
    }
    
    destroy() {
        if(typeof this.fd === "number") {
            fs.close(this.fd, () => {
                this.emit("close");
            });
            return;
        }
        this.emit("close"); 
    };
    
    open() {
        fs.open(this.path, this.flags, (err,fd) => {
            if (err) {
                this.emit("error", err);
                if (this.autoClose) { 
                    this.destroy();
                }
                return;
            }
            this.fd = fd; 
            this.emit("open"); 
        });
    }
}
module.exports = ReadStream;
暂停模式原理

以上是流动模式的可读流实现原理,暂停模式的可读流原理与流动模式的主要区别在于监听readable事件的绑定与read方法,先实现监听绑定readable事件回调函数时,调用read方法读取数据到缓存区,定义一个读取方法_read

constructor(path, options) {
    super();
    this.path = path;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose || true;
    this.start = 0;
    this.end = options.end;
    this.flags = options.flags || "r";

    this.buffers = []; // 缓存区 
    this.pos = this.start;
    this.length = 0; // 缓存区大小
    this.emittedReadable = false;
    this.reading = false; // 不是正在读取的
    this.open();
    this.on("newListener", (eventName) => {
        if (eventName === "readable") {
            this.read();
        }
    })
}

read(n) {
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    if (this.length < this.highWaterMark) {
        if(!this.reading) {
            this.reading = true;
            this._read(); 
        }
    }
}

_read() {
    if (typeof this.fd !== "number") {
        return this.once("open", () => this._read());
    }
    let buffer = Buffer.alloc(this.highWaterMark);
    fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
        if (bytesRead > 0) {
            this.buffers.push(buffer.slice(0, bytesRead));
            this.pos += bytesRead;
            this.length += bytesRead;
            this.reading = false;
            if (this.emittedReadable) {
                this.emittedReadable = false; 
                this.emit("readable");
            }
        } else {
            this.emit("end");
            this.destroy();
        }
    })
}

由api可知,暂停模式下的可读流手动调用read方法参数可以大于highWaterMark,为了处理这种情况,我们先写一个函数computeNewHighWaterMark,取到大于等于n的最小2的n次方的整数

function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }

然后写read方法,要考虑全n的各种情况,上代码

read(n) { 

    if(n>this.length){
        // 更改缓存区大小  读取五个就找 2的几次放最近的
        this.highWaterMark = computeNewHighWaterMark(n)
        this.emittedReadable = true;
        this._read();
    }


    // 如果n>0 去缓存区中取吧
    let buffer=null;
    let index = 0; // 维护buffer的索引的
    let flag = true;
    if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多
        // 在缓存区中取 [[2,3],[4,5,6]]
        buffer = Buffer.alloc(n); // 这是要返回的buffer
        let buf;
        while (flag&&(buf = this.buffers.shift())) {
            for (let i = 0; i < buf.length; i++) {
                buffer[index++] = buf[i];
                if(index === n){ // 拷贝够了 不需要拷贝了
                    flag = false;
                    this.length -= n;
                    let bufferArr = buf.slice(i+1); // 取出留下的部分
                    // 如果有剩下的内容 在放入到缓存中
                    if(bufferArr.length > 0){
                        this.buffers.unshift(bufferArr);
                    }
                    break;
                }
            }
        }
    }
    // 当前缓存区 小于highWaterMark时在去读取
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    if (this.length < this.highWaterMark) {
        if(!this.reading){
            this.reading = true;
            this._read(); // 异步的
        }
    }
    return buffer
}

附上可读流暂停模式的完整实现原理代码

let fs = require("fs");
let EventEmitter = require("events");
function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }
class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true;
        this.start = 0;
        this.end = options.end;
        this.flags = options.flags || "r";

        this.buffers = []; // 缓存区 
        this.pos = this.start;
        this.length = 0; // 缓存区大小
        this.emittedReadable = false;
        this.reading = false; // 不是正在读取的
        this.open();
        this.on("newListener", (eventName) => {
            if (eventName === "readable") {
                this.read();
            }
        })
    }
    read(n) { 

        if(n>this.length){
            // 更改缓存区大小  读取五个就找 2的几次放最近的
            this.highWaterMark = computeNewHighWaterMark(n)
            this.emittedReadable = true;
            this._read();
        }


        // 如果n>0 去缓存区中取吧
        let buffer=null;
        let index = 0; // 维护buffer的索引的
        let flag = true;
        if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多
            // 在缓存区中取 [[2,3],[4,5,6]]
            buffer = Buffer.alloc(n); // 这是要返回的buffer
            let buf;
            while (flag&&(buf = this.buffers.shift())) {
                for (let i = 0; i < buf.length; i++) {
                    buffer[index++] = buf[i];
                    if(index === n){ // 拷贝够了 不需要拷贝了
                        flag = false;
                        this.length -= n;
                        let bufferArr = buf.slice(i+1); // 取出留下的部分
                        // 如果有剩下的内容 在放入到缓存中
                        if(bufferArr.length > 0){
                            this.buffers.unshift(bufferArr);
                        }
                        break;
                    }
                }
            }
        }
        // 当前缓存区 小于highWaterMark时在去读取
        if (this.length == 0) {
            this.emittedReadable = true;
        }
        if (this.length < this.highWaterMark) {
            if(!this.reading){
                this.reading = true;
                this._read(); // 异步的
            }
        }
        return buffer
    }
    // 封装的读取的方法
    _read() {
        // 当文件打开后在去读取
        if (typeof this.fd !== "number") {
            return this.once("open", () => this._read());
        }
        // 上来我要喝水 先倒三升水 []
        let buffer = Buffer.alloc(this.highWaterMark);
        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
            if (bytesRead > 0) {
                // 默认读取的内容放到缓存区中
                this.buffers.push(buffer.slice(0, bytesRead));
                this.pos += bytesRead; // 维护读取的索引
                this.length += bytesRead;// 维护缓存区的大小
                this.reading = false;
                // 是否需要触发readable事件
                if (this.emittedReadable) {
                    this.emittedReadable = false; // 下次默认不触发
                    this.emit("readable");
                }
            } else {
                this.emit("end");
                this.destroy();
            }
        })
    }
    destroy() {
        if (typeof this.fd !== "number") {
            return this.emit("close")
        }
        fs.close(this.fd, () => {
            this.emit("close")
        })
    }
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit("error", err);
                if (this.autoClose) {
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit("open");
        });
    }
}

module.exports = ReadStream;

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/93921.html

相关文章

  • node点事(二) -- Writable streams(可写流)、自定义流

    摘要:可写流可写流是对数据写入目的地的一种抽象。对象流的特点就是它有一个标志,我们可以设置它让流可以接受任何对象。 可写流(Writable Stream) 可写流是对数据写入目的地的一种抽象。 可写流的原理其实与可读流类似,当数据过来的时候会写入缓存池,当写入的速度很慢或者写入暂停时候,数据流便会进入到队列池缓存起来,当然即使缓存池满了,剩余的数据也是存在内存 可写流的简单用法如下代码 l...

    mtunique 评论0 收藏0
  • 通过源码解析 Node.js 中导流(pipe)的实现

    摘要:回调函数中检测该次写入是否被缓冲,若是,触发事件。若目标可写流表示该写入操作需要进行缓冲,则立刻将源可读流切换至暂停模式。监听源可读流的事件,相应地结束目标可写流。 在Node.js中,流(Stream)是其众多原生对象的基类,它对处理潜在的大文件提供了支持,也抽象了一些场景下的数据处理和传递。在它对外暴露的接口中,最为神奇的,莫过于导流(pipe)方法了。鉴于近期自己正在阅读Node...

    defcon 评论0 收藏0
  • Node.js中流的使用

    摘要:流是基于事件的用于管理和处理数据而且有不错的效率借助事件和非阻塞库流模块允许在其可用的时候动态处理在其不需要的时候释放掉使用流的好处举一个读取文件的例子使用同步读取一个文件程序会被阻塞所有的数据都会被读取到内存中换用读取文件程序不会被阻塞但 流是基于事件的API,用于管理和处理数据,而且有不错的效率.借助事件和非阻塞I/O库,流模块允许在其可用的时候动态处理,在其不需要的时候释放掉. ...

    h9911 评论0 收藏0
  • 浅谈node.js中的stream(流)

    摘要:在可读流事件里我们就必须调用方法。当一个对象就意味着我们想发出信号这个流没有更多数据了自定义可写流为了实现可写流,我们需要使用流模块中的构造函数。我们只需给构造函数传递一些选项并创建一个对象。 前言 什么是流呢?看字面意思,我们可能会想起生活中的水流,电流。但是流不是水也不是电,它只是描述水和电的流动;所以说流是抽象的。在node.js中流是一个抽象接口,它不关心文件内容,只关注是否从...

    elliott_hu 评论0 收藏0
  • [转]nodejs Stream使用手册

    摘要:方法也可以接收一个参数表示数据请求着请求的数据大小,但是可读流可以根据需要忽略这个参数。读取数据大部分情况下我们只要简单的使用方法将可读流的数据重定向到另外形式的流,但是在某些情况下也许直接从可读流中读取数据更有用。 介绍本文介绍了使用 node.js streams 开发程序的基本方法。 We should have some ways of connecting programs ...

    luffyZh 评论0 收藏0

发表评论

0条评论

rickchen

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<