Spring boot RxJava Integration
What is RxJava
RxJava is a Java implementation of ReactiveX
library for composing asynchronous and event-based programs by using observable sequences.
The building blocks of RxJava are Observables
and Subscribers
.
Observable is used for emitting items and Subscriber is used for consuming those items.
Types of Observables and Observers
The following are the different types of Observables in RxJava:
-
Single
-
Observable
-
Flowable
-
Maybe
-
Completable
For each Observable
there is one Observer
Following are the different types of Observers in RxJava:
-
Observer
-
SingleObserver
-
MaybeObserver
-
CompletableObserver
Single Observable
This tutorial focus on Single observable , which helps to make parallel calls to downstream systems.
Consider a scenario where you need construct api response by reading information from different down-steam systems.
__(10 sec)__ SERVICE-B
/
/
CLIENT --> API ------
<1> <2> \
\ __(14 sec)__ SERVICE-C
<1> CLIENT making call to API
<2> API makes calls to downstream service SERVICE-B and SERVICE-C
One option is, to make sequence of calls to SERVICE-B and then SERVICE-C , which takes (10+ 14= 24) seconds to respond back to CLIENT.
Other option is , to make parallel calls so that max (10, 14) = 10 seconds could be response time to your CLIENT.
Different ways we can achieve this
-
by initiating direct threads.
-
using executor service
-
Reactive libraries such as RxJava.
RxJava library helps you to achieve this in seamless way to so that you dont need worry about thread pool management and cleaning up thread once task is done.
Parallel calls with Single
Single Observable is best option to make network calls
Single is used when the Observable has to emit only one value like a response from a network call.
Single<CityStateRes> csRes = cityClient.findCityNameAsync(zipCode)
.subscribeOn(Schedulers.io()); (1)
Single<AlternateCities> csRes2 = alternateCitiesClient
.findAlternateCitiesAsync(zipCode)
.subscribeOn(Schedulers.io()); (2)
CityFinder cityFinderResponse= Single.zip(csRes, csRes2,
(res, res2) -> (3)
{
CityFinder cityFinder = new CityFinder();
cityFinder.setCityStateRes(res);
cityFinder.setAlternateCities(res2);
return cityFinder;
}
).subscribeOn(Schedulers.io()).toBlocking().value();
Single.zip will helps us to initiate parallel calls on IO Scheduler.
Schedulers
If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.
-
IO — This scheduler used for IO related stuff. Such as network requests, file system operations. IO scheduler is backed by thread-pool which are re-used based on availability.
Spring boot RxJava Integration
RxJavaHooks helpful to pass parent Thread context(Tomcat worker thread) to Child (RxJava )Threads.
Create RxJava Action0 instance
package com.tvajjala.reactive.spring.hooks;
import brave.Span;
import brave.Tracer;
import com.tvajjala.reactive.spring.context.ThreadContext;
import com.tvajjala.reactive.spring.context.ThreadContextHolder;
import rx.functions.Action0;
public class ContextHandoverAction implements Action0{
private Action0 actual;
private final Tracer tracer;
private final Span parent;
private final ThreadContext threadContext;
public ContextHandoverAction(Action0 actual, ThreadContext threadContext, Tracer tracer ){
this.actual=actual;
this.threadContext=threadContext;
this.tracer = tracer;
this.parent = this.tracer.currentSpan();
}
@Override
public void call() {
Span span = this.parent;
boolean created = false;
if (span != null) {
span = this.tracer.toSpan(this.parent.context());
} else {
span = this.tracer.nextSpan().name("rxjava").start();
span.tag("thread", Thread.currentThread().getName());
created = true;
}
try {
Tracer.SpanInScope ws = this.tracer.withSpanInScope(span);
Throwable var4 = null;
try {
ThreadContextHolder.setContext(threadContext); //(1)
this.actual.call();
} catch (Throwable var20) {
var4 = var20;
throw var20;
} finally {
if (ws != null) {
if (var4 != null) {
try {
ws.close();
} catch (Throwable var19) {
var4.addSuppressed(var19);
}
} else {
ws.close();
}
}
}
} finally {
ThreadContextHolder.clear(); //(2)
if (created) {
span.finish();
}
}
}
}
-
Passing ThreadContext
-
Must clear after thread execution to avoid Memory issues
Registering RxJavaHooks
Register RxJavaHooks after application startup
package com.tvajjala.reactive.spring;
import brave.Tracer;
import com.tvajjala.reactive.spring.context.ThreadContextHolder;
import com.tvajjala.reactive.spring.hooks.ApplicationPostInitializationHook;
import com.tvajjala.reactive.spring.hooks.ApplicationPreInitializationHook;
import com.tvajjala.reactive.spring.hooks.ContextHandoverAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import rx.plugins.RxJavaHooks;
/**
* @author ThirupathiReddy Vajjala
*/
@SpringBootApplication
public class ReactiveSpringApplication implements CommandLineRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveSpringApplication.class);
@Autowired
Tracer tracer;
public static void main(String[] args) {
SpringApplication springApplication = new SpringApplication(ReactiveSpringApplication.class);
springApplication.run(args);
}
@Override
public void run(String... args) {
LOGGER.info("Application initialization completed ");
/* Any scheduler actions (Scheduler.io) this code executes */
RxJavaHooks.setOnScheduleAction(action0-> new ContextHandoverAction(action0, ThreadContextHolder.getThreadContext(), tracer)); (1)
}
}
-
Registering RxJavaHooks
Comments
Post a Comment