Project Reactor: Overview & Implementations

Java Reactive Programming Series : Episode - II

This article is part of the "Java Reactive Programming" series. If you haven't read the previous article, please read it first.

In this article, we will look into the Project Reactor library, its implementation of the Publisher interface and the different operators available to transform the sequences.

Overview

  • The Project Reactor library is built around the Reactive Streams specification and brings the reactive programming paradigm to the JVM.

  • Reactor is fully non-blocking and offers efficient demand management (backpressure) through higher-level APIs built on top of the Reactive Streams specification.

  • This article explains how the reactor-core library implements the Publisher interface and covers its methods for creating, transforming, aggregating and combining sequences.

Implementation of Publisher Interface

  • As discussed in the previous article, a reactive library is an implementation of the Reactive Streams specification (Publisher, Subscriber, Subscription, Processor).

  • Project Reactor provides two main implementations of the Publisher interface:

    • Flux – Emits 0 to N items. It can produce multiple values.

    • Mono – Emits 0 or 1 item. It produces at most one value.
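To make the Publisher / Subscriber / Subscription contract concrete without any Reactor types, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces (available since Java 9), which mirror the Reactive Streams interfaces. It is an illustration only: Reactor itself implements org.reactivestreams.Publisher, and the class name FlowContractDemo is made up for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records every signal a Subscriber receives.
class FlowContractDemo {

    static List<String> run() {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // SubmissionPublisher is the JDK's ready-made Flow.Publisher
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    signals.add("onSubscribe");
                    s.request(1); // demand: ask for the first item
                }

                @Override
                public void onNext(Integer item) {
                    signals.add("onNext(" + item + ")");
                    subscription.request(1); // ask for one more item
                }

                @Override
                public void onError(Throwable t) {
                    signals.add("onError");
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    signals.add("onComplete");
                    done.countDown();
                }
            });
            publisher.submit(1);
            publisher.submit(2);
        } // close() sends onComplete after the pending items

        try {
            done.await(5, TimeUnit.SECONDS); // signals arrive on another thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The subscriber only receives items it has requested, which is the demand management that Flux and Mono build on.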

Flux

  • Flux is an implementation of the Publisher interface, with operators that can be used to generate, transform, and orchestrate Flux sequences.

  • Flux emits 0 to N items via the onNext signal and then terminates with either an onComplete or an onError signal.

Image Courtesy: Project Reactor Docs

The above marble diagram describes the transformation of a Flux by an operator (filter, map, log, etc.) along its timeline.
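The signal sequence of a Flux (any number of onNext signals, then exactly one terminal signal) can be observed by passing three callbacks to subscribe. The following is a small sketch, not part of the Reactor documentation; the class name FluxSignalsDemo and the helper signalsOf are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class: records the signals emitted by a Flux.
class FluxSignalsDemo {

    static List<String> signalsOf(Flux<Integer> flux) {
        List<String> signals = new ArrayList<>();
        flux.subscribe(
                v -> signals.add("onNext(" + v + ")"),               // 0..N times
                e -> signals.add("onError(" + e.getMessage() + ")"), // terminal
                () -> signals.add("onComplete"));                    // terminal
        return signals;
    }

    public static void main(String[] args) {
        // Completes normally after three items
        System.out.println(signalsOf(Flux.just(1, 2, 3)));
        // Integer division by zero in map turns into an onError signal
        System.out.println(signalsOf(Flux.just(1, 2, 0).map(i -> 10 / i)));
        // An empty Flux only completes
        System.out.println(signalsOf(Flux.empty()));
    }
}
```

The first call records three onNext signals followed by onComplete, the second records two onNext signals followed by onError and no onComplete, and the third records onComplete alone.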

Mono

  • Mono is an implementation of the Publisher interface, with its own set of operators. It emits at most one item via the onNext signal and then terminates with an onComplete signal (successful Mono, with or without a value), or it emits only a single onError signal (failed Mono).

Image Courtesy: Project Reactor Docs

The above marble diagram describes the transformation of a Mono by an operator (map, flatMap, log, etc.) along its timeline.
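The three possible outcomes of a Mono (a value, no value, or an error) can be sketched in the same way. The class name MonoSignalsDemo and the helper signalsOf are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

// Hypothetical demo class: records the signals emitted by a Mono.
class MonoSignalsDemo {

    static <T> List<String> signalsOf(Mono<T> mono) {
        List<String> signals = new ArrayList<>();
        mono.subscribe(
                v -> signals.add("onNext(" + v + ")"),
                e -> signals.add("onError(" + e.getMessage() + ")"),
                () -> signals.add("onComplete"));
        return signals;
    }

    public static void main(String[] args) {
        // Successful Mono with a value: onNext, then onComplete
        System.out.println(signalsOf(Mono.just("Hello")));
        // Successful Mono without a value: onComplete only
        System.out.println(signalsOf(Mono.empty()));
        // Failed Mono: a single onError
        System.out.println(signalsOf(Mono.error(new IllegalStateException("boom"))));
    }
}
```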

Different Operators of Mono and Flux Publishers

In this section, we will focus on the most common methods provided by the reactor-core library to create and transform Mono and Flux sequences.

Creating a New Sequence

Flux#just()

Creates a Flux that emits the provided (user-specified) elements and then completes.

Marble Diagram:

Method:

/**
* Param: <T> the emitted data type
* Param: data, the elements to emit, as a vararg
* Returns: A new Flux
*/
public static <T> Flux<T> just(T... data)

Method Usage:

Flux<Integer> oddNum = Flux.just(1, 3, 5, 7, 9);
oddNum.subscribe(i -> System.out.println(i));
//Output
1
3
5
7
9

Mono#just()

Creates a new Mono that emits the provided item, which is captured at instantiation time.

Marble Diagram:

Method:

/**
* Param: <T> the emitted data type
* Param: data, the element to emit
* Returns: A new Mono
*/
public static <T> Mono<T> just(T data)

Method Usage:

Mono.just("Hello").subscribe(System.out::print);
//Output
Hello

From Iterable

Creates a Flux that emits the items contained in the provided Iterable (for example, a Collection).

Marble Diagram:

Method:

/**
* Param: The iterable of type <T> to read the data from
* Return:  A new Flux
*/
public static <T> Flux<T> fromIterable(Iterable<? extends T> it)

Method Usage:

Flux<Integer> evenNum = Flux.fromIterable(List.of(2,4,6,8,10));
evenNum.subscribe(i -> System.out.println(i));
//Output
2
4
6
8
10

From Array

Creates a Flux that emits the elements provided in the array.

Method:

/**
* Param: The array of type <T> to read the data from
* Return:  A new Flux
*/
public static <T> Flux<T> fromArray(T[] array)

Method Usage:

Flux<String> southStates = Flux.fromArray(new String[] {"TN", "AP", "KL", "KA", "TG"});
southStates.subscribe(s -> System.out.println(s));
//Output
TN
AP
KL
KA
TG

By Range

Creates a Flux that emits a sequence of {count} incrementing integers, starting from {start} (inclusive). The last value emitted is {start} + {count} - 1.

Marble Diagram:

Method:

/**
* Param: start - The first integer to be emitted
* Param: count - Total number of incrementing values to emit including the first value.
* Return: A new Flux contains the range of values specified.
*/
public static Flux<Integer> range(int start, int count)

Method Usage:

Flux<Integer> rangeFlux = Flux.range(100, 5);
rangeFlux.subscribe(s -> System.out.println(s));
//Output
100
101
102
103
104

From Stream

Creates a Flux that emits the items contained in the provided Stream.

Note: Once consumed, a Stream cannot be reused, so multiple subscriptions or a re-subscription to the same Flux are not possible. The operator closes the Stream automatically on error, cancellation, or completion.

Marble Diagram:

Method:

/**
* Param: Stream from which data to be read.
* Return: A new Flux containing the elements provided by Stream.
*/
public static <T> Flux<T> fromStream(Stream<? extends T> s)

Method Usage:

List<Integer> nums = List.of(1,2,3,4,5);
Stream<Integer> streamNums = nums.stream();

Flux<Integer> streamFlux = Flux.fromStream(streamNums);
streamFlux.subscribe(s -> System.out.println(s));
//Output
1
2
3
4
5
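The single-use limitation from the note above can be checked with a short sketch. It subscribes twice to a Flux built from one Stream instance, and then twice to a Flux built with the fromStream overload that takes a Supplier of Streams, which creates a fresh Stream per subscriber. The class name StreamReuseDemo and the helper subscribeTwice are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

// Hypothetical demo class: subscribes twice and logs what each subscriber sees.
class StreamReuseDemo {

    static List<String> subscribeTwice(Flux<Integer> flux) {
        List<String> log = new ArrayList<>();
        for (int attempt = 1; attempt <= 2; attempt++) {
            int n = attempt; // effectively final copy for the lambdas
            flux.subscribe(
                    v -> log.add("sub" + n + ":" + v),
                    e -> log.add("sub" + n + ":error"));
        }
        return log;
    }

    public static void main(String[] args) {
        // One Stream instance: the second subscriber should get an error,
        // because the Stream was already consumed by the first one
        System.out.println(subscribeTwice(Flux.fromStream(Stream.of(1, 2))));

        // Supplier of Streams: every subscriber gets a new Stream
        System.out.println(subscribeTwice(Flux.fromStream(() -> Stream.of(1, 2))));
    }
}
```

When a Flux may be subscribed to more than once, the Supplier variant (or fromIterable) is usually the safer choice.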

Transforming an Existing Sequence

Flux#map()

Transforms the items emitted by the Flux by applying a synchronous function to each item. Use map when transforming the data on a 1-to-1 basis.

Marble Diagram:

Method:

/**
* Param: mapper is the synchronous function which performs transformation on the item supplied.
* Param: <V> is the transformed datatype.
* Return: A transformed Flux.
*/
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper)

Method Usage:

Flux<String> animals = Flux.fromIterable(List.of("lion", "tiger", "monkey", "wolf"));
animals.map(s -> s.toUpperCase()).subscribe(System.out::println);
//Output
LION
TIGER
MONKEY
WOLF

Note: map preserves the order of the source elements.


Mono#map()

Transforms the item emitted by the Mono by applying a synchronous function to it.

Marble Diagram:

Method:

/**
* Param: R - the transformed type.
* Param: mapper - the synchronous transforming Function.
* Return: a new Mono.
*/
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper)

Method Usage:

Mono.just("testMono").map(s -> s.toUpperCase())
                     .subscribe(System.out::print);
//Output
TESTMONO

Flux#flatMap()

Transforms each element emitted by the Flux asynchronously into a Publisher, then flattens these inner Publishers into a single Flux by merging them, which allows their elements to interleave. Use flatMap when each source element maps to zero or more output elements (a 1-to-many transformation).

Marble Diagram:

Method:

/**
* Param: mapper is the function that transforms each input item into a Publisher. It takes an input of type ‘T’ and returns a Publisher of type ‘R’.
* Param: <R> is the merged output sequence type.
* Return: A new Flux.
*/
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Method Implementation:

Flux<String> animals = Flux.fromIterable(List.of("lion", "tiger", "monkey", "wolf"));
List<String> outputList = new ArrayList<>();
animals.flatMap(s -> {
    // Two operations: convert the String to upper case,
    // then split it into single-character Strings
    String[] ss = s.toUpperCase().split("");
    return Flux.fromArray(ss);
}).subscribe(outputList::add);

System.out.print(outputList);
//Output
[L, I, O, N, T, I, G, E, R, M, O, N, K, E, Y, W, O, L, F]

Note: The output order of flatMap may differ from the input order. To preserve the order of the source elements, use flatMapSequential.

The flatMapSequential() method also transforms the elements emitted by the Flux asynchronously into Publishers and flattens them into a single Flux, but it merges the inner Publishers in the order of their source elements.
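The ordering difference only becomes visible when the inner Publishers are genuinely asynchronous. The following sketch simulates that with artificial delays; the class name FlatMapOrderDemo, the helper slowEcho and the delay values are made up for this example.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class comparing flatMap and flatMapSequential.
class FlatMapOrderDemo {

    // Hypothetical helper: 1 is delayed by 300 ms, 2 by 200 ms, 3 by 100 ms,
    // so later source elements finish earlier
    static Mono<Integer> slowEcho(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(400L - 100L * n));
    }

    // flatMap emits results as soon as each inner Publisher produces them
    static List<Integer> merged() {
        return Flux.just(1, 2, 3)
                .flatMap(FlatMapOrderDemo::slowEcho)
                .collectList()
                .block(); // block() is used here only to keep the demo simple
    }

    // flatMapSequential buffers results and replays them in source order
    static List<Integer> sequential() {
        return Flux.just(1, 2, 3)
                .flatMapSequential(FlatMapOrderDemo::slowEcho)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(merged());     // expected: [3, 2, 1]
        System.out.println(sequential()); // expected: [1, 2, 3]
    }
}
```

In both cases all three inner Publishers are subscribed to eagerly; only the order in which their results are passed downstream differs.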


Mono#flatMap()

Transforms the item emitted by the Mono asynchronously, returning the value emitted by another Mono (possibly changing the value type).

Marble Diagram:

Method:

/**
* Param: R - the result type bound
* Param: transformer - the function to dynamically bind a new Mono
* Return: a new Mono with an asynchronously mapped value.
*/
public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)

Method Usage:

Mono.just("testMono").flatMap(s -> Mono.just(List.of(s.split(""))))
                     .subscribe(System.out::print);
//Output
[t, e, s, t, M, o, n, o]

Aggregating a Flux

CollectList

Collects all the elements emitted by the Flux into a List that is emitted by the resulting Mono<List<T>> when the sequence completes.

Marble Diagram:

Method:

/**
* Return: Return a Mono of a List of all values from the supplied Flux.
*/
public final Mono<List<T>> collectList()

Method Usage:

Mono<List<String>> animalList = animals.flatMap(s -> {
    // Two operations: convert the String to upper case,
    // then split it into single-character Strings
    String[] ss = s.toUpperCase().split("");
    return Flux.fromArray(ss);
}).collectList();
animalList.subscribe(System.out::print);

//Output
[L, I, O, N, T, I, G, E, R, M, O, N, K, E, Y, W, O, L, F]

CollectSortedList

Collects all elements emitted by the Flux until the sequence completes, and then sorts them in natural order into a List that is emitted by the resulting Mono.

Marble Diagram:

Method:

/**
* Return: A Mono of a sorted List of all values from the Flux, in natural order
*/
public final Mono<List<T>> collectSortedList()

Method Implementation:

Mono<List<String>> animalList = animals.flatMap(s -> {
    // Two operations: convert the String to upper case,
    // then split it into single-character Strings
    String[] ss = s.toUpperCase().split("");
    return Flux.fromArray(ss);
}).collectSortedList();
animalList.subscribe(System.out::print);
//Output
[E, E, F, G, I, I, K, L, L, M, N, N, O, O, O, R, T, W, Y]

CollectMap

  • Collects all elements emitted by the Flux into a hashed Map that is emitted by the resulting Mono when the sequence completes. If the sequence is empty, an empty Map is emitted.

  • The key is extracted from each element by applying the keyExtractor Function, and the value is extracted by the valueExtractor Function.

  • If several elements map to the same key, the associated value is derived from the most recently emitted element.

Marble Diagram:

Method:

/**
* Param: K - the type of the key extracted from each source element
* Param: V - the type of the value extracted from each source element.
* Param: keyExtractor - a Function to map elements to a key for the Map
* Param: valueExtractor - a Function to map elements to a value for the Map.
* Return: a Mono of a Map of key-value pairs
*/
public final <K, V> Mono<Map<K, V>> collectMap(Function<? super T, ? extends K> keyExtractor, Function<? super T, ? extends V> valueExtractor)

Method Implementation:

public static void main(String[] args) {
    Mono<Map<String, Integer>> employee = getEmployeeList()
            .collectMap(Employee::getName, Employee::getEmployeeCode);
    employee.subscribe(System.out::println);
}

static Flux<Employee> getEmployeeList() {
    List<Employee> employeeList = List.of(
            new Employee("Ram", 123),
            new Employee("Ravi", 456),
            new Employee("Raj", 901));

    return Flux.fromIterable(employeeList);
}

//Output
{Ravi=456, Raj=901, Ram=123}
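The rule that the most recently emitted element wins on a duplicate key can be checked with a small sketch. The class name CollectMapDuplicateKeyDemo and the sample words are made up for this example.

```java
import java.util.Map;
import reactor.core.publisher.Flux;

// Hypothetical demo class: two elements share the key "a".
class CollectMapDuplicateKeyDemo {

    static Map<String, String> byFirstLetter() {
        return Flux.just("apple", "avocado", "banana")
                // key: first letter, value: the word itself
                .collectMap(word -> word.substring(0, 1), word -> word)
                .block(); // block() is used here only to keep the demo simple
    }

    public static void main(String[] args) {
        // "avocado" is emitted after "apple", so it replaces it under key "a"
        System.out.println(byFirstLetter());
    }
}
```

The resulting Map has two entries: "a" mapped to "avocado" and "b" mapped to "banana". When every value per key has to be kept, collectMultimap is the operator to look at instead.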

Combining the Publishers

Merge

Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.

Marble Diagram:

Method:

/**
* Param: I - The source type of the data sequence
* Param: sources - the array of Publisher sources to merge
* Return: a fresh Reactive Flux publisher ready to be subscribed
*/
public static <I> Flux<I> merge(Publisher<? extends I>... sources)

Method Usage:

Flux<String> fluxStringOne = Flux.fromIterable(Arrays.asList("Ravi", "Gill", "Tony", "Laos"));
Flux<String> fluxStringTwo = Flux.fromIterable(List.of("Timber", "Mary", "Kane"));

Flux.merge(fluxStringOne, fluxStringTwo).collectList().subscribe(System.out::print);
//Output
[Ravi, Gill, Tony, Laos, Timber, Mary, Kane]
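In the example above the output is not interleaved because both sources emit synchronously: the first source completes during subscription, before the second one is subscribed to. Interleaving shows up once the sources emit over time. The following sketch simulates that with artificial delays; the class name MergeInterleaveDemo, the element names and the delay values are made up for this example.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class: two time-based sources merged into one Flux.
class MergeInterleaveDemo {

    static List<String> run() {
        // Emits A1 after about 200 ms and A2 after about 400 ms
        Flux<String> a = Flux.just("A1", "A2")
                .delayElements(Duration.ofMillis(200));

        // Starts 100 ms later: B1 after about 300 ms, B2 after about 500 ms
        Flux<String> b = Flux.just("B1", "B2")
                .delayElements(Duration.ofMillis(200))
                .delaySubscription(Duration.ofMillis(100));

        // merge subscribes to both sources eagerly and forwards
        // elements in the order in which they arrive
        return Flux.merge(a, b).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // expected: [A1, B1, A2, B2]
    }
}
```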

Zip

Zip multiple sources together, i.e., it waits for all the sources to emit one element and combines these elements into an output value (constructed by the provided combinator). The operator will continue doing so until any one of the sources is completed.

Marble Diagram:

Method:

/**
* Param: T1, T2, T3 - the types of the values emitted by source1, source2 and source3
* Param: source1, source2, source3 - the Publishers to zip
* (Other overloads accept a combinator function that builds a custom output value instead of a Tuple.)
*
* Return: a zipped Flux that emits one Tuple3 per combined set of values
*/
public static <T1, T2, T3> Flux<Tuple3<T1, T2, T3>> zip(Publisher<? extends T1> source1,
            Publisher<? extends T2> source2,
            Publisher<? extends T3> source3)

Method Implementation:

class Driver {
    public static void main(String[] args) {
        Flux<String> names = Flux.just("Raj", "Ravi", "Kate");
        Flux<Integer> code = Flux.just(1, 2, 3);
        Flux<String> deps = Flux.just("IT", "Production", "Warehouse");

        // Each Tuple3 holds one element from every source; map is enough
        // here because the conversion is synchronous and 1-to-1
        Flux<Employee> employeeDetail = Flux.zip(names, code, deps)
                .map(t -> new Employee(t.getT1(), t.getT2(), t.getT3()));
        employeeDetail.subscribe(System.out::println);
    }
}

class Employee {
    String name;
    Integer employeeCode;
    String department;
    //Constructor
    .....
    //Getters and Setters
    .....
    @Override
    public String toString() {
        return this.name + "-" + this.employeeCode + "-" + this.department;
    }
}

//Output
Raj-1-IT
Ravi-2-Production
Kate-3-Warehouse
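Two points from the description of zip are not visible in the example above: the combinator, and the fact that zipping stops as soon as any source completes. The following sketch shows both with the two-source overload that takes a BiFunction; the class name ZipShortestDemo and the sample data are made up for this example.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class: zips sources of different lengths.
class ZipShortestDemo {

    static List<String> run() {
        Flux<String> names = Flux.just("Raj", "Ravi", "Kate");
        Flux<Integer> codes = Flux.just(1, 2); // one element fewer than names

        // The combinator builds the output value directly, so no Tuple is needed
        return Flux.zip(names, codes, (name, code) -> name + "-" + code)
                .collectList()
                .block(); // block() is used here only to keep the demo simple
    }

    public static void main(String[] args) {
        // "Kate" has no partner in codes and is therefore dropped
        System.out.println(run()); // expected: [Raj-1, Ravi-2]
    }
}
```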
