资讯专栏INFORMATION COLUMN

Rx.js使用之结合node的读写流进行数据处理

Tecode / 2844人阅读

摘要:的读写流模块提供来创建行读取流,即读取文件的每一行作为持续的输入数据模块提供来创建写入流,其返回的有和两个方法,来完成流式的写入与结束写入。

前几天接到任务要使用第三方API处理几千张图片,得到结果集。我的做法就是使用Rx.js结合node的读写流来完成数据读入、接口请求、数据处理、数据写入这些操作。本篇就来分享这个代码和其逻辑。

Rx.js是什么

Rx.js是一个响应式编程库,能简化事件/异步处理逻辑代码。其视所有的事件/数据为__流__,提供各种流处理的operators,将输入与输出平滑的链接起来。可以类比为linux上的pipe操作符: ls | grep a*b | less

Node的读写流

readline模块提供readline.createInterface来创建行读取流,即读取文件的每一行作为持续的输入数据

fs模块提供fs.createWriteStream来创建写入流, 其返回的writerwriteend两个方法,来完成流式的写入与结束写入。

第三方接口的使用情况

并发数有限制,3个是出现其出现并发错误概率最低的最大并发数

接口请求过于频繁,会较大概率出现连续的并发错误, 大概延迟400秒效果尚可

提供给第三方的图片是链接,其需要服务器自己下载,会出现操作超时或者长时间不返回的情况。

任务列表

从文件读取图片文件名

拼接url

发送3个并发请求

请求出现超时问题重试3次,最后如果失败则放弃

出现非超时错误(如并发错误等)则一直重试,直到成功

请求成功后延迟400秒继续发起下一个请求

处理返回的数据

写入文件

代码分析 引入依赖,创建读取与写入流
const https = require("https");
const querystring = require("querystring");
const Rx = require("rxjs");
const readline = require("readline");
const fs = require("fs");

const imgStream = readline.createInterface({  // 创建行读取流
    input: fs.createReadStream("filelist.txt")
});

const writeStream = fs.createWriteStream("output.txt");  // 创建写入流
使用Rx处理读取并反馈结果给写入
Rx.Observable.fromEvent(imgStream, "line")  // 将行读取流转化为Rx的事件流
.takeUntil(Rx.Observable.fromEvent(imgStream, "close"))  // 读取流截止时终止Rx流
.map(img => generateData(img))  // 将文件名处理成post的数据
 // 发起请求,并发3个,请求返回后延迟400ms后再进行下一步处理并发起下一个请求
.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) 
.subscribe(data => {
    // 处理数据并写入文件
    let str = data.url;
    if (data.status === 200 && data.data.xxx.length) {
        zzz = data.data.xxx.map(x => x.zzz);
        str += `    ${JSON.stringify(zzz)}`;
    }
    writeStream.write(`${str}
`);
}, (err) => {
    console.log(err);
    console.log("!!!!!!!!!!!ERROR!!!!!!!!!");
}, () => {
    console.log("=====complete======");
    writeStream.end();
});

其中的需要关注的点在.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) ,这里内部requestAPI返回一个封装了http异步请求并延迟400ms的Rx流,当请求完成并延迟完成后将数据返回上一层继续进行处理(可以类比为Promisethen)

使用Rx的自定义流封装一个带错误重试机制的http请求
const requestFacepp = dataStr => {
    const options = {
        hostname: "api.xxx.com",
        port: 443,
        path: "/xxx",
        method: "POST",
        headers: {
            "Content-Type": "application/x-www-form-urlencoded",
            "Content-Length": Buffer.byteLength(dataStr)
        }
    };
    const reqData = querystring.parse(dataStr);
    const retry$ = new Rx.Subject();  // 触发重试的流,当其发出数据时会使`retryWhen`触发重试错误流
    let retryTimes = 3;  // 设置非正常失败(超时)重试的上限

    // 使用Rx的自定义流封装一个带错误重试机制的http请求,可以类比为new Promise
    // 但要注意的是Rx是流,即数据是可以持续的,而Promise则只有一个结果和状态
    return Rx.Observable.create(observer => {
        const req = https.request(options, res => {
            let data = "";
            res.setEncoding("utf8");
            res.on("data", chunk => {
                data += chunk;
            });
            res.on("end", () => {
                if (res.statusCode === 200) {
                    // 请求正常返回,向流内推送结果并结束
                    observer.next({
                        status: res.statusCode,
                        url: reqData.image_url,
                        data: JSON.parse(data)
                    });
                    observer.complete();
                } else {
                    // 请求正常返回,但不是正常结果,抛出错误并重试
                    console.log(`retring: ${reqData.image_url}`);
                    observer.error({
                        status: res.statusCode,
                        url: reqData.image_url
                    });
                    retry$.next(true);
                }
            });
        });

        req.setTimeout(4000, () => {
            // 设置请求4s超时,超时后终止,引发请求抛错
            req.abort();
        });

        req.on("error", err => {
            console.log(`retring(${retryTimes}): ${reqData.image_url}`);
            // 请求抛错时重试,超出次数则终止本次请求
            observer.error(`error: ${err.message}`);
            if (retryTimes > 0) {
                retryTimes--;
                retry$.next(true);
            } else {
                retry$.complete();
            }
        });

        req.write(dataStr);

        req.end();
        return () => { req.abort() };  // 返回终止流的处理回调
    })
    .retryWhen(errs => errs.switchMap(err => {
        // 未超过次数返回重试流,超出则返回错误数据并终止本次Rx流
        return retryTimes > 0 ? retry$ : Rx.Observable.of({
            status: 500,
            url: reqData.image_url
        });
    }));
};
收尾

到此就搬砖完毕,开个车让他慢慢跑就可以了。
本篇展示了Rx在流数据处理与异步处理上的方式,逻辑与代码都挺清晰、扁平。在处理交杂的逻辑时也不错(重试部分)。如果喜欢或者有帮助的话可以后面在发一篇Rx在复杂DOM事件处理上的应用。;-)

本文始发于本人的公众号:枫之叶。公众号二维码

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/83331.html

相关文章

  • 认识node核心模块--从Buffer、Stream到fs

    摘要:端输入数据到端,对就是输入流,得到的对象就是可读流对就是输出端得到的对象是可写流。在中,这四种流都是的实例,它们都有事件,可读流具有监听数据到来的事件等,可写流则具有监听数据已传给低层系统的事件等,和都同时实现了和的事件和接口。 原文地址在我的博客 node中的Buffer和Stream会给刚接触Node的前端工程师们带来困惑,原因是前端并没有类似概念(or 有我们也没意识到)。然而,...

    TANKING 评论0 收藏0
  • Node.js学习路08——fs文件系统stream基本介绍

    摘要:中各种用于读取数据的对象对象描述用于读取文件代表客户端请求或服务器端响应代表一个端口对象用于创建子进程的标准输出流。如果子进程和父进程共享输入输出流,则子进程的标准输出流被废弃用于创建子进程的标准错误输出流。 9. stream流 fs模块中集中文件读写方法的区别 用途 使用异步方式 使用同步方式 将文件完整读入缓存区 readFile readFileSync 将文件部...

    BoYang 评论0 收藏0
  • 前端阅读笔记 2016-11-25

    摘要:为了防止某些文档或脚本加载别的域下的未知内容,防止造成泄露隐私,破坏系统等行为发生。模式构建函数响应式前端架构过程中学到的经验模式的不同之处在于,它主要专注于恰当地实现应用程序状态突变。严重情况下,会造成恶意的流量劫持等问题。 今天是编辑周刊的日子。所以文章很多和周刊一样。微信不能发链接,点了也木有用,所以请记得阅读原文~ 发个动图娱乐下: 使用 SVG 动画制作游戏 使用 GASP ...

    KoreyLee 评论0 收藏0
  • 初识 Node Stream

    摘要:是在完成处理数据块后需要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数。双工流的可读性和可写性操作完全独立于彼此。这仅仅是将两个特性组合成一个对象。 showImg(https://segmentfault.com/img/remote/1460000013228112?w=533&h=300); Streams 是一个数据集——和数组、字符串一样。不...

    fobnn 评论0 收藏0
  • 深入nodeTransform

    摘要:内部架构上图表示一个实例的组成部分部分缓冲数组内部函数部分缓冲链表内部函数实例必须实现的内部函数以及系统提供的回调函数。有三个参数,第一个为待处理的数据,第二个为编码,第三个为回调函数。 Transform流特性 在开发中直接接触Transform流的情况不是很多,往往是使用相对成熟的模块或者封装的API来完成流的处理,最为特殊的莫过于through2模块和gulp流操作。那么,Tra...

    williamwen1986 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<