Node.js 中的 Stream

一、流的初印象

流,顾名思义,是流体的。与固体不同,它具有流动性,可轻易分离。

想象这样一个场景,我们要处理一大桶数据。

  • 情况一:待处理的数据被是一整个固体的形式,那么我们要获取它处理它都必须是一次性的。这是若程序只有一次性处理一个碗级别的数据的能力时,那么程序就崩溃了(内存溢出或未处理的数据丢失)。
  • 情况二:待处理的数据是一桶水(流),那么我们可以一次只倒出一碗水,待程序将碗中的数据处理完毕,我们将这碗水清空,便可继续处理下一碗的数据了。

至此,我们已经对流有了一个初印象。但本文要说的流不是流式的数据,而是可以接受或输出流式数据的容器,更类似于管道。

二、流的分类

任意一个流都是由一个可写端和一个可读端以及流内部的逻辑组成的。

  • 可读端:可以从流的可读端分步地拿到数据。
  • 可写端:可以通过可写端分步地往流内部写数据。
  • 内部逻辑:处理可写端写入的数据和负责往可读端输出数据。

流

在 Node.js 中,根据这三部分的性质不同,产生了一下这些类型的流。

1. Readable

Readable 流也称只读流, 使用 Readable 类可以实现自定义可读流。

  • 自定义 _read 方法来实现流的读取过程。
  • 调用 push 方法来实际触发一次流数据读取,触发 data 事件。
  • 使用 push(null) 来表示流读取完毕,触发 finish 事件。
const { Readable } = require('stream');

class StringReader extends Readable {
    constructor (str) {
        super();
        this._content = str;
    }
    _read () {
        for (let i = 0; i < this._content.length; i++) {
            this.push(this._content[i]);
        }
        this.push(null);
    }
}

const sr = new StringReader('qwert');

sr.on('data', (data) => {
    console.log('read:', data.toString());
});

sr.on('end', () => {
    console.log('end');
});

运行结果:

2. Writable

通过继承 Writable 类可以实现自定义可写流。

  • 自定义 _write(data, enc, next) 方法来实现流的写过程。
    • _write 函数中必须调用 next 函数来通知流写入下一个数据。
  • 调用 end 方法来结束流的写入,此时也可以传入 data 参数,表示最后一次传入可写流的数据。调用 end 会触发可写流的 finish 事件。
const { Writable } = require('stream');

class StringWriter extends Writable {
    constructor () {
        super();
        this.content = '';
    }
    _write (data, enc, next) {
        if (data) {
            console.log('write:', data.toString());
            this.content += data.toString();
        }
        next();
    }
}

const sr = new StringWriter('qwerty');

sr.on('finish', () => {
    console.log('finish:', sr.content);
});

sr.write('q');
sr.write('w');
sr.write('e');
sr.write('r');
sr.end('t');

运行结果:

3. Duplex

Duplex 继承了 Readable 和 Writable 的所有特性,因此它既可写也可读。使用它的方法是上面两个流的并集。

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
    constructor (str) {
        super();
        this.readContent = str;
        this.writeContent = '';
    }
    _write (data, enc, next) {
        if (data) {
            console.log('write:', data.toString());
            this.writeContent += data.toString();
        }
        next();
    }
    _read () {
        for (let i = 0; i < this.readContent.length; i++) {
            this.push(this.readContent[i]);
        }
        this.push(null);
    }
}

const md = new MyDuplex('qwert');

md.on('data', (data) => {
    console.log('read:', data.toString());
});

md.on('end', () => {
    console.log('end');
});

md.on('finish', () => {
    console.log('finish:', md.writeContent);
});

md.write('q');
md.write('w');
md.write('e');
md.write('r');
md.end('t');

运行结果:

4. Transform

Transform 又是继承于 Duplex 的,Duplex 流的可读端和可写端是完全独立的,但 Transform 流会把自己可写端写入的内容经过一些处理输出到可读端。定义一个 Transform 流要做的就是:

  • 自定义 _transform(buf, enc, next) 方法来定义可读端输入结果的处理过程。
  • _transform 方法中调用 push 方法来将处理结果输出到可读端。
  • _transform 方法中调用 next 进行下一个处理。
const { Transform } = require('stream');

class MyTransform extends Transform {
    constructor () {
        super();
        this.content = '';
    }
    _transform (data, enc, next) {
        if (data) {
            console.log('write:', data.toString());
            const res = data.toString().toUpperCase();
            this.push(res);
            this.content += res;
        }
        next();
    }
}

const mt = new MyTransform('qwert');

mt.on('data', (data) => {
    console.log('read:', data.toString());
});

mt.on('end', () => {
    console.log('read end.');
});

mt.on('finish', () => {
    console.log('write finish.', mt.content);
});

mt.write('q');
mt.write('w');
mt.write('e');
mt.write('r');
mt.end('t');

运行结果:

三、objectMode

默认情况下流中传递的数据类型只能是 String 或 Buffer。但是在某些情况下,我们需要在流中传递复杂的对象,这时我们就需要在构造流时指定 objectMode,如下:

const {Readable} = require('stream');

const r = new Readable();
const ro = new Readable({objectMode: true});

r.push('a');
// r.push(1); 会报错,不接受 String 和 Buffer 以外的数据类型
r.push('sda');
r.push(null);

ro.push('a');
ro.push(1);
ro.push({});
ro.push(true);
ro.push(null);

r.on('data', console.log.bind(console));
ro.on('data', console.log.bind(console));

执行结果:

四、流和管道

我们在回顾一下流的模型:

流

若我们拥有两个流,一个可读,一个可写,那我们是否可以把两个流串起来,把可读流的可读端输出输入到可写流的可写端?

这就是最基本的一个 pipeline,我们把这个可读流的对读端数据写到可写流的可写端的操作叫做 pipe。我们很容易想到,通过这样的 pipe 操作,我们可以很轻易的把多个 Tansform 流串联成一个 pipeline。

这样的模型很容易让我们联想到中间件设计模式:

  • 我们的每个流都是负责一件独立的工作;
  • 每个流的输入和输出都符合约定的接口规范;
  • 每次这个 pipeline 的可写端写入数据,可读端便可获取到这份数据在整个 pipeline 中处理后的结果;

那么我们就可以随意拼接一些独立功能的子流来组成一个较复杂的功能。

在 Node.js 中,所有可读流原型上都有一个叫做 pipe 的方法,它接受一个可写流作为参数,并返回这个流以此让 pipe 方法可以被链式调用。

我们试着将上文例子中的只读流 pipe 到只写流中:

const { Readable } = require('stream');
const { Writable } = require('stream');

class StringReader extends Readable {
    constructor (str) {
        super();
        this._content = str;
    }
    _read () {
        for (let i = 0; i < this._content.length; i++) {
            this.push(this._content[i]);
        }
        this.push(null);
    }
}

class StringWriter extends Writable {
    constructor () {
        super();
        this.content = '';
    }
    _write (data, enc, next) {
        if (data) {
            console.log('write:', data.toString());
            this.content += data.toString();
        }
        next();
    }
}

const sr = new StringReader('qwert');

sr.on('data', (data) => {
    console.log('read:', data.toString());
});

sr.on('end', () => {
    console.log('end');
});

const sw = new StringWriter();

sw.on('finish', () => {
    console.log('finish:', sw.content);
});

sr.pipe(sw);

运行结果:

五、一些应用场景

1. gulp

在 gulp 中,它最核心的功能就是把一个文件以流的形式经过各种 中间件 流处理后保存到目标文件中:

gulp.src(somePath)
  .pipe(operatorA)
  .pipe(operatorB)
  .dest(someOtherPath);

现在我们就自己用流来实现一个简单版的 gulp 吧。

首先约定一个流之间传递数据的接口规范,在这里是指一个文件,我们声明一个文件流类:

const {Transform} = require('stream');
const fs = require('fs');
const nodePath = require('path');

module.exports = class File extends Transform {
    constructor (props) {
        super({
            objectMode: true,
            ...props
        });
    }
    _transform (data, enc, next) {
        if (data) {
            // 流把自己处理的结果输出到可读端
            this.push(this.handler(data));
        }
        next();
    }
    // 流的处理过程,默认不做任何处理
    handler (a) { return a; }
    dest (dir) {
        // 若使用 dest 接口,则把流的处理结果写到目标文件夹中
        this.on('data', ({
            path,
            content
        }) => {
            fs.writeFileSync(nodePath.join(dir, path), content);
        });
    }
};

gulp 入口对象:

// gulp.js
const File = require('./File');
const fs = require('fs');
const nodePath = require('path');

module.exports = {
    src (paths) {
        // 创建一个流
        const stream = new File();
        for (let i = 0; i < paths.length; i++) {
            const path = paths[i];
            // 把文件的内容写入到流的可写端
            stream.write({
                path,
                content: fs.readFileSync(nodePath.join(__dirname, path)).toString()
            });
        }
        return stream;
    }
};

两个中间处理流,他们都是 File 的实例,自定义了文件处理过程 :

// ./operator/toUpperCase.js

const File = require('../File');

const toUpperCase = new File();

module.exports = toUpperCase;

toUpperCase.handler = function (data) {
    if (data) {
        const {
            path,
            content
        } = data;
        console.log('[excute toUpperCase]:', data.path);
        const newContent = content.toUpperCase();
        return {
            path,
            content: newContent
        };
    }
};

// ./operator/toUpperCase.js

const File = require('../File');

const splitAndJoin = new File();

module.exports = splitAndJoin;

splitAndJoin.handler = function (data) {
    if (data) {
        const {
            path,
            content
        } = data;
        console.log('[excute splitAndJoin]:', data.path);
        const newContent = content.split('').join('-');
        return {
            path,
            content: newContent
        };
    }
};

程序入口:

const gulp = require('./gulp');
const toUpperCase = require('./opterators/toUpperCase');
const splitAndJoin = require('./opterators/splitAndJoin');

gulp.src(['./a', './b'])
.pipe(toUpperCase)
.pipe(splitAndJoin)
.dest('./dest');

结果:

// ./a
abc
// ./b
qwe
// ./dest/a
A-B-C
// ./dest/b
Q-W-E

以上就是我写的一个简单版的 gulp,它的核心思想就是规范一种对象格式,使文件作为流之间传递的数据,然后以中间件模式处理文件。

2. 响应式编程

响应式编程的一个核心概念点就是,数据的变化要引起绑定的 action 的执行。

// ./main.js
const Reactive = require('./reactive');

var a = 1;

const reactData = new Reactive({
    a
});

reactData.map((data) => {
    return {
        a: data.a + 2
    };
}).map((data) => {
    return {
        a: data.a * 3
    };
}).map((data) => {
    return console.log('reactive response:', data.a - 5);
});

for (let i = 1; i < 10; i++) {
    setTimeout(() => reactData.setState({
        a: ++a
    }));
}

下面我们就使用流来实现这个 reactive 吧!

// ./reactive.js
const {Transform} = require('stream');

module.exports = class Reactive extends Transform {
    constructor (obj) {
        super({
            objectMode: true
        });
        this.setState(obj);
    }
    _transform () {}
    setState (obj) {
        this.__state = Object.assign(this.__state || {}, obj);
        this.push(this.__state);
    }
    map (fn) {
        const s = new Reactive(fn(this.__state));
        s._transform = function (data, enc, next) {
            if (data) {
                this.push(fn(data));
            }
            next();
        };
        this.pipe(s);
        return s;
    }
};

执行结果:

知识共享许可协议
本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。