Core concepts
Reactor:
Reactor is a framework that makes event-driven programming much easier; it is based on the Reactor design pattern. Reactor is well suited to asynchronous applications on the JVM: it acts as an event gateway in which event consumers are registered with a notification key.
Selector:
A Selector is an abstraction used to find the consumers to invoke for an event.
Consumers and Event:
Consumers and Events are part of the core module; a Consumer is an event consumer that needs to be notified of an event.
Producer:
A Producer produces Events and publishes them.
Here is an example that uses the Reactor pattern to produce and consume messages.
1. Create a Maven project and add the dependencies below
<properties>
<java.version>1.8</java.version>
<version.reactor>2.0.6.RELEASE</version.reactor>
</properties>
<!-- Dependencies for the Reactor event-bus example. -->
<dependencies>
<!-- The next three artifacts declare no <version>; presumably it is supplied by a Spring Boot parent POM or BOM that is not shown here (TODO confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Artifacts in the io.projectreactor and io.projectreactor.spring groups all take their version from the version.reactor property. -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>
<version>${version.reactor}</version>
</dependency>
<!-- Pinned to an explicit version rather than a property. -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.0.final</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${version.reactor}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-stream</artifactId>
<version>${version.reactor}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-groovy</artifactId>
<version>${version.reactor}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-core</artifactId>
<version>${version.reactor}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-context</artifactId>
<version>${version.reactor}</version>
</dependency>
</dependencies>
2. Create Customer POJO
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
// Mutable data holder for a customer's name and address; both values start out as null.
public class Customer {

    private String name;
    private String address;

    /** Returns the stored name, or {@code null} if none has been set. */
    public String getName() {
        return this.name;
    }

    /** Replaces the stored name with the given value. */
    public void setName(String name) {
        this.name = name;
    }

    /** Returns the stored address, or {@code null} if none has been set. */
    public String getAddress() {
        return this.address;
    }

    /** Replaces the stored address with the given value. */
    public void setAddress(String address) {
        this.address = address;
    }

    /** Renders this customer as {@code Customer [name=<name>, address=<address>]}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Customer [name=");
        text.append(this.name);
        text.append(", address=");
        text.append(this.address);
        text.append("]");
        return text.toString();
    }
}
3. Create a Consumer
import org.springframework.stereotype.Service;
import reactor.bus.Event;
import reactor.fn.Consumer;
/**
 * Consumer of {@code Event<Customer>} that prints the data carried by each event it accepts.
 */
@Service
class CustomerReceiver implements Consumer<Event<Customer>> {

    /**
     * Writes {@code "Customer "} followed by the event's data to standard output.
     *
     * @param ev the event whose data is printed
     */
    public void accept(Event<Customer> ev) {
        final Customer payload = ev.getData();
        System.out.println("Customer " + payload);
    }
}
4. Create a publisher
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.bus.Event;
import reactor.bus.EventBus;
/**
 * Publishes a hard-coded sample {@link Customer} on the injected {@link EventBus}.
 */
@Service
public class CustomerPublisher {

    @Autowired
    EventBus eventBus;

    /**
     * Wraps a sample customer in an event, notifies the bus under the key
     * {@code "customer"}, and then prints {@code "Message sent"}.
     *
     * @throws InterruptedException declared by the signature and kept for caller compatibility
     */
    public void publishCustomerDetails() throws InterruptedException {
        final Event<Customer> message = Event.wrap(sampleCustomer());
        eventBus.notify("customer", message);
        System.out.println("Message sent");
    }

    /** Builds the fixed customer (name "vinod", address "Sasi area") that gets published. */
    private Customer sampleCustomer() {
        final Customer sample = new Customer();
        sample.setName("vinod");
        sample.setAddress("Sasi area");
        return sample;
    }
}
5. Spring Boot main class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import reactor.Environment;
import reactor.bus.EventBus;
import static reactor.bus.selector.Selectors.$;
/**
 * Spring Boot entry point for the Reactor event-bus example.
 *
 * <p>Declares the {@link Environment} and {@link EventBus} beans. Its
 * {@link CommandLineRunner} callback registers the customer receiver under the
 * {@code "customer"} key and then asks the publisher to send one event.
 */
@Configuration
@EnableAutoConfiguration
@ComponentScan
public class Application implements CommandLineRunner {

    /**
     * Exposes the Reactor environment as a bean.
     *
     * @return the result of {@code Environment.initializeIfEmpty().assignErrorJournal()}
     */
    @Bean
    Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    /**
     * Exposes the event bus as a bean.
     *
     * @param env the environment the bus is created from
     * @return a bus created with {@code Environment.THREAD_POOL} as the second argument
     *         (presumably the dispatcher name; confirm against the Reactor API)
     */
    @Bean
    EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);
    }

    @Autowired
    private EventBus eventBus;

    @Autowired
    private CustomerReceiver customerReceiver;

    @Autowired
    private CustomerPublisher customerPublisher;

    /**
     * Registers the receiver before publishing, so the consumer is already
     * subscribed to the {@code "customer"} key when the event is sent.
     *
     * @param args command-line arguments; not used here
     * @throws Exception if publishing fails
     */
    public void run(String... args) throws Exception {
        eventBus.on($("customer"), customerReceiver);
        customerPublisher.publishCustomerDetails();
    }

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments passed through to {@code SpringApplication.run}
     * @throws InterruptedException declared by the signature and kept for compatibility
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(Application.class, args);
    }
}
6. Run the program (Application.java)
Here we can see that the publisher sends the message and the consumer consumes it.
Customer Customer [name=vinod, address=Sasi area]
7. Download Example