正确处理RxJava中的空Observable

前端之家收集整理的这篇文章主要介绍了正确处理RxJava中的空Observable前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我有一种情况,我正在创建一个包含数据库结果的Observable.我正在为他们应用一系列过滤器.然后我有一个订阅者记录结果.可能情况下,没有任何元素通过过滤器.我的业务逻辑表明这不是错误.但是,当这种情况发生时,我的onError被调用并且包含以下异常: java.util.NoSuchElementException:Sequence不包含元素

接受的做法是检测那种类型的异常并忽略它吗?还是有更好的处理方法

版本是1.0.0.

这是一个简单的测试用例,暴露了我看到的内容.它似乎与使所有事件在到达地图之前被过滤并减少相关.

@Test
public void test()
{

    Integer values[] = new Integer[]{1,2,3,4,5};

    Observable.from(values).filter(new Func1<Integer,Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer,String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).reduce(new Func2<String,String,String>()
    {
        @Override
        public String call(String s,String s2)
        {
            return s + "," + s2;
        }
    })

            .subscribe(new Action1<String>()
            {
                @Override
                public void call(String s)
                {
                    System.out.println(s);
                }
            });
}

因为我正在使用一个安全的用户,所以它首先抛出一个OnErrorNotImplementedException,其中包含以下异常:

java.util.NoSuchElementException: Sequence contains no elements
    at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
    at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
    at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
    at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
    at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
    at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
    at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
    at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
    at rx.Subscriber.setProducer(Subscriber.java:139)
    at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.subscribe(Observable.java:7284)

根据下面@davem的答案,我创建了一个新的测试用例:

@Test
public void testFromBlockingAndSingle()
{

    Integer values[] = new Integer[]{-2,-1,1,5};

    List<String> results = Observable.from(values).filter(new Func1<Integer," + s2;
        }
    }).toList().toBlocking().single();

    System.out.println("Test: " + results + " Size: " + results.size());

}

而这个测试产生以下行为:

当输入为:

Integer values[] = new Integer[]{-2,5};

那么结果(如预期的)是:

Test: [-2,-1] Size: 1

当输入为:

Integer values[] = new Integer[]{0,5};

那么结果是下面的堆栈跟踪:

java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at EmptyTest2.test(EmptyTest2.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

所以看起来问题肯定是使用reduce函数.请参阅以下两种情况的测试用例:

@Test
public void testNoReduce()
{

    Integer values[] = new Integer[]{-2,String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).toList().toBlocking().first();

    Iterator<String> itr = results.iterator();
    StringBuilder b = new StringBuilder();

    while (itr.hasNext())
    {
        b.append(itr.next());

        if (itr.hasNext())
            b.append(",");
    }

    System.out.println("Test NoReduce: " + b);

}

通过以下输入:

Integer values[] = new Integer[]{-2,5};

我得到以下结果:

Test NoReduce: -2,-1

并输入以下内容

Integer values[] = new Integer[]{0,5};

我得到以下输出

Test NoReduce:

所以,除非我完全误解了一些事情,否则真正处理零元素的唯一方法来自过滤器的Observable,后跟一个map和reduce是在Observable链之外实现reduce逻辑.你们都同意这个说法吗?

最终解决方

在实施TomášDvořák和David Motten建议之后,这是我的最终解决方案.我认为这个解决方案是合理的.

@Test
public void testWithToList()
{

    Integer values[] = new Integer[]{-2,5};

     Observable.from(values).filter(new Func1<Integer,Boolean>()
     {
         @Override
         public Boolean call(Integer integer)
         {
             if (integer < 0)
                 return true;
             else
                 return false;
         }
     }).toList().map(new Func1<List<Integer>,String>()
     {
         @Override
         public String call(List<Integer> integers)
         {
             Iterator<Integer> intItr = integers.iterator();
             StringBuilder b = new StringBuilder();

             while (intItr.hasNext())
             {
                 b.append(intItr.next());

                 if (intItr.hasNext())
                 {
                     b.append(",");
                 }
             }

             return b.toString();
         }
     }).subscribe(new Action1<String>()
     {
         @Override
         public void call(String s)
         {
             System.out.println("With a toList: " + s);
         }
     });

}

以下是给出以下输入时该测试的行为.

当给定一个具有某些值的流通过过滤器时:

Integer values[] = new Integer[]{-2,5};

结果是:

With a toList: -2,-1

当给定一个没有任何值的流通过过滤器时:

Integer values[] = new Integer[]{0,5};

结果是:

With a toList: <empty string>

解决方法

现在更新之后,错误是很明显的. RxJava中的减少将失败,并且IllegalArgumentException如果可观察到的是减少则为空,正如根据规范( http://reactivex.io/documentation/operators/reduce.html)所示.

功能编程中,通常有两个通用运算符将​​集合聚合成单个值,折叠和缩小.在接受的术语中,fold采用初始累加器值,并且使用运行累加器和来自集合的值并产生另一个累加器值的函数.伪码中的一个例子:

[1,4] .fold(0,(累加器,值)=>累加器值)

将从0开始,最后将1,4加到运行累加器,最后得到10,这些值的总和.

减少非常相似,只是不明确地采用初始累加器值,它使用第一个值作为初始累加器,然后累加剩余的所有值.这是有道理的,如果你是例如寻找最小或最大值.

[1,4] .reduce((cumulative,value)=> min(累加器,值))

看着折叠和减少不同的方式,你可能会使用折中,即使在空集合(如,总和,0有意义),聚合值也是有意义的,并且减少(最小对空集合没有意义,并减少意志在这种情况下无法操作,在您的情况下抛出异常).

你正在做一个类似的聚合,用逗号分隔一组字符串以产生一个单一的字符串.那是一个有点困难的情况.这可能是一个空的集合(你可能会期望一个空字符串)是有意义的,但另一方面,如果你从一个空的累加器开始,结果中还会有一个比你预期的更多的逗号.正确的解决方案是检查集合是否为空,并返回空集合的回退字符串,或者对非空集合进行缩减.您可能会观察到,通常您不需要在空的集合案例中使用空字符串,但是“收集为空”可能更合适,因此进一步确保您的解决方案是干净的.

Btw,我在这里使用这个词汇而不是自由地观察,只是为了教育目的.此外,在RxJava中,fold和reduce都称为相同的,减少,只有该方法有两个版本,一个只使用一个参数,另外两个参数.

至于你最后的问题:你不必离开Observable链.只要使用toList(),就像David Motten所说.

.filter(...)
.toList()
.map(listOfValues => listOfValues.intersperse(","))

其中散文可以在减少方面实现,如果不是图书馆功能(很常见).

collection.intersperse(separator) = 
    if (collection.isEmpty()) 
      ""
    else
      collection.reduce(accumulator,element => accumulator + separator + element)

猜你在找的Java相关文章