Protobuf Integration with Netty
Protocol Buffers Overview
Protocol Buffers (Protobuf) is Google's language-neutral, platform-neutral mechanism for serializing structured data. It's more efficient than XML for data exchange due to its binary format. The protocol supports various languages including Java, C++, C#, Go, and Python. This makes it ideal for distributed systems communication and heterogeneous environment data exchange.
Official repository: https://github.com/google/protobuf
Java Implementation
First, create a .proto file defining the message structure. For a user entity with ID, name, age, and status fields:
syntax = "proto3";
option java_package="com.example.protobuf";
option java_outer_classname = "UserData";
message User {
int32 id = 1;
string name = 2;
int32 age = 3;
int32 status = 4;
}
Generate Java classes using the protoc compiler:
protoc --java_out=./src/main/java User.proto
Example usage of the generated classes:
UserData.User.Builder userBuilder = UserData.User.newBuilder();
userBuilder.setId(100);
userBuilder.setName("Alice");
userBuilder.setAge(25);
UserData.User message = userBuilder.build();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
message.writeTo(outputStream);
byte[] serializedData = outputStream.toByteArray();
ByteArrayInputStream inputStream = new ByteArrayInputStream(serializedData);
UserData.User parsedMessage = UserData.User.parseFrom(inputStream);
System.out.println("ID: " + parsedMessage.getId());
System.out.println("Name: " + parsedMessage.getName());
System.out.println("Age: " + parsedMessage.getAge());
Spring Boot Netty Server Implementation
Project Dependencies
<properties>
<java.version>11</java.version>
<netty.version>4.1.77.Final</netty.version>
<protobuf.version>3.19.4</protobuf.version>
<spring-boot.version>2.7.0</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring-boot.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
Server Configuration
@Service
public class NettyServer {
private final int PORT = 8080;
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup();
@Autowired
private ServerChannelInitializer initializer;
@PostConstruct
public void start() {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(initializer)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(PORT).sync();
System.out.println("Server started on port: " + PORT);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
Channel Pipeline Setup
@Component
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private ServerHandler serverHandler;
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(UserData.User.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(serverHandler);
}
}
Server Handler Logic
@Component
public class ServerHandler extends ChannelInboundHandlerAdapter {
private static final int MAX_IDLE_COUNT = 2;
private int idleCount = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Client connected: " + ctx.channel().remoteAddress());
UserData.User welcome = UserData.User.newBuilder()
.setId(0)
.setName("Server")
.setAge(0)
.setStatus(0)
.build();
ctx.writeAndFlush(welcome);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
if (msg instanceof UserData.User) {
UserData.User user = (UserData.User) msg;
switch (user.getStatus()) {
case 1:
System.out.println("Received acknowledgment from client");
break;
case 2:
System.out.println("Received heartbeat from client");
idleCount = 0;
break;
default:
System.out.println("Unknown message type");
}
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
idleCount++;
System.out.println("Client idle count: " + idleCount);
if (idleCount >= MAX_IDLE_COUNT) {
System.out.println("Closing inactive client connection");
ctx.close();
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Netty Client Implementation
Client Configuration
@Component
public class NettyClient {
private final String HOST = "localhost";
private final int PORT = 8080;
private EventLoopGroup workerGroup = new NioEventLoopGroup();
@Autowired
private ClientChannelInitializer initializer;
public void connect() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(initializer);
bootstrap.connect(HOST, PORT).addListener((ChannelFuture future) -> {
if (!future.isSuccess()) {
System.out.println("Connection failed, retrying in 5 seconds");
future.channel().eventLoop().schedule(() -> connect(), 5, TimeUnit.SECONDS);
}
});
}
@PreDestroy
public void shutdown() {
workerGroup.shutdownGracefully();
}
}
Client Channel Pipeline
@Component
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private ClientHandler clientHandler;
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufDecoder(UserData.User.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(clientHandler);
}
}
Client Handler
@Component
@ChannelHandler.Sharable
public class ClientHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger heartbeatCount = new AtomicInteger(0);
@Autowired
private NettyClient nettyClient;
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected to server: " + ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("Disconnected from server");
ctx.channel().eventLoop().schedule(() -> nettyClient.connect(), 3, TimeUnit.SECONDS);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
if (msg instanceof UserData.User) {
UserData.User user = (UserData.User) msg;
System.out.println("Received message - ID: " + user.getId() +
", Name: " + user.getName() +
", Age: " + user.getAge());
UserData.User response = UserData.User.newBuilder()
.setStatus(1)
.build();
ctx.writeAndFlush(response);
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
int count = heartbeatCount.incrementAndGet();
System.out.println("Sending heartbeat #" + count);
UserData.User heartbeat = UserData.User.newBuilder()
.setStatus(2)
.build();
ctx.writeAndFlush(heartbeat);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Application Bootstrap
Main Application Class
@SpringBootApplication
public class NettyProtobufApplication {
@Autowired
private NettyServer server;
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(NettyProtobufApplication.class, args);
NettyServer nettyServer = context.getBean(NettyServer.class);
new Thread(nettyServer::start).start();
}
}
Test Configuration
@Component
public class ClientStarter implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private NettyClient client;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
new Thread(() -> {
try {
Thread.sleep(2000);
client.connect();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}