-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Closed
Labels
Description
Since the merge changes in 1.0.13 we now have a bug in merge that limits concurrency and can cause an async "deadlock" since not all Observables being merged will be subscribed to.
These unit tests show the issue:
@Test
public void testUnboundedDefaultConcurrency() {
List<Observable<Integer>> os = new ArrayList<Observable<Integer>>();
for(int i=0; i < 2000; i++) {
os.add(Observable.<Integer>never());
}
os.add(Observable.range(0, 100));
TestSubscriber<Integer> ts = TestSubscriber.create();
Observable.merge(os).take(1).subscribe(ts);
ts.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
ts.assertValue(0);
ts.assertCompleted();
}
@Test
public void testConcurrencyLimit() {
List<Observable<Integer>> os = new ArrayList<Observable<Integer>>();
for(int i=0; i < 2000; i++) {
os.add(Observable.<Integer>never());
}
os.add(Observable.range(0, 100));
TestSubscriber<Integer> ts = TestSubscriber.create();
Observable.merge(os, Integer.MAX_VALUE).take(1).subscribe(ts);
ts.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
ts.assertValue(0);
ts.assertCompleted();
}Surprisingly, even when providing the maxConcurrent value the issue still happens.
We have bounded vertical buffers, but always must default to unbounded horizontal buffers, since it is the code that defines how many items are horizontally buffered. This affects both merge and groupBy. The maxConcurrent overload allows a developer to limit the horizontal buffering, and a developer controls the groupBy selector.
Reactions are currently unavailable