The selective consumer is consumer that applies a filter to incoming messages so that only messages meeting that specific selection criteria are processed. A JMS selector is a predicate expression involving JMS headers and JMS properties. If the selector evaluates to true, the JMS message is allowed to reach the consumer.
Let us consider an Order processing example, we have two types of orders (One is Online and another one is physical store order) All orders are pushing in to ORDER queue along with the details and Two consumers (Online order processor and Physical order processor) are selecting messages based on the predicate expressions.
Prerequisites
ActiveMQ should be up and running1. Create a Maven Project
Create a Maven project with following dependencies<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.8.0</version>
</dependency>
</dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.8.0</version>
</dependency>
</dependencies>
2. Setup Spring Configuration (camel-application-context-msgselector.xml)
In this configuration we will load the activemq component with name jms and start one route which needs to process the Order Queue.<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="route" class="com.vinod.test.MessageSelectorRoute" />
<camel:camelContext id="camel-client">
<camel:template id="camelTemplate" />
<camel:routeBuilder ref="route" />
</camel:camelContext>
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</beans>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="route" class="com.vinod.test.MessageSelectorRoute" />
<camel:camelContext id="camel-client">
<camel:template id="camelTemplate" />
<camel:routeBuilder ref="route" />
</camel:camelContext>
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</beans>
3. Create a Message Router
This route has the message selector and it will listen the ORDER Queue and based on the selector it will redirect to the other Queues.eg: ONLINE order details will go the the online processor and store orders will go the the store processor.
package com.vinod.test;
import org.apache.camel.builder.RouteBuilder;
public class MessageSelectorRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("jms:ORDER?selector=METHOD='ONLINE'").to("jms:queue:ORDERONLINEPROCESSOR");
from("jms:ORDER?selector=METHOD='STORE'").to("jms:queue:ORDERSTOREPROCESSOR");
}
}
import org.apache.camel.builder.RouteBuilder;
public class MessageSelectorRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("jms:ORDER?selector=METHOD='ONLINE'").to("jms:queue:ORDERONLINEPROCESSOR");
from("jms:ORDER?selector=METHOD='STORE'").to("jms:queue:ORDERSTOREPROCESSOR");
}
}
4. Create a Message producer
This class will push the order messages in to ORDER queue and add the selector name as well.package com.vinod.test;
import org.apache.camel.ProducerTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MessageSelectorMain {
public static void main(String[] args) throws Exception {
// Sending Message to the order Queue
ApplicationContext context = new ClassPathXmlApplicationContext("camel-application-context-msgselector.xml");
ProducerTemplate camelTemplate = context.getBean("camelTemplate", ProducerTemplate.class);
System.out.println("Order Message Sending started");
camelTemplate.sendBodyAndHeader("jms:queue:ORDER", "Online Order Details", "METHOD", "ONLINE");
camelTemplate.sendBodyAndHeader("jms:queue:ORDER", "Store Order Details", "METHOD", "STORE");
camelTemplate.sendBodyAndHeader("jms:queue:ORDER", "Online Order Details", "METHOD", "ONLINE");
System.out.println("Order Message sending completed");
}
}
import org.apache.camel.ProducerTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MessageSelectorMain {
public static void main(String[] args) throws Exception {
// Sending Message to the order Queue
ApplicationContext context = new ClassPathXmlApplicationContext("camel-application-context-msgselector.xml");
ProducerTemplate camelTemplate = context.getBean("camelTemplate", ProducerTemplate.class);
System.out.println("Order Message Sending started");
camelTemplate.sendBodyAndHeader("jms:queue:ORDER", "Online Order Details", "METHOD", "ONLINE");
camelTemplate.sendBodyAndHeader("jms:queue:ORDER", "Store Order Details", "METHOD", "STORE");
camelTemplate.sendBodyAndHeader("jms:queue:ORDER", "Online Order Details", "METHOD", "ONLINE");
System.out.println("Order Message sending completed");
}
}
Run it
Run the above main program and check the active mq console, we can see the massages are routed in to different queue based on the selector.Output
Download Example
https://github.com/kkvinodkumaran/camelReferences
1. http://fusesource.com/2. http://camel.apache.org/selective-consumer.html

No comments:
Post a Comment