javascript – 来自使用RxJS的请求流的响应的同步流

前端之家收集整理的这篇文章主要介绍了javascript – 来自使用RxJS的请求流的响应的同步流前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

我是RxJS的新手,想知道是否有人可以帮助我.

我想从请求流(有效载荷数据)创建一个同步的响应流(最好带有相应的请求).

我基本上希望逐个发送请求,每个请求等待最后一个请求.

我尝试了这个,但它立即发送了所有内容(jsbin):

var requestStream,responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);

responseStream = requestStream.flatMap(
  sendRequest,(val,response)=>{ return {val,response}; }
);

responseStream.subscribe(
  item=>{
    console.log(item);
  },err => {
    console.err(err);
  },()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('result for '+val);},1000);
  });
};

以下工作在某种程度上,但不使用流作为请求数据(jsbin).

var data,responseStream;
data = ['a','e'];
responseStream = Rx.Observable.create(observer=>{
  var sendNext = function(){
    var val = data.shift();
    if (!val) {
      observer.onCompleted();
      return;
    }
    sendRequest(val).then(response=>{
      observer.onNext({val,response});
      sendNext();
    });
  };
  sendNext();
});

responseStream.subscribe(
  item=>{
    console.log(item);
  },reject)=>{
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
  });
};

谢谢!

编辑:

只是为了澄清,这就是我想要实现的目标:

“发送A,当您收到A的响应时,发送B,当您收到B的响应时,发送C等……”

使用concatMap和defer,如user3743222所建议的,似乎是这样做的(jsbin):

responseStream = requestStream.concatMap(
  (val)=>{
    return Rx.Observable.defer(()=>{
      return sendRequest(val);
    });
  },response}; }
);
最佳答案
尝试在第一个代码示例中使用concatMap替换flatMap,并告诉我生成的行为是否与您要查找的内容相符.

responseStream = requestStream.concatMap(//I replaced `flatMap`
  sendRequest,response}; }
);

基本上concatMap具有与flatMap类似的签名,行为的不同之处在于它将等待当前observable在继续下一个之前被展平.所以在这里:

> requestStream值将被推送到concatMap运算符.
> concatMap运算符将生成一个sendRequest observable,并且该observable中的任何值(似乎是一个元组(val,response))将通过选择器函数传递,其对象结果将传递给下游
>当sendRequest完成时,处理另一个requestStream值.
>简而言之,您的请求将逐一处理

或者,也许您想使用延迟来推迟执行sendRequest.

responseStream = requestStream.concatMap(//I replaced `flatMap`
  function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},response}; }
);

猜你在找的JavaScript相关文章