×

IDMWORKS Blog

Request-Response Pattern With Spring Cloud Stream


In this post we’ll look at implementing the Request-response pattern using Spring Cloud Stream and Netty.

What is the Request-Response Pattern?

From the Wikipedia page for Request-response:

“Request–response is a message exchange pattern in which a requestor sends a request message to a replier system which receives and processes the request, ultimately returning a message in response. This is a simple, but powerful messaging pattern which allows two applications to have a two-way conversation with one another over a channel. This pattern is especially common in client–server architectures.

For simplicity, this pattern is typically implemented in a purely synchronous fashion, as in web service calls over HTTP, which holds a connection open and waits until the response is delivered or the timeout period expires. However, request–response may also be implemented asynchronously, with a response being returned at some unknown later time. This is often referred to as “sync over async”, or “sync/async”, and is common in enterprise application integration (EAI) implementations where slow aggregations, time-intensive functions, or human workflow must be performed before a response can be constructed and delivered.

It is the asynchronous implementation of Request-response referred to above – also known as “sync over async”, or “sync/async” – that we will be exploring.

What is Spring Cloud Stream?

From the Spring Cloud Stream homepage:

“Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.”

Overview

We’ll create two applications:

  • A TCP server that accepts requests, dispatching them via a message broker. The application will also subscribe to a response queue, processing those responses and sending them back to the requesting actor (e.g. a telnet session).
  • A request processor that subscribes to messages representing the incoming requests, and then publishes a transformed response back to the message broker.

We’ll do this in three stages to illustrate the concept:

  1. First, we’ll create a simple TCP echo server using Netty and Spring Boot
  2. Then, we’ll introduce code to publish the request via Spring Cloud Stream, and to consume those requests from a request processor application
  3. Finally, we’ll tie things together by processing the responses and sending them to the proper Netty channel

The first two sections will primarily focus on implementing simple examples demonstrated in the vendor-provided documentation. These examples will serve as the foundation for the final section, which will tackle the Request-response pattern with Spring Cloud Stream.

Note that this is not a getting started guide for either Netty or Spring Cloud Stream. Refer to the vendor-provided documentation for getting started guides.

Let’s get started!

Creating a TCP Server

To get started, we’ll create a simple TCP echo server using Spring Boot and Netty.

“Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications that you can “just run”.

We take an opinionated view of the Spring platform and third-party libraries so you can get started with minimum fuss. Most Spring Boot applications need very little Spring configuration.”

“Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.”

First, navigate to Spring Initializr – a helpful site for bootstrapping new Spring projects. Change the artifact to “tcp-server” and click the Generate Project button. If prompted, choose to download and save the project archive (by default it will be named tcp-server.zip).

Extract the downloaded archive and open it in your favorite IDE.

Next, delete the class com.example.tcpserver.TcpServerApplicationTests. This is automatically generated by Spring Initializr and won’t work out-of-the-box with the TCP server. Writing integration tests for TCP servers hosted in Spring Boot applications is beyond the scope of this article, so we’ll simply delete the sample (empty) and move on.

Our TCP echo server implementation will be taken from the official Netty documentation, with minor changes to work with Spring Boot initially and additional modifications – in later sections – to work with Spring Cloud Stream.

Add Netty as a dependency by opening your pom.xml file and adding the following:

Then, add a new class, TcpServerHandler, with the following code:

Next, add another new class, TcpServer, with the following content:

Again this code is taken almost directly from the Netty echo server example, with minor changes to integrate with Spring Boot. The class is annotated with the @Service annotation and the postConstruct() method is added with the @PostConstruct annotation to start the server when the Spring Boot application starts.

With the above Netty example code in place and our minor changes to work with Spring Boot, you can now start the application and test it. Either run the application from within your IDE or start it from the command line using:

Once the server is running, you can test it using Telnet, e.g.:


> telnet localhost 8080
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test
test
foo
foo
bar
bar
^]
telnet> quit
Connection closed.

You should be able to connect, and any request you send should be “echoed” back as the response.

We now have a functioning TCP echo server using Netty and Spring Boot. In the next session we’ll add Spring Cloud Stream and use a second Spring Boot application to process messages.

Using Spring Cloud Stream to Process Requests

Now that we’ve demonstrated Request-response with Netty and Spring Boot, we’ll extend the example to also dispatch the request asynchronously using Spring Cloud Stream.

TCP Server Modifications

First, we’ll modify TCP server project to use an implementation of SmartLifecycle rather than the @PostConstruct annotation to start the server. This is necessary as @PostConstruct is fired too early during application initialization and will create the TCP server (and start listening) before Spring Cloud Stream is auto-wired. Specifically, you will see the error “Dispatcher has no subscribers for channel ‘unknown.channel.name'”

Create the class ApplicationLifecycle with the following content:

This is a fairly standard implementation of the Spring SmartLifecycle interface. Spring will automatically start the TCP server after Spring Cloud has been auto-wired, due to isAutoStartup() returning true and getPhase() returning a large integer value.

Next, we need to modify the TcpServer class to define the public start() and stop() methods called by the new ApplicationLifecycle class.

To do this, make the NioEventLoopGroup references fields instead of variables.

Then, replace the postConstruct() method (and its associated annotation) with a new stop() method:

Finally, rename the run() method to start() and make it public.

You can now recompile and run the application and you should see the same positive test results, seen in the previous section, using telnet. However, note that more messages are logged on startup as Spring has made it further through its initialization process before starting the TCP server.

Next, we’ll add Spring Cloud Stream to the TCP server project. This will involve a few steps that would not be necessary if we’d included “Cloud Stream” as a dependency when generating the project using Spring Initializr. However, this will serve to demonstrate how to add Spring Cloud to an existing Spring Boot project and we’ll do it the “automatic” way when creating the request processor project.

Open the pom.xml file in the TCP server project. Add the following to the element:

Visit https://cloud.spring.io to find the current version of Spring Cloud. Finchley was the latest version at the time this article was being written.

Add the following element under the top-level element:

Finally, add the following dependency:

Now, we’ll add the Java code necessary to dispatch requests to our message broker (preserving the existing implementation that echoes the request as the response).

Open TcpServerHandler and add the following constructor, which takes an org.springframework.cloud.stream.messaging.Source parameter:

Next, modify the channelRead() method to read as shown below.

With the changes above, the incoming request will be dispatched to the message broker as a String, preserving the origin code that also returns the request as a response via Netty.

Now open TcpServer so that we can pass the new Source parameter to the TcpServerHandler constructor. Modify the class as shown below:

There should be two additional annotations and a new auto-wired Source field.

For the final Java change, modify the body of the start() method to pass the source field as a parameter to the TcpServer constructor:

The final change we need to make to the TCP server project, in order to dispatch requests, is to modify the application.properties configuration file to specify where to dispatch messages. Simply open application.properties and add the following:

This tells Spring Cloud Stream that the destination for the output channel is tcp_request.

That concludes the changes to the TCP server project for this section. The application will now not only echo the request as a response, but will also dispatch the request to a message broker. However, nothing will subscribe to these messages (yet).

Creating a Request Processor

Now we’ll create a new request processor application to process the dispatched requests.

Revisit Spring Initializr – this time to create the request processor application. Change the artifact to “request-processor”, add “Cloud Stream” under the Dependencies heading, and click the Generate Project button. If prompted, choose to download and save the project archive (by default it will be named request-processor.zip).

Extract the downloaded archive and open the project alongside the TCP server project in your IDE.

Add the RabbitMQ binder for Spring Cloud Stream by adding the following dependency to the request processor pom.xml file:

Add a new class named RequestProcessor with the following content:

As with the TCP server, the code here is taken directly from the vendor documentation to keep the focus on the Request-response implementation in the final section. The processor will simply take the messages it receives, transform the payload to upper-case, and then return the result back to the message broker. The message will also be written to the console for demonstration purposes.

The only other change that needs to be made is to the application.properties file, essentially telling the application where messages come from and where they go to. Open application.properties and add the following lines:

This tells Spring Cloud Stream that the destination for the input channel is tcp_request, and the destination for the output channel is tcp_response.

At this point you can run both applications to demonstrate the achieved (but incomplete) functionality: sending a request via telnet should still result in an “echoed” response, but you should also see a message printed to the console by the request processor application indicating it received the message (the CRLF is captured as part of the request):


Received message 'foo
'
Received message 'bar
'

However, the “processed” request is not being used…yet. We’ll cover that in the final section.

Using Spring Cloud Stream to Process Responses

In this final section, we’ll process the response that is generated by the request processor – recall that this will be our request transformed to upper-case. The previous sections have primarily involved straight-forward implementations of the vendor-provided documentation. I’ve provided a some explanation and glue, but you are primarily seeing Netty and Spring Cloud Stream’s documented examples at work.

This final section will be the “meat” of the article – showing how to tie the example code together to implement the Request-response pattern with Spring Cloud Stream and a Netty TCP server.

To achieve this, we need to modify the TCP server project as follows:

  1. Keep track of the active Netty channels (so that we can find the channel that corresponds to each response)
  2. Send the ID of the Netty channel used to make the request along with the dispatched message
  3. Add a new response processor that handles response messages by sending a response to the requesting actor

There will be no changes required for the request processor application. Let’s tie this all together!

Keeping Track of Active Netty Channels

First, we’ll keep track of active Netty channels using an instance of the ChannelGroup class. This will be injected via the Spring IoC container, so we need to declare an instance using the @Bean annotation inside a class that has the @Configuration annotation. Create a new TcpServerConfiguration class with the following code:

Next, modify the TcpHandler constructor to take an instance of ChannelGroup (along with the existing Source parameter).

Override the channelActive() method in the TcpHandler class as well to keep track of active channels using the instance of ChannelGroup:

Note that there is no need to handle closed / deactivated channels as the implementation of ChannelGroup will do that automatically.

Next, open the code for TcpServer and auto-wire an instance of ChannelGroup.

And pass it to the existing call to the TcpServerHandler constructor.

That’s all that needs to be done in order to keep track of the active Netty channels.

Sending the Netty Channel ID

Now we need to send the ID of the Netty channel associated with the request along with the message published via Spring Cloud Stream.

First, modify the TcpServerHandler code by updating the channelRead() method and introducing a serializeChannelId() method as shown below.

We’ve removed the hard-coded echoing of the request as the response, and we now serialize the associated Netty channel ID, attaching it to the message as a header.

Processing Responses

We’re in the home stretch now! The final step is to start processing the responses.

First, add the class ResponseProcessor with the following code:

This is a pretty standard implementation of a stream processor in Spring Cloud Stream. The inline comments explain the logic of the method germane to this article.

Finally, modify the application.properties file by adding the following line so that the TCP server knows what messages to subscribe to:

This tells Spring Cloud Stream that the destination for the input channel is tcp_response.

And that is it! You can now run both projects and test them with telnet. You should see that the responses are now transformed to upper-case rather than being echoed exactly as the user inputs.

Wrapping Up

Spring Cloud Stream is a very promising framework, and the Request-response pattern a popular one commonly found in client-server architectures. With the information found in this article, you should be able to make use of Spring Cloud Stream in those cases where asynchronous processing should be used for synchronous requests that expect a response.

Click here to download the source code for the TCP server and request processor applications.

Questions, comments or concerns? Feel free to reach out to us below, or email us at IDMWORKS to learn more about how you can protect your organization and customers.

Leave a Reply

Your email address will not be published. Required fields are marked *