Request-Response Pattern With Spring Cloud Stream


In this post we’ll look at implementing the Request-response pattern using Spring Cloud Stream and Netty.

What is the Request-Response Pattern?

From the Wikipedia page for Request-response:

“Request–response is a message exchange pattern in which a requestor sends a request message to a replier system which receives and processes the request, ultimately returning a message in response. This is a simple, but powerful messaging pattern which allows two applications to have a two-way conversation with one another over a channel. This pattern is especially common in client–server architectures.

For simplicity, this pattern is typically implemented in a purely synchronous fashion, as in web service calls over HTTP, which holds a connection open and waits until the response is delivered or the timeout period expires. However, request–response may also be implemented asynchronously, with a response being returned at some unknown later time. This is often referred to as “sync over async”, or “sync/async”, and is common in enterprise application integration (EAI) implementations where slow aggregations, time-intensive functions, or human workflow must be performed before a response can be constructed and delivered.”

It is the asynchronous implementation of Request-response referred to above – also known as “sync over async”, or “sync/async” – that we will be exploring.
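To make the “sync over async” idea concrete before introducing any frameworks, here is a minimal plain-Java sketch. It is not code from the applications built below; the class name SyncOverAsyncSketch and its methods are hypothetical. The requestor parks each request under a correlation ID, and whoever receives the asynchronous reply uses that ID to complete the matching request. A timeout bounds how long the caller waits.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "sync over async"; not part of the article's projects.
class SyncOverAsyncSketch {
    // Requests that are still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Parks a request and returns the correlation ID that would travel with the message.
    String register(CompletableFuture<String> reply) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, reply);
        return correlationId;
    }

    // Called when a reply arrives, possibly on another thread.
    // Returns false if no request is waiting under that ID.
    boolean complete(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(payload);
    }

    // Sends one "request" and blocks (for at most one second) until the reply arrives.
    static String demo() {
        SyncOverAsyncSketch requestor = new SyncOverAsyncSketch();
        CompletableFuture<String> reply = new CompletableFuture<>();
        String correlationId = requestor.register(reply);
        // Stand-in for the replier: answers asynchronously on another thread.
        CompletableFuture.runAsync(() -> requestor.complete(correlationId, "foo".toUpperCase()));
        return reply.orTimeout(1, TimeUnit.SECONDS).join();
    }
}
```

In the rest of this post, the Netty channel ID plays the role of the correlation ID, and the open TCP connection plays the role of the pending request.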

What is Spring Cloud Stream?

From the Spring Cloud Stream homepage:

“Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.”

Overview

We’ll create two applications:

  • A TCP server that accepts requests, dispatching them via a message broker. The application will also subscribe to a response queue, processing those responses and sending them back to the requesting actor (e.g. a telnet session).
  • A request processor that subscribes to messages representing the incoming requests, and then publishes a transformed response back to the message broker.

We’ll do this in three stages to illustrate the concept:

  1. First, we’ll create a simple TCP echo server using Netty and Spring Boot
  2. Then, we’ll introduce code to publish the request via Spring Cloud Stream, and to consume those requests from a request processor application
  3. Finally, we’ll tie things together by processing the responses and sending them to the proper Netty channel

The first two sections will primarily focus on implementing simple examples demonstrated in the vendor-provided documentation. These examples will serve as the foundation for the final section, which will tackle the Request-response pattern with Spring Cloud Stream.

Note that this is not a getting started guide for either Netty or Spring Cloud Stream. Refer to the vendor-provided documentation for getting started guides.

Let’s get started!

Creating a TCP Server

To get started, we’ll create a simple TCP echo server using Spring Boot and Netty.

“Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications that you can “just run”.

We take an opinionated view of the Spring platform and third-party libraries so you can get started with minimum fuss. Most Spring Boot applications need very little Spring configuration.”

“Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.”

First, navigate to Spring Initializr – a helpful site for bootstrapping new Spring projects. Change the artifact to “tcp-server” and click the Generate Project button. If prompted, choose to download and save the project archive (by default it will be named tcp-server.zip).

Extract the downloaded archive and open it in your favorite IDE.

Next, delete the class com.example.tcpserver.TcpServerApplicationTests. This is automatically generated by Spring Initializr and won’t work out-of-the-box with the TCP server. Writing integration tests for TCP servers hosted in Spring Boot applications is beyond the scope of this article, so we’ll simply delete the (empty) sample test and move on.

Our TCP echo server implementation will be taken from the official Netty documentation, with minor changes to work with Spring Boot initially and additional modifications – in later sections – to work with Spring Cloud Stream.

Add Netty as a dependency by opening your pom.xml file and adding the following:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

Then, add a new class, TcpServerHandler, with the following code:

package com.example.tcpserver;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TcpServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

Next, add another new class, TcpServer, with the following content:

package com.example.tcpserver;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

@Service
public class TcpServer {
    private static final int TCP_PORT = 8080;

    private void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TcpServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(TCP_PORT).sync();

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    @PostConstruct
    public void postConstruct() throws InterruptedException {
        run();
    }
}

Again, this code is taken almost directly from the Netty echo server example, with minor changes to integrate with Spring Boot. The class is annotated with @Service, and a postConstruct() method annotated with @PostConstruct starts the server when the Spring Boot application starts.

With the above Netty example code in place and our minor changes to work with Spring Boot, you can now start the application and test it. Either run the application from within your IDE or start it from the command line using:

mvn package
java -jar target/tcp-server-0.0.1-SNAPSHOT.jar

Once the server is running, you can test it using Telnet, e.g.:


> telnet localhost 8080
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test
test
foo
foo
bar
bar
^]
telnet> quit
Connection closed.

You should be able to connect, and any request you send should be “echoed” back as the response.

We now have a functioning TCP echo server using Netty and Spring Boot. In the next section we’ll add Spring Cloud Stream and use a second Spring Boot application to process messages.

Using Spring Cloud Stream to Process Requests

Now that we’ve demonstrated Request-response with Netty and Spring Boot, we’ll extend the example to also dispatch the request asynchronously using Spring Cloud Stream.

TCP Server Modifications

First, we’ll modify the TCP server project to use an implementation of SmartLifecycle rather than the @PostConstruct annotation to start the server. This is necessary because @PostConstruct fires too early during application initialization, so the TCP server would be created (and start listening) before Spring Cloud Stream is auto-wired. Specifically, you would see the error “Dispatcher has no subscribers for channel ‘unknown.channel.name’”.

Create the class ApplicationLifecycle with the following content:

package com.example.tcpserver;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

@Component
public class ApplicationLifecycle implements SmartLifecycle {
    private final TcpServer server;
    private boolean running;

    @Autowired
    public ApplicationLifecycle(final TcpServer server) {
        this.server = server;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void stop(final Runnable runnable) {
        stop();
        runnable.run();
    }

    @Override
    public void start() {
        try {
            this.server.start();
            this.running = true;
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void stop() {
        this.server.stop();
        this.running = false;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public int getPhase() {
        return Integer.MAX_VALUE;
    }
}

This is a fairly standard implementation of the Spring SmartLifecycle interface. Because isAutoStartup() returns true and getPhase() returns the largest possible integer value, Spring will automatically start the TCP server after Spring Cloud Stream has been auto-wired.
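The effect of the phase value can be pictured with a small plain-Java simulation. This is not Spring's implementation, only a model of the documented rule that lifecycle components start in ascending phase order (and stop in the reverse order). The class PhaseOrderSketch, the component names, and the phase value 0 used for the bindings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of phased startup; names and phase values are illustrative only.
class PhaseOrderSketch {
    static final class Component {
        final String name;
        final int phase;

        Component(String name, int phase) {
            this.name = name;
            this.phase = phase;
        }
    }

    // Returns component names in the order they would be started: lowest phase first.
    static List<String> startupOrder(List<Component> components) {
        List<Component> sorted = new ArrayList<>(components);
        sorted.sort(Comparator.comparingInt(c -> c.phase));
        List<String> names = new ArrayList<>();
        for (Component component : sorted) {
            names.add(component.name);
        }
        return names;
    }
}
```

With a component at phase Integer.MAX_VALUE, nothing can be ordered after it, which is why the TCP server only starts listening once everything else is up.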

Next, we need to modify the TcpServer class to define the public start() and stop() methods called by the new ApplicationLifecycle class.

To do this, make the NioEventLoopGroup references fields instead of variables.

private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;

private void run() throws InterruptedException {
    this.bossGroup = new NioEventLoopGroup();
    this.workerGroup = new NioEventLoopGroup();

Then, replace the postConstruct() method (and its associated annotation) with a new stop() method:

public void stop() {
    this.bossGroup.shutdownGracefully();
    this.workerGroup.shutdownGracefully();
}

Finally, rename the run() method to start() and make it public.

You can now recompile and run the application; testing with telnet should give the same results as in the previous section. Note, however, that more messages are logged on startup, because Spring gets further through its initialization process before the TCP server starts.

Next, we’ll add Spring Cloud Stream to the TCP server project. This will involve a few steps that would not be necessary if we’d included “Cloud Stream” as a dependency when generating the project using Spring Initializr. However, this will serve to demonstrate how to add Spring Cloud to an existing Spring Boot project and we’ll do it the “automatic” way when creating the request processor project.

Open the pom.xml file in the TCP server project. Add the following to the <properties> element:

<spring-cloud.version>Finchley.RELEASE</spring-cloud.version>

Check the Spring Cloud project page to find the current version of Spring Cloud. Finchley was the latest version at the time this article was written.

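Add the following <dependencyManagement> element under the top-level <project> element:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>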

Finally, add the following dependency:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Now, we’ll add the Java code necessary to dispatch requests to our message broker (preserving the existing implementation that echoes the request as the response).

Open TcpServerHandler and add the following constructor, which takes an org.springframework.cloud.stream.messaging.Source parameter:

private Source source;

public TcpServerHandler(Source source) {
    this.source = source;
}

Next, modify the channelRead() method to read as shown below.

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final ByteBuf byteBuf = (ByteBuf) msg;
    final String msgPayload = byteBuf.toString(StandardCharsets.UTF_8);     // get ByteBuf as UTF8 String

    ctx.write(byteBuf);
    ctx.flush();

    final Message<String> message = MessageBuilder.withPayload(msgPayload)  // build Message with payload
            .build();
    this.source.output().send(message);                                     // dispatch message
}

With the changes above, the incoming request will be dispatched to the message broker as a String, while preserving the original code that also returns the request as a response via Netty.

Now open TcpServer so that we can pass the new Source parameter to the TcpServerHandler constructor. Modify the class as shown below:

@EnableBinding(Source.class)
public class TcpServer {
    @Autowired
    private Source source;

There should be two additional annotations (@EnableBinding on the class and @Autowired on the field) and a new Source field. Keep the existing @Service annotation on the class.

For the final Java change, modify the body of the start() method to pass the source field as a parameter to the TcpServerHandler constructor:

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TcpServerHandler(source));
    }
})
.option(ChannelOption.SO_BACKLOG, 128)

The final change we need to make to the TCP server project, in order to dispatch requests, is to modify the application.properties configuration file to specify where to dispatch messages. Simply open application.properties and add the following:

spring.cloud.stream.bindings.output.destination=tcp_request

This tells Spring Cloud Stream that the destination for the output channel is tcp_request.

That concludes the changes to the TCP server project for this section. The application will now not only echo the request as a response, but will also dispatch the request to a message broker. However, nothing will subscribe to these messages (yet).

Creating a Request Processor

Now we’ll create a new request processor application to process the dispatched requests.

Revisit Spring Initializr – this time to create the request processor application. Change the artifact to “request-processor”, add “Cloud Stream” under the Dependencies heading, and click the Generate Project button. If prompted, choose to download and save the project archive (by default it will be named request-processor.zip).

Extract the downloaded archive and open the project alongside the TCP server project in your IDE.

Add the RabbitMQ binder for Spring Cloud Stream by adding the following dependency to the request processor pom.xml file:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Add a new class named RequestProcessor with the following content:

package com.example.requestprocessor;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
public class RequestProcessor {
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Object transform(String message) {
        System.out.printf("Received message '%s'%n", message);
        return message.toUpperCase();
    }
}

As with the TCP server, the code here is taken directly from the vendor documentation to keep the focus on the Request-response implementation in the final section. The processor will simply take the messages it receives, transform the payload to upper-case, and then return the result back to the message broker. The message will also be written to the console for demonstration purposes.

The only other change that needs to be made is to the application.properties file, essentially telling the application where messages come from and where they go to. Open application.properties and add the following lines:

spring.cloud.stream.bindings.input.destination=tcp_request
spring.cloud.stream.bindings.output.destination=tcp_response

This tells Spring Cloud Stream that the destination for the input channel is tcp_request, and the destination for the output channel is tcp_response.

At this point you can run both applications to demonstrate the achieved (but incomplete) functionality: sending a request via telnet should still result in an “echoed” response, but you should also see a message printed to the console by the request processor application indicating it received the message (the CRLF is captured as part of the request):


Received message 'foo
'
Received message 'bar
'

However, the “processed” request is not being used…yet. We’ll cover that in the final section.
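As the console output above shows, the payload includes the trailing line terminator sent by telnet. If that is unwanted in log output, one option is to strip it before printing. The helper below is a hypothetical sketch and is not part of the vendor example; the names LineTerminatorSketch and stripLineTerminator are made up for illustration.

```java
// Hypothetical helper for tidier log output; not part of the request processor shown above.
class LineTerminatorSketch {
    // Removes any trailing carriage returns and line feeds from the payload.
    static String stripLineTerminator(String payload) {
        int end = payload.length();
        while (end > 0) {
            char last = payload.charAt(end - 1);
            if (last != '\n' && last != '\r') {
                break;
            }
            end--;
        }
        return payload.substring(0, end);
    }
}
```

Calling stripLineTerminator(message) inside the printf call would keep each log entry on a single line while leaving the payload returned to the broker untouched.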

Using Spring Cloud Stream to Process Responses

In this final section, we’ll process the response that is generated by the request processor – recall that this will be our request transformed to upper-case. The previous sections have primarily involved straightforward implementations of the vendor-provided documentation. I’ve provided some explanation and glue, but you are primarily seeing Netty and Spring Cloud Stream’s documented examples at work.

This final section will be the “meat” of the article – showing how to tie the example code together to implement the Request-response pattern with Spring Cloud Stream and a Netty TCP server.

To achieve this, we need to modify the TCP server project as follows:

  1. Keep track of the active Netty channels (so that we can find the channel that corresponds to each response)
  2. Send the ID of the Netty channel used to make the request along with the dispatched message
  3. Add a new response processor that handles response messages by sending a response to the requesting actor

There will be no changes required for the request processor application. Let’s tie this all together!

Keeping Track of Active Netty Channels

First, we’ll keep track of active Netty channels using an instance of the ChannelGroup class. This will be injected via the Spring IoC container, so we need to declare an instance using the @Bean annotation inside a class that has the @Configuration annotation. Create a new TcpServerConfiguration class with the following code:

package com.example.tcpserver;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TcpServerConfiguration {
    @Bean
    public ChannelGroup channelGroup() {
        return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }
}

Next, modify the TcpServerHandler constructor to take an instance of ChannelGroup (along with the existing Source parameter).

private final ChannelGroup channelGroup;

public TcpServerHandler(final Source source, final ChannelGroup channelGroup) {
    this.source = source;
    this.channelGroup = channelGroup;
}

Also override the channelActive() method in the TcpServerHandler class to keep track of active channels using the instance of ChannelGroup:

@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
    this.channelGroup.add(ctx.channel());
    super.channelActive(ctx);
}

Note that there is no need to handle closed / deactivated channels as the implementation of ChannelGroup will do that automatically.
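The automatic clean-up mentioned above can be pictured with a small plain-Java model. This is a simplified stand-in, not Netty's code: FakeChannel and ChannelRegistrySketch are hypothetical names, and a CompletableFuture plays the role of a channel's close future. The idea is that adding a channel also registers a callback on its close future, so the entry disappears as soon as the channel closes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of a self-cleaning channel registry.
class ChannelRegistrySketch {
    // Stand-in for a network channel: an ID plus a future that completes on close.
    static final class FakeChannel {
        final String id;
        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        FakeChannel(String id) {
            this.id = id;
        }

        void close() {
            closeFuture.complete(null);
        }
    }

    private final Map<String, FakeChannel> channels = new ConcurrentHashMap<>();

    // Registers the channel and arranges for its removal when it closes.
    void add(FakeChannel channel) {
        channels.put(channel.id, channel);
        channel.closeFuture.thenRun(() -> channels.remove(channel.id));
    }

    // Returns the open channel with the given ID, or null if there is none.
    FakeChannel find(String id) {
        return channels.get(id);
    }
}
```

This is why a later lookup by channel ID can simply treat a null result as “the client has gone away”.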

Next, open the code for TcpServer and auto-wire an instance of ChannelGroup.

@Autowired
private ChannelGroup channelGroup;

And pass it to the existing call to the TcpServerHandler constructor.

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TcpServerHandler(source, channelGroup));
    }
})
.option(ChannelOption.SO_BACKLOG, 128)

That’s all that needs to be done in order to keep track of the active Netty channels.

Sending the Netty Channel ID

Now we need to send the ID of the Netty channel associated with the request along with the message published via Spring Cloud Stream.

First, modify the TcpServerHandler code by updating the channelRead() method and introducing a serializeChannelId() method as shown below.

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
    final ByteBuf byteBuf = (ByteBuf) msg;
    try {
        final String msgPayload = byteBuf.toString(StandardCharsets.UTF_8);

        final Message<String> message = MessageBuilder
                .withPayload(msgPayload)
                .setHeader("CHANNEL_ID", serializeChannelId(ctx.channel().id()))
                .build();
        this.source.output().send(message);
    } finally {
        // the buffer is no longer written back to the channel, so release it here
        byteBuf.release();
    }
}

private String serializeChannelId(final ChannelId channelId) {
    return Base64.getEncoder().encodeToString(SerializationUtils.serialize(channelId));
}

We’ve removed the hard-coded echoing of the request as the response, and we now serialize the associated Netty channel ID, attaching it to the message as a header.
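For readers curious about what the header value contains: Spring's SerializationUtils.serialize() uses standard Java serialization, and ChannelId is Serializable. The sketch below reproduces the same round trip with only JDK classes. IdHeaderCodecSketch is a hypothetical name, and in the usage shown afterwards a java.util.UUID stands in for the Netty channel ID so that the example runs without Netty on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical JDK-only equivalent of the serialize/deserialize helpers in this article.
class IdHeaderCodecSketch {
    // Java-serializes the ID and encodes the bytes as a Base64 string suitable for a header.
    static String encode(Serializable id) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(id);
            out.flush();
            return Base64.getEncoder().encodeToString(bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reverses encode(): Base64-decodes the header and deserializes the object.
    static Object decode(String header) {
        byte[] raw = Base64.getDecoder().decode(header);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

For example, decode(encode(UUID.randomUUID())) yields a UUID equal to the original. The decoded object equals the original, which is what allows a later lookup by ID to succeed. As with any Java deserialization, the decode side should only be fed values from a trusted source.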

Processing Responses

We’re in the home stretch now! The final step is to start processing the responses.

First, add the class ResponseProcessor with the following code:

package com.example.tcpserver;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.SerializationUtils;

import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

@EnableBinding(Sink.class)
public class ResponseProcessor {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private ChannelGroup channelGroup;

    @Autowired
    public ResponseProcessor(final ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
    }

    @StreamListener(target = Sink.INPUT)
    public void processResponse(final Message<String> message) {
        // log a message if there is no ChannelGroup assigned
        if (this.channelGroup == null) {
            // received a message from the MQ without receiving a TCP/IP message
            logger.info("Unable to send response - no channel group assigned");
            return;
        }

        // read the serialized Netty channel from the Spring Cloud Stream message header
        final MessageHeaders messageHeaders = message.getHeaders();
        final String serializedId = (String)messageHeaders.get("CHANNEL_ID");
        // deserialize the Netty channel ID
        final ChannelId channelId = deserializeChannelId(serializedId);
        // look for an active channel matching the ID
        final Channel clientChannel = this.channelGroup.find(channelId);
        // log a message if there is no matching channel
        if (clientChannel == null) {
            // connection reset by peer
            logger.info("Unable to send response - channel {} not found", channelId);
            return;
        }

        // wrap the message payload as a ByteBuf
        final String responsePayload = message.getPayload();
        final ByteBuf responseBuf = Unpooled.wrappedBuffer(responsePayload.getBytes(StandardCharsets.UTF_8));
        // write the ByteBuf to the Netty channel
        clientChannel.writeAndFlush(responseBuf);
    }

    private ChannelId deserializeChannelId(final String src) {
        return (ChannelId)SerializationUtils.deserialize(Base64.getDecoder().decode(src));
    }
}

This is a pretty standard implementation of a stream processor in Spring Cloud Stream. The inline comments explain the logic of the method germane to this article.

Finally, modify the application.properties file by adding the following line so that the TCP server knows what messages to subscribe to:

spring.cloud.stream.bindings.input.destination=tcp_response

This tells Spring Cloud Stream that the destination for the input channel is tcp_response.

And that is it! You can now run both projects and test them with telnet. You should see that the responses are now transformed to upper-case rather than being echoed exactly as the user inputs.

Wrapping Up

Spring Cloud Stream is a very promising framework, and the Request-response pattern a popular one commonly found in client-server architectures. With the information found in this article, you should be able to make use of Spring Cloud Stream in those cases where asynchronous processing should be used for synchronous requests that expect a response.

Click here to download the source code for the TCP server and request processor applications.