我的角度应用程序使用websocket与后端进行通信.
在我的测试用例中,我有2个客户端组件. Observable计时器按预期打印两个不同的客户端ID.
每个ngOnInit()还打印其客户端的id.
现在出于某种原因,对于每条消息,websocketService.observeClient()的订阅被调用2次,但是this.client.id总是打印第二个客户端的值.
继承我的客户组件
@Component({ ... }) export class ClientComponent implements OnInit { @Input() client: Client; constructor(public websocketService: WebsocketService) { Observable.timer(1000,1000).subscribe(() => console.log(this.client.id)); } ngOnInit() { console.log(this.client.id); this.websocketService.observeClient().subscribe(data => { console.log('message',this.client.id); }); } }
还有我的websocket服务
@Injectable() export class WebsocketService { private observable: Observable<MessageEvent>; private observer: Subject<Message>; constructor() { const socket = new WebSocket('ws://localhost:9091'); this.observable = Observable.create( (observer: Observer<MessageEvent>) => { socket.onmessage = observer.next.bind(observer); socket.onerror = observer.error.bind(observer); socket.onclose = observer.complete.bind(observer); return socket.close.bind(socket); } ); this.observer = Subject.create({ next: (data: Message) => { if (socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify(data)); } } }); } observeClient(): Observable<MessageEvent> { return this.observable; } }
编辑
好吧,据我所知,它与Observables是单播对象这一事实有关,我必须使用Subject,但我不知道如何创建Subject.
从rxjs 5开始,您可以使用内置的websocket功能为您创建主题.当您在发生错误后重新订阅流时,它也会重新连接.请参考这个答案:
https://stackoverflow.com/a/44067972/552203
TLDR:
let subject = Observable.webSocket('ws://localhost:8081'); subject .retry() .subscribe( (msg) => console.log('message received: ' + msg),(err) => console.log(err),() => console.log('complete') ); subject.next(JSON.stringify({ op: 'hello' }));