Implementing Netty with Protobuf in Spring Boot Applications

Protobuf Integration with Netty

Protocol Buffers Overview

Protocol Buffers (Protobuf) is Google's language-neutral, platform-neutral mechanism for serializing structured data. Because it uses a compact binary wire format, it is more efficient than XML for data exchange. Protobuf supports many languages, including Java, C++, C#, Go, and Python, which makes it well suited to communication between distributed systems and to data exchange across heterogeneous environments.

Official repository: https://github.com/protocolbuffers/protobuf

Java Implementation

First, create a .proto file defining the message structure. For a user entity with ID, name, age, and status fields:

// Schema for the single message type exchanged by the examples in this article.
syntax = "proto3";

// Generated Java code is placed in package com.example.protobuf, inside the outer class UserData.
option java_package="com.example.protobuf";
option java_outer_classname = "UserData";

// A user record with ID, name, age and status fields.
message User {  
    int32 id = 1;  
    string name = 2;  
    int32 age = 3;  
    // The Netty handlers in this article use this as a message kind:
    // 1 = acknowledgment, 2 = heartbeat; any other value is reported as unknown.
    int32 status = 4;  
}

Generate Java classes using the protoc compiler:

# User.proto is resolved relative to the current directory; generated sources go under ./src/main/java.
protoc --java_out=./src/main/java User.proto

Example usage of the generated classes:

// Assemble a User message through the generated builder.
UserData.User original = UserData.User.newBuilder()
        .setId(100)
        .setName("Alice")
        .setAge(25)
        .build();

// Serialize the message into an in-memory byte array.
ByteArrayOutputStream sink = new ByteArrayOutputStream();
original.writeTo(sink);
byte[] wireBytes = sink.toByteArray();

// Parse the bytes back into a new User instance.
UserData.User decoded = UserData.User.parseFrom(new ByteArrayInputStream(wireBytes));

// Print the round-tripped fields.
System.out.println("ID: " + decoded.getId());
System.out.println("Name: " + decoded.getName());
System.out.println("Age: " + decoded.getAge());

Spring Boot Netty Server Implementation

Project Dependencies

<!-- Versions are declared once here and referenced through ${...} placeholders below. -->
<properties>
    <java.version>11</java.version>
    <netty.version>4.1.77.Final</netty.version>
    <protobuf.version>3.19.4</protobuf.version>
    <spring-boot.version>2.7.0</spring-boot.version>
</properties>

<dependencies>
    <!-- Spring Boot starter: provides the application context used to wire the beans below. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <version>${spring-boot.version}</version>
    </dependency>
    
    <!-- Netty: transport, pipeline and Protobuf codec handlers. -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>${netty.version}</version>
    </dependency>
    
    <!-- Protobuf Java runtime required by the classes generated from User.proto. -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>${protobuf.version}</version>
    </dependency>
</dependencies>

Server Configuration

/**
 * Spring-managed Netty TCP server listening on a fixed port.
 *
 * <p>The server socket is bound while the bean is being initialised and is released again
 * when the application context shuts down.
 */
@Service
public class NettyServer {
    private final int PORT = 8080;
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    /** Completed bind operation; {@code null} until {@link #start()} has succeeded. */
    private ChannelFuture bindFuture;

    @Autowired
    private ServerChannelInitializer initializer;

    /**
     * Binds the server socket and returns as soon as the port is open.
     *
     * <p>This method must not wait for the server channel to close: it runs as a
     * {@code @PostConstruct} callback, so blocking here would stall the Spring context
     * refresh and nothing after this bean would ever be initialised.
     *
     * <p>Calling it again after a successful start is a no-op, so an additional manual
     * invocation cannot trigger a second bind on the same port.
     */
    @PostConstruct
    public synchronized void start() {
        if (bindFuture != null) {
            return;
        }
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(initializer)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // sync() waits for the bind only; a bind failure is rethrown to the caller.
            bindFuture = bootstrap.bind(PORT).sync();
            System.out.println("Server started on port: " + PORT);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (bindFuture == null) {
                // Start-up failed or was interrupted: release the event loop threads right away.
                shutdownEventLoops();
            }
        }
    }

    /**
     * Closes the listening socket (if it was opened) and stops both event loop groups.
     * Invoked by Spring when the application context is closed.
     */
    @PreDestroy
    public synchronized void shutdown() {
        if (bindFuture != null) {
            bindFuture.channel().close();
        }
        shutdownEventLoops();
    }

    /** Requests a graceful shutdown of the worker and boss event loop groups. */
    private void shutdownEventLoops() {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

Channel Pipeline Setup

/**
 * Builds the handler pipeline for every connection accepted by the server.
 *
 * <p>Note that the same injected {@code ServerHandler} instance is appended to every
 * pipeline; Netty only permits that for handlers annotated with
 * {@code @ChannelHandler.Sharable}.
 */
@Component
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private ServerHandler serverHandler;

    /**
     * Installs idle detection, the Protobuf codec and the business handler, in that order.
     *
     * @param ch the newly accepted client channel
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
                // Raise a reader-idle event after 30 seconds without inbound data.
                .addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS))
                // Inbound: split the stream on the varint32 length prefix, then decode User messages.
                .addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufDecoder(UserData.User.getDefaultInstance()))
                // Outbound: length prefix prepender and Protobuf encoder.
                .addLast(new ProtobufVarint32LengthFieldPrepender())
                .addLast(new ProtobufEncoder())
                .addLast(serverHandler);
    }
}

Server Handler Logic

@Component
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private static final int MAX_IDLE_COUNT = 2;
    private int idleCount = 0;
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Client connected: " + ctx.channel().remoteAddress());
        
        UserData.User welcome = UserData.User.newBuilder()
                .setId(0)
                .setName("Server")
                .setAge(0)
                .setStatus(0)
                .build();
        ctx.writeAndFlush(welcome);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            if (msg instanceof UserData.User) {
                UserData.User user = (UserData.User) msg;
                
                switch (user.getStatus()) {
                    case 1:
                        System.out.println("Received acknowledgment from client");
                        break;
                    case 2:
                        System.out.println("Received heartbeat from client");
                        idleCount = 0;
                        break;
                    default:
                        System.out.println("Unknown message type");
                }
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                idleCount++;
                System.out.println("Client idle count: " + idleCount);
                
                if (idleCount >= MAX_IDLE_COUNT) {
                    System.out.println("Closing inactive client connection");
                    ctx.close();
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty Client Implementation

Client Configuration

/**
 * Netty TCP client that connects to the local server and keeps retrying while the
 * connection attempt fails.
 */
@Component
public class NettyClient {
    private final String HOST = "localhost";
    private final int PORT = 8080;
    private EventLoopGroup workerGroup = new NioEventLoopGroup();

    @Autowired
    private ClientChannelInitializer initializer;

    /**
     * Starts an asynchronous connection attempt. If the attempt fails, another one is
     * scheduled five seconds later on the event loop of the failed channel.
     */
    public void connect() {
        Bootstrap clientBootstrap = new Bootstrap()
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(initializer);

        ChannelFuture attempt = clientBootstrap.connect(HOST, PORT);
        attempt.addListener((ChannelFuture result) -> {
            if (result.isSuccess()) {
                return;
            }
            System.out.println("Connection failed, retrying in 5 seconds");
            result.channel().eventLoop().schedule(() -> connect(), 5, TimeUnit.SECONDS);
        });
    }

    /** Stops the client's event loop group when the Spring context is closed. */
    @PreDestroy
    public void shutdown() {
        workerGroup.shutdownGracefully();
    }
}

Client Channel Pipeline

/**
 * Builds the handler pipeline for each connection the client opens.
 */
@Component
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private ClientHandler clientHandler;

    /**
     * Installs writer-idle detection, the Protobuf codec and the client handler, in that order.
     *
     * @param ch the channel being connected to the server
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
                // Raise a writer-idle event after 5 seconds without outbound data.
                .addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
                // Inbound: split the stream on the varint32 length prefix, then decode User messages.
                .addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufDecoder(UserData.User.getDefaultInstance()))
                // Outbound: length prefix prepender and Protobuf encoder.
                .addLast(new ProtobufVarint32LengthFieldPrepender())
                .addLast(new ProtobufEncoder())
                .addLast(clientHandler);
    }
}

Client Handler

@Component
@ChannelHandler.Sharable
public class ClientHandler extends ChannelInboundHandlerAdapter {
    private AtomicInteger heartbeatCount = new AtomicInteger(0);
    
    @Autowired
    private NettyClient nettyClient;
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected to server: " + ctx.channel().remoteAddress());
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("Disconnected from server");
        ctx.channel().eventLoop().schedule(() -> nettyClient.connect(), 3, TimeUnit.SECONDS);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            if (msg instanceof UserData.User) {
                UserData.User user = (UserData.User) msg;
                System.out.println("Received message - ID: " + user.getId() + 
                        ", Name: " + user.getName() + 
                        ", Age: " + user.getAge());
                
                UserData.User response = UserData.User.newBuilder()
                        .setStatus(1)
                        .build();
                ctx.writeAndFlush(response);
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                int count = heartbeatCount.incrementAndGet();
                System.out.println("Sending heartbeat #" + count);
                
                UserData.User heartbeat = UserData.User.newBuilder()
                        .setStatus(2)
                        .build();
                ctx.writeAndFlush(heartbeat);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

Application Bootstrap

Main Application Class

/**
 * Application entry point.
 *
 * <p>The Netty server is not started from here: {@code NettyServer.start()} is a
 * {@code @PostConstruct} callback, so Spring invokes it while the context is being
 * created. Starting it a second time from {@code main} would attempt another bind on the
 * same port.
 */
@SpringBootApplication
public class NettyProtobufApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(NettyProtobufApplication.class, args);
    }
}

Test Configuration

/**
 * Launches the demo client once the Spring context has been refreshed.
 */
@Component
public class ClientStarter implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private NettyClient client;

    /**
     * Starts a background thread that connects the client after a short delay.
     *
     * @param event the context-refreshed event published by Spring
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        Thread launcher = new Thread(this::connectAfterDelay);
        launcher.start();
    }

    /** Waits two seconds, then connects; gives up without connecting if interrupted. */
    private void connectAfterDelay() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        client.connect();
    }
}

Tags: Spring Boot Netty Protobuf Network Programming java

Posted on Sun, 10 May 2026 23:51:22 +0000 by EZE