Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,4 @@ The `share` method is an alias for `Observable.publish().ConnectableObservable.r

| Previous | Next |
| --- | --- |
| [Time-shifted sequences](/Part 3 - Taming the sequence/5. Time-shifted sequences.md) | [Chapter 4 - Concurrency](/Part 4 - Concurrency/1. Scheduling and threading.md) |
| [Time-shifted sequences](/Part 3 - Taming the sequence/5. Time-shifted sequences.md) | [Custom operators](/Part 3 - Taming the sequence/7. Custom operators.md) |
283 changes: 283 additions & 0 deletions Part 3 - Taming the sequence/7. Custom operators.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
# Custom operators

RxJava offers a very large [operator set](http://reactivex.io/RxJava/javadoc/rx/Observable.html). Counting all the overloads, the number of operators on Rx is over 200. A smaller number of those is essential, meaning that you cannot achieve an Rx implementation without them. Others are there just for convenience and for a more self-descriptive name. For example, if `source.First(user -> user.isOnline())` didn't exist, we would still be able to do `source.filter(user -> user.isOnline()).First()`.

Despite many convenience operators, the operator set of RxJava is still very basic. It provides the building blocks for the developer to do something useful. There is no operator for calculating a running average from a sequence of numbers. But you can make one yourself:

```java
class AverageAcc {
public final int sum;
public final int count;
public AverageAcc(int sum, int count) {
this.sum = sum;
this.count = count;
}
}
```
```java
source
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
```

That does it, but it's not reusable. In a real application, you will probably want to do the same kind of processing over many different sequences. Even if you don't, you'd still want to hide all this code behind a single phrase: "running average". Understandably, your first instinct would be to make a function out of this:

```java
public static Observable<Double> runningAverage(Observable<Integer> source) {
return source
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
}
```

And you can easily use it:

```java
runningAverage(Observable.just(3, 5, 6, 4, 4))
.subscribe(System.out::println);
```
Output
```
3.0
4.0
4.666666666666667
4.5
4.4
```

The above example looks fine because it's simple. Let's do something a little more complicated with our custom operator. Let's take a phrase, turn it into a sequence of word lengths and calculate the running average for that.

```java
runningAverage(
Observable.just("The brown fox jumped and I forget the rest")
.flatMap(phrase -> Observable.from(phrase.split(" ")))
.map(word -> word.length()))
.subscribe(System.out::println);
```

Once again, this works, but how far can you go with this? Imagine if everything in Rx was done this way (including all the existing operators).

```java
subscribe(
lastOperator(
middleOperator(
firstOperator(source))))
```

We're reading our pipeline in reverse! Yikes!

## Chaining operators

Rx has a particular style for applying operators, by chaining them, rather than nesting them. This style is not uncommon for objects whose methods return instances of the same type. This makes even more sense for immutable objects and can be found even in standard features, such as strings: `String s = new String("Hi").toLowerCase().replace('a', 'c');`. This style allows you to see modifications in the order that they are applied, and it also looks neater when a lot of operators are applied.

Ideally, you would want your Rx operators to fit into the chain just like any other operator:
```java
Observable.range(0,10)
.map(i -> i*2)
.myOperator()
.subscribe();
```

Many languages have ways of supporting this. Inconveniently, Java doesn't. You'd have to edit `Observable` iteself to add your own methods. There's no point asking for it either, since the RxJava team are conservative about adding yet another operator. You could `extend` `Observable` and add your own operators there. In that case, as soon as you call any of the old operators, you're going to get a vanilla `Observable` back and lose access to the operators that you built.

### compose

There is a way of fitting a custom operator into the chain with the `compose` method.

```java
public <R> Observable<R> compose(Observable.Transformer<? super T,? extends R> transformer)
```

Aha! A `Transformer` interface! `Transformer<T,R>` actually just an alias for the `Func1<Observable<T>,Observable<R>>` interface. It is a method that takes an `Observable<T>` and returns an `Observable<R>`, just like the one we made for calculating a running average.

```java
Observable.just(3, 5, 6, 4, 4)
.compose(Main::runningAverage)
.subscribe(System.out::println);
```

Java doesn't let you reference a function by its name alone, so here we assumed the custom operator is in our Main class. We can see that now our operator fits perfecty into the chain, albeit with the boilderplate of calling `compose` first. For even better encapsulation, you should implement `Observable.Transformer` in a new class and move the whole thing out of sight, along with its helper class(es).

```java
public class RunningAverage implements Observable.Transformer<Integer, Double> {
private static class AverageAcc {
public final int sum;
public final int count;
public AverageAcc(int sum, int count) {
this.sum = sum;
this.count = count;
}
}

@Override
public Observable<Double> call(Observable<Integer> source) {
return source
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
}
}
```

And we use it like this

```java
source.compose(new RunningAverage())
```

Most Rx operators are parameterisable. We can do this too. Let's extend the functionality of our operator with the possiblity to ignore values above a certain threshold.

```java
public class RunningAverage implements Observable.Transformer<Integer, Double> {
private static class AverageAcc {
public final int sum;
public final int count;
public AverageAcc(int sum, int count) {
this.sum = sum;
this.count = count;
}
}

final int threshold;

public RunningAverage() {
this.threshold = Integer.MAX_VALUE;
}

public RunningAverage(int threshold) {
this.threshold = threshold;
}

@Override
public Observable<Double> call(Observable<Integer> source) {
return source
.filter(i -> i< this.threshold)
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
}
}
```

We just added the parameter as a field in the operator, added constructors for the uses that we cover and used the parameter in our Rx operations. Now we can do `source.compose(new RunningAverage(5))`, where, ideally, we would be calling `source.runningAverage(5)`. Java only lets us go this far. Rx is a functional paradigm, but Java is still primarily an object oriented language and quite conservative about it.

### lift

Every Rx operator, internally, does 3 things

1. Subscribes to the source and observes the values.
2. Transforms the observed sequence according to the operator's purpose.
3. Pushes the modified sequence to its own subscribers, by calling `onNext`, `onError` and `onCompleted`.

The `compose` operator works with a method that makes an observable out of another. By doing this, it spares you the trouble of doing the 3 steps above manually. That presumes that you can do the transformation by using existing operators. If the operators don't already exist, or if you think you can get better performance manually, you need to receive items manually, process them and re-push them. An `Observable.Transformer` that does this would include a `Subscribe` to the source `Observable` and the creation of a new `Observable`, possibly a `Subject`. As an exercise, you can try implementing an operator (e.g. reimplement `map`) without using any existing operators.

There's a simpler way. The `lift` operator is similar to `compose`. Instead of transforming `Observable`, it transforms `Subscriber`.

```java
public final <R> Observable<R> lift(Observable.Operator<? extends R,? super T> lift)
```

And `Observable.Operator<R,T>` is an alias for `Func1<Subscriber<? super R>,Subscriber<? super T>>`: a function that will transform a `Subscriber<R>` into `Subscriber<T>`. By dealing directly with `Subscriber` we avoid involving `Observable`.

This seems backwards at first: to turn an `Observable<T>` into `Observable<R>`, we need a function that turns `Subscriber<R>` into `Subscriber<T>`. To understand why that is the case, remember that a subscription begins at the end of the chain and is propagated to the source. In other words, a subscirption goes backwards through the chain of operators. Each operator receives a subscription (i.e. is subscribed to) and uses that subscription to create a subscription to the preceeding operator.

In the next example, we will reimplement `map`, without using the existing implementation or any other existing operator.

```java
class MyMap<T,R> implements Observable.Operator<R, T> {
private Func1<T,R> transformer;

public MyMap(Func1<T,R> transformer) {
this.transformer = transformer;
}

@Override
public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
return new Subscriber<T>() {

@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed())
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
if (!subscriber.isUnsubscribed())
subscriber.onError(e);
}

@Override
public void onNext(T t) {
if (!subscriber.isUnsubscribed())
subscriber.onNext(transformer.call(t));
}

};
}
}
```

A `map` operator requires a function that transforms items from `T` to `R`. In our implementation, that's the `transformer` field. The key part is the `call` method. We receive a `Subscriber<R>` that wants to receive items of type `R`. For that subscriber we create a new `Subscriber<T>` that takes items of type `T`, transforms them to type `R` and pushes them to the `Subscriber<R>`. `lift` handles the boilerplate of receiving the `Subscriber<R>`, as well as using the created `Subscriber<T>` to subscribe on the source observable.

Using an `Observable.Operator` is as simple as using `Observable.Transformer`:
```java
Observable.range(0, 5)
.lift(new MyMap<Integer, String>(i -> i + "!"))
.subscribe(System.out::println);
```
Output
```
0!
1!
2!
3!
4!
```

A class constructor in Java can't have its type parameters infered. A logical last step would be to make a method that can infer the types for us
```java
public static <T,R> MyMap<T,R> create(Func1<T,R> transformer) {
return new MyMap<T,R>(transformer);
}
```
And use like this
```java
Observable.range(0, 5)
.lift(MyMap.create(i -> i + "!"))
.subscribe(System.out::println);
```

When doing manual pushes to subscribers, like we do when implementing `Observable.Operator`, there are a few things to consider
* A subscriber is free to unsubscribe. Don't push without checking first: `!subscriber.isUnsubscribed()`.
* You are responsible for complying with the Rx contract: any number of `onNext` notifications, optionally followed by a single `onCompleted` or `onError`.
* If you are going to use asynchronous operations and scheduling, use the [Schedulers](/Part%204%20-%20Concurrency/1.%20Scheduling%20and%20threading.md#Schedulers) of Rx. That will allow your operator to become [testable](/Part%204%20-%20Concurrency/2.%20Testing%20Rx.md#testscheduler).

### serialize

If you can't guarantee that your operator will obey the Rx contract, for example because you push asynchronously, you can use the `serialize` operator. The `serialize` operator will turn an unreliable observable into a lawful, sequential observable.

![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/synchronize.png)

### Extra benefits of lift

If the ability to use your custom operator in the chain like a standard operator is not convincing enough, using `lift` has one more unexpected advantage. Standard operators are also implemented using `lift`, which makes `lift` a hot method at runtime. JVM optimises for `lift` and operators that use `lift` receive a performance boost. That can include your operator, if you use `lift`.




#### Continue reading

| Previous | Next |
| --- | --- |
| [Hot and cold observables](/Part 3 - Taming the sequence/6. Hot and Cold observables.md) | [Chapter 4 - Concurrency](/Part 4 - Concurrency/1. Scheduling and threading.md) |