正确处理RxJava中的空Observable
|
我有一种情况,我正在创建一个包含数据库结果的Observable.我正在为他们应用一系列过滤器.然后我有一个订阅者记录结果.可能情况下,没有任何元素通过过滤器.我的业务逻辑表明这不是错误.但是,当这种情况发生时,我的onError被调用并且包含以下异常: java.util.NoSuchElementException:Sequence不包含元素 接受的做法是检测那种类型的异常并忽略它吗?还是有更好的处理方法? 版本是1.0.0. 这是一个简单的测试用例,暴露了我看到的内容.它似乎与使所有事件在到达地图之前被过滤并减少相关. @Test
public void test()
{
Integer values[] = new Integer[]{1,2,3,4,5};
Observable.from(values).filter(new Func1<Integer,Boolean>()
{
@Override
public Boolean call(Integer integer)
{
if (integer < 0)
return true;
else
return false;
}
}).map(new Func1<Integer,String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).reduce(new Func2<String,String,String>()
{
@Override
public String call(String s,String s2)
{
return s + "," + s2;
}
})
.subscribe(new Action1<String>()
{
@Override
public void call(String s)
{
System.out.println(s);
}
});
}
因为我正在使用一个安全的用户,所以它首先抛出一个OnErrorNotImplementedException,其中包含以下异常: java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
根据下面@davem的答案,我创建了一个新的测试用例: @Test
public void testFromBlockingAndSingle()
{
Integer values[] = new Integer[]{-2,-1,1,5};
List<String> results = Observable.from(values).filter(new Func1<Integer," + s2;
}
}).toList().toBlocking().single();
System.out.println("Test: " + results + " Size: " + results.size());
}
而这个测试产生以下行为: 当输入为: Integer values[] = new Integer[]{-2,5};
那么结果(如预期的)是: Test: [-2,-1] Size: 1 当输入为: Integer values[] = new Integer[]{0,5};
那么结果是下面的堆栈跟踪: java.util.NoSuchElementException: Sequence contains no elements at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82) at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140) at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73) at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45) at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59) at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121) at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43) at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42) at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79) at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147) at rx.Subscriber.setProducer(Subscriber.java:139) at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139) at rx.Subscriber.setProducer(Subscriber.java:133) at rx.Subscriber.setProducer(Subscriber.java:133) at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47) at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33) at rx.Observable$1.call(Observable.java:144) at rx.Observable$1.call(Observable.java:136) at rx.Observable$1.call(Observable.java:144) at rx.Observable$1.call(Observable.java:136) at rx.Observable$1.call(Observable.java:144) at rx.Observable$1.call(Observable.java:136) at rx.Observable$1.call(Observable.java:144) at rx.Observable$1.call(Observable.java:136) at rx.Observable$1.call(Observable.java:144) at rx.Observable$1.call(Observable.java:136) at rx.Observable$1.call(Observable.java:144) at rx.Observable$1.call(Observable.java:136) at rx.Observable$1.call(Observable.java:144) at rx.Observable$1.call(Observable.java:136) at rx.Observable.subscribe(Observable.java:7284) at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441) at rx.observables.BlockingObservable.single(BlockingObservable.java:340) at EmptyTest2.test(EmptyTest2.java:19) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) 所以看起来问题肯定是使用reduce函数.请参阅以下两种情况的测试用例: @Test
public void testNoReduce()
{
Integer values[] = new Integer[]{-2,String>()
{
@Override
public String call(Integer integer)
{
return String.valueOf(integer);
}
}).toList().toBlocking().first();
Iterator<String> itr = results.iterator();
StringBuilder b = new StringBuilder();
while (itr.hasNext())
{
b.append(itr.next());
if (itr.hasNext())
b.append(",");
}
System.out.println("Test NoReduce: " + b);
}
通过以下输入: Integer values[] = new Integer[]{-2,5};
我得到以下结果: Test NoReduce: -2,-1 并输入以下内容: Integer values[] = new Integer[]{0,5};
我得到以下输出: Test NoReduce: 所以,除非我完全误解了一些事情,否则真正处理零元素的唯一方法来自过滤器的Observable,后跟一个map和reduce是在Observable链之外实现reduce逻辑.你们都同意这个说法吗? 最终解决方案 (编辑:十堰站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
