javascript – 如何将节点可读流转换为RX observable

前端之家收集整理的这篇文章主要介绍了javascript – 如何将节点可读流转换为RX observable前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
如果我有一个Node js流,例如来自process.stdin或fs.createReadStream之类的东西,我如何使用RxJs5将其转换为RxJs Observable流?

我看到RxJs-Node有一个fromReadableStream方法,但看起来它在近一年内没有更新.

解决方法

对于任何寻找此项的人,请遵循Mark的建议,I adapted rx-node fromStream implementation for rxjs5.
import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream,finishEventName = 'end',dataEventName = 'data') {
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName,dataHandler);
    stream.addListener('error',errorHandler);
    stream.addListener(finishEventName,endHandler);

    stream.resume();

    return () => {
      stream.removeListener(dataEventName,dataHandler);
      stream.removeListener('error',errorHandler);
      stream.removeListener(finishEventName,endHandler);
    };
  }).share();
}

猜你在找的JavaScript相关文章