data:image/s3,"s3://crabby-images/3665b/3665bb3faf72d1400f28fc7ec6ec51a42784df08" alt="Spring Integration Essentials"
Selecting a channel
Let's discuss what default implementations have been provided by Spring Integration and how they can be leveraged.
Publish-subscribe channel
This is the only implementation of the publish-subscribe model of channel. The primary purpose of this channel is to send messages to registered endpoints; this cannot be polled. It can be declared as follows:
<int:publish-subscribe-channel id="pubSubChannel"/>
Let's discuss each of the elements in this line; this will be used throughout the examples of this chapter:
int
: This is a namespace that declares all Spring Integration components. As discussed in Chapter 1, Getting Started, the STS visual editor can be used to add different namespaces from Spring Integration.publish-subscribe-channel
: This is the type exposed by Spring.Id
: This is the unique name through which the channel can be referred.
To refer to these elements from the code, we can use:
public class PubSubExample { private ApplicationContext ctx = null; private MessageChannel pubSubChannel = null; public PubSubChannelTest() { ctx = new ClassPathXmlApplicationContext("spring-integration-context.xml"); pubSubChannel = ctx.getBean("pubSubChannel", MessageChannel.class); } }
Queue channel
Remember queue concepts from good old data structures? QueueChannel
employs the same concept—it enforces First in First out (FIFO) ordering and a message can be consumed by one and only one endpoint. It's a strictly one-to-one relationship, even if the channel has multiple consumers; one message will be delivered to only one of them. In Spring Integration, it can be defined as follows:
<int:channel id="queueChannel"> <queue capacity="50"/> </int:channel>
As soon as a message is available on the channel, it will try to send the message to the subscribed consumer. The element capacity
indicates the maximum number of undelivered messages to be held in the queue. If the queue is full, which is determined by the capacity
parameter, the sender will be blocked until messages are consumed and further room is available in the queue. Alternatively, if a timeout parameter has been specified for the sender, the sender will wait for the specified timeout interval—if space is created in the queue within the timeout interval, the sender will put the message there, else it will discard that message and start with an other one.
Priority channel
Queue enforces FIFO, but what if a message needs urgent attention and needs to be processed out of the queue? For example, a server health monitoring service might send health audits to an audit service, but if it sends a server down event, it needs urgent processing. This is where PriorityChannel
is handy; it can pick messages based on their priority rather than arrival order. Messages can be prioritized as follows:
- By adding a
priority
header within each message - By providing a comparator of type
Comparator<Message<?>>
to the priority channel's constructor
Let's take the following example of a priority channel and inject a comparator there, which will be used to decide the priority of the message:
<int:channel id="priorityChannel"> <int:priority-queue capacity="50"/> </int:channel>
A comparator can be injected as follows:
<int:channel id="priorityChannel" datatype="com.example.result"> <int:priority-queue comparator="resultComparator" capacity="50"/> </int:channel>
Rendezvous channel
Often, it is desirable to have an acknowledgement that the message has indeed reached the endpoint. The rendezvousChannel
interface, which is a subclass of the queue channel, serves this purpose. Producer and consumer work in a blocking mode. As soon as the producer sends a message on the channel, it is blocked until that message has been consumed. Similarly, a consumer is blocked until a message arrives in the queue. It can be configured as follows:
<int:channel id="rendezvousChannel"/> <int:rendezvous-queue/> </int:channel>
The RendezvousChannel
interface implements a zero capacity queue, which means that at any given point, there can exist only one message on the queue. No wonder there is no capacity element.
Direct channel
Direct channel is the default channel type used by Spring Integration.
Multiple endpoints can subscribe message handlers with the direct channel; whenever a producer puts a message on the channel, it is delivered to one and only one of the message handlers of subscribed endpoints. The introduction of multiple subscribers with a restriction to deliver a message to one and only one of the handlers introduces new challenges—how and which handler will be selected and what will happen if the handler is not able to process the message? This is where a load balancer and failover come into the picture. A load balancer can be defined on this channel with a round-robin delivery strategy:
<int:channel id="newQuestions"> <dispatcher failover="false" load-balancer="round-robin"/> </int:channel>
This will deliver messages to subscribers on a round-robin basis. This is the only strategy defined out-of-the-box by Spring, but a custom strategy can be defined using interface
:
public interface LoadBalancingStrategy { public Iterator<MessageHandler> getHandlerIterator( Message<?> message, List<MessageHandler> handlers); }
Here is an example of introducing a custom load balancer:
<int:channel id="lbChannel"> <int:dispatcher load-balancer-ref="customLb"/> </int:channel> <bean id="customLb" class="com.chandan.CustomLoadBalancingImpl"/>
Tip
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you. The code can also be pulled from https://github.com/cpandey05/siessentials.
Failover, on other hand, is a Boolean value. If this is set to true, then if the first handler fails to process the message, then all subsequent handlers will be tried. Even if one of the handlers successfully processes the message, Spring Integration will not report an error. Only if all of the handlers fail, will it throw an exception.
Executor channel
The ExecutorChannel
interface is a point-to-point message channel. This is very similar to the direct channel, except that custom executors can be used to dispatch the messages. Let's have a look at the configuration:
<int:channel id="results"> <int:dispatcher task-executor="resultExecutor"/></int:channel> // define the executor <bean id=" resultExecutor " class="com.example.ResultExecutor"/>
The com.example.ResultExecutor
interface is an implementation of java.uti.concurrent.Executor
.
A transaction link cannot be established between producer and consumer because a producer thread hands off the message to an executor instance and backs off—the consumption of the message is processed in the executor thread.
As in direct channels, a load-balancing strategy and failover can be set. The default values are a round-robin strategy with failover enabled:
<int:channel id="results"> <int:dispatcher load-balancer="none" failover="false" taskexecutor="resultsExecutor"/> </int:channel>
Scoped channel
When a channel is declared, it is in a global space and visible to all of the threads. But what if we want to restrict the visibility of the channel to a certain scope, such as a particular thread, web session request scope, and so on? The scope
attribute does just this: it defines the scope in which the channel is visible. For example, a channel defined in the following code snippet is visible in the thread
scope:
<int:channel id="threadScopeChannel" scope="thread"> <int:queue /> </int:channel>
A custom scope can also be defined, as follows:
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer"> <property name="scopes"> <map> <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" /> </map> </property> </bean>
This is an example of a thread scoped channel. If we observe the entries, a key-value pair has been defined for the scope. For the thread, the key-value pair is org.springframework.context.support.SimpleThreadScope
. It can be any Spring-defined or a user-defined scope.
Datatype channel
A channel can be restricted to accept messages having only a certain type of payload, for example, numbers, string, or any other custom type. The code is as follows:
<int:channel id="examMarksChannel" datatype="java.lang.Number"/>
Multiple types can also be provided, as follows:
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
What will happen if a message arrives in a format other than the one given in the preceding code? By default, an exception will be thrown. However, if the use case warrants, we can define converters, which will try to convert incoming messages into an acceptable format. A typical use case is the conversion of a string to an integer. For this to happen, a bean named integrationConversionService
that is an instance of Spring's Conversion Service must be defined as follows:
public static class StringToIntegerConverter implements Converter<String, Integer> { public Integer convert(String source) { return Integer.parseInt(source); } } <int:converter ref="strToInt"/> <bean id="strToInt" class="com.chandan.StringToIntegerConverter"/>
When the converter
element is parsed, it will create the integrationConversionService
bean on-demand, if one is not already defined. With that converter in place, if a string message arrives on a channel defined as an integer, an attempt would be made to convert it to an integer.