我开始用Rx
Java和ReactFX玩,我变得非常着迷.但是正在试验我有几十个问题,我不断研究答案.
我观察的一件事(不是双关语)当然是懒惰的执行.在下面的探索性代码中,我注意到没有任何内容被执行,直到merge.subscribe(pet – > System.out.println(pet))被调用.但令我着迷的是当我订阅第二个订阅者merge.subscribe(pet – > System.out.println(“Feed”pet))时,它再次启动了“迭代”.
我想要理解的是迭代的行为.它似乎没有像只能使用一次的Java 8流.它一字一行地经历每个String一个,并将其作为当时的值发布?在任何以前发布的订阅者接收到这些项目的新用户,如果它们是新的?
public class RxTest { public static void main(String[] args) { Observable<String> dogs = Observable.from(ImmutableList.of("Dasher","Rex")) .filter(dog -> dog.matches("D.*")); Observable<String> cats = Observable.from(ImmutableList.of("Tabby","Grumpy Cat","Meowmers","Peanut")); Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey")); Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets); merge.subscribe(pet -> System.out.println(pet)); merge.subscribe(pet -> System.out.println("Feed " + pet)); } }
解决方法
可观察到的< T>代表一个monad,一个链接的操作,而不是执行操作本身.它是描述性语言,而不是你习惯的命令.要执行一个操作,你.subscribe()到它.每次订阅新的执行流都是从头开始创建的.除非您使用.subscribeOn()或.observeOn()指定线程更改,否则不要将流与线程混淆,因为预订将同步执行.您将新元素链接到任何现有操作/ monad / Observable以添加
new behaviour,例如更改线程,过滤,累积,转换等.如果您的可观察是昂贵的操作,则不想在每个订阅上重复,您可以防止使用.cache()进行娱乐.
为了使任何异步/同步的可观察<使用.toBlocking()将其类型更改为BlockingObservable<T>
.而不是.subscribe()它包含用.forEach()执行每个结果的操作的新方法,或强制使用.first()
可观察是一个很好的工具,因为它们主要是确定性的(相同的输入总是产生相同的输出,除非你做错了),可重用(可以作为命令/策略模式的一部分发送),大部分都忽略同意,因为他们不应该依靠共同的国家(也就是做错事). BlockingObservables是好的,如果你试图把一个基于可观察的库到命令式语言,或者只是执行一个可观察的操作,你有100%的信心,它的管理良好.
围绕这些原则构建您的应用程序是一个范例的变化,我无法真正地回答这个答案.
*There are breaches like
Subject
andObservable.create()
that are needed to integrate with imperative frameworks.