Make your web application reactive with RxJava.
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
<dependency>
<groupId>org.jooby</groupId>
<artifactId>jooby-rxjava</artifactId>
<version>1.0.0.CR2</version>
</dependency>- map route operator: Rx.rx() that converts
Observable(and family) into Deferred API. - manage the lifecycle of
Schedulersand make sure they go down on application shutdown time. - set a default server thread pool with the number of available processors.
...
import org.jooby.rx.Rx;
...
{
use(new Rx());
get("/", req -> Observable.from("reactive programming in jooby!"))
.map(Rx.rx());
}Previous example is translated to:
{
use(new Rx());
get("/", req -> {
return new Deferred(deferred -> {
Observable.from("reactive programming in jooby!")
.subscribe(deferred::resolve, deferred::reject);
});
});
}Translation is done with the Rx.rx() route operator. If you are a RxJava programmer then you don't need to worry for learning a new API and semantic. The Rx.rx() route operator deal and take cares of the Deferred API API.
We just learn that we are not force to learn a new API, just write RxJava code. That's cool!
But.. what if you have 10 routes? 50 routes?
...
import org.jooby.rx.Rx;
...
{
use(new Rx());
get("/1", req -> Observable...)
.map(Rx.rx());
get("/2", req -> Observable...)
.map(Rx.rx());
....
get("/N", req -> Observable...)
.map(Rx.rx());
}This is better than written N routes using the Deferred API route by route... but still there is one more option to help you (and your fingers) to right less code:
...
import org.jooby.rx.Rx;
...
{
use(new Rx());
with(() -> {
get("/1", req -> Observable...);
get("/2", req -> Observable...);
....
get("/N", req -> Observable...);
}).map(Rx.rx());
}Beautiful, hugh?
The with operator let you group any number of routes and apply common attributes and/or operator to all them!!!
You can provide a Scheduler to the Rx.rx() operator:
...
import org.jooby.rx.Rx;
...
{
use(new Rx());
with(() -> {
get("/1", req -> Observable...);
get("/2", req -> Observable...);
....
get("/N", req -> Observable...);
}).map(Rx.rx(Schedulers::io));
}All the routes here will Observable#subscribeOn(Scheduler) the provided Scheduler.
This module provides the default Scheduler from RxJava. But also let you define your own Scheduler using the executor module.
rx.schedulers.io = forkjoin
rx.schedulers.computation = fixed
rx.schedulers.newThread = "fixed = 10"
The previous example defines a:
- forkjoin pool for
Schedulers#io() - fixed thread pool equals to the number of available processors for
Schedulers#computation() - fixed thread pool with a max of 10 for
Schedulers#newThread()
Of course, you can define/override all, some or none of them. In any case the Scheduler will be shutdown at application shutdown time.