Developing network applications often requires managing complex connection states and custom protocols. A practical way to understand these concepts is by constructing a chat application and a lightweight RPC framework.
## Chat Application Architecture
The system consists of a client module, a server module, message definitions, protocol codecs, and session management components. The client handles user input and displays responses, while the server manages business logic, user sessions, and group states.
### Supported Commands
The client console accepts specific instructions to interact with the server:
| Command Format | Functionality |
|---|---|
| send [user] [text] | Send a private message to a specific user |
| gsend [group] [text] | Broadcast content to a specific group |
| gcreate [group] [users] | Create a new group with initial members |
| gmembers [group] | List members of a specific group |
| gjoin [group] | Join an existing group |
| gquit [group] | Leave a specific group |
| quit | Terminate the client connection |
Each command maps to a distinct message type. The server pipeline routes these messages to specific handlers based on their type.
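For concreteness, here is a sketch of how the server side might assemble that pipeline. The PacketDecoder and PacketEncoder codecs are assumed stand-ins for the protocol codecs mentioned above, and the port is arbitrary:

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ChatServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Assumed codecs: frame the byte stream and (de)serialize packets.
                        pipeline.addLast(new PacketDecoder());
                        pipeline.addLast(new PacketEncoder());
                        // One handler per message type; each sees only its own packets.
                        pipeline.addLast(new AuthRequestHandler());
                        pipeline.addLast(new PrivateChatHandler());
                        pipeline.addLast(new GroupCreateHandler());
                        pipeline.addLast(new GroupChatHandler());
                        pipeline.addLast(new ConnectionCleanupHandler());
                    }
                });
        bootstrap.bind(8000).sync(); // arbitrary port
    }
}
```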
## Channel Lifecycle Events
Netty provides specific callbacks for channel state changes. Understanding the order of these events is crucial for resource management.
- channelRegistered: Invoked when the channel is registered to an EventLoop.
- channelActive: Invoked when the channel becomes active (connection established).
- channelInactive: Invoked when the connection is closed.
- channelUnregistered: Invoked when the channel is removed from the EventLoop.
The typical lifecycle follows: registered -> active -> inactive -> unregistered. Additionally, channelRead handles inbound data, and userEventTriggered handles custom events like idle states.
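To observe this ordering in practice, a throwaway logging handler can be placed at the front of the pipeline; a minimal sketch:

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class LifecycleLoggingHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("1. channelRegistered");
        ctx.fireChannelRegistered(); // pass the event along the pipeline
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("2. channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("3. channelInactive");
        ctx.fireChannelInactive();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        System.out.println("4. channelUnregistered");
        ctx.fireChannelUnregistered();
    }
}
```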
## Authentication Flow
### Client Side
Upon establishing a connection, the client must authenticate. This involves capturing user credentials in a separate thread to avoid blocking the NIO event loop, then sending a login request object.
```java
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class AuthClientHandler extends ChannelInboundHandlerAdapter {

    private final CountDownLatch authLatch;
    private final AtomicBoolean authStatus;

    public AuthClientHandler(CountDownLatch latch, AtomicBoolean status) {
        this.authLatch = latch;
        this.authStatus = status;
    }

    @Override
    public void channelActive(ChannelHandlerContext context) {
        // Scanner#nextLine blocks, so credentials are read on a dedicated
        // thread rather than on the NIO event loop.
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            System.out.print("Username: ");
            String user = scanner.nextLine();
            System.out.print("Password: ");
            String pass = scanner.nextLine();
            LoginRequestPacket packet = new LoginRequestPacket(user, pass);
            context.writeAndFlush(packet);
            try {
                authLatch.await(); // wait for the server's verdict
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }

    @Override
    public void channelRead(ChannelHandlerContext context, Object message) {
        if (message instanceof LoginResponsePacket) {
            LoginResponsePacket response = (LoginResponsePacket) message;
            authStatus.set(response.isSuccess());
            authLatch.countDown();
        } else {
            // Anything else belongs to handlers further down the pipeline.
            context.fireChannelRead(message);
        }
    }
}
```
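The latch and flag let the startup code block until authentication completes. A sketch of how the caller might wire them up (startConsoleLoop is a hypothetical stand-in for the command-reading loop):

```java
CountDownLatch authLatch = new CountDownLatch(1);
AtomicBoolean authStatus = new AtomicBoolean(false);
// ... bootstrap the client channel with new AuthClientHandler(authLatch, authStatus) ...
authLatch.await(); // released by channelRead once the LoginResponsePacket arrives
if (authStatus.get()) {
    startConsoleLoop(); // hypothetical: reads the send/gsend/... commands above
} else {
    System.err.println("Login failed, exiting.");
}
```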
### Server Side
The server validates credentials and binds the channel to the username if successful.
```java
@ChannelHandler.Sharable
public class AuthRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext context, LoginRequestPacket packet) {
        UserService service = ServiceRegistry.getUserService();
        boolean valid = service.validate(packet.getUsername(), packet.getPassword());
        LoginResponsePacket response;
        if (valid) {
            // Bind the channel to the username so other handlers can find it.
            SessionRegistry.bind(context.channel(), packet.getUsername());
            response = new LoginResponsePacket(true, "Authentication successful");
        } else {
            response = new LoginResponsePacket(false, "Invalid credentials");
        }
        context.writeAndFlush(response);
    }
}
```
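SessionRegistry itself is referenced throughout but not shown; a minimal sketch, assuming a ConcurrentHashMap keyed by username plus a channel attribute for the reverse lookup:

```java
import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

public final class SessionRegistry {

    private static final AttributeKey<String> USERNAME = AttributeKey.valueOf("username");
    private static final ConcurrentHashMap<String, Channel> sessions = new ConcurrentHashMap<>();

    private SessionRegistry() {}

    public static void bind(Channel channel, String username) {
        channel.attr(USERNAME).set(username);
        sessions.put(username, channel);
    }

    public static Channel getChannel(String username) {
        return sessions.get(username);
    }

    public static void unbind(Channel channel) {
        String username = channel.attr(USERNAME).get();
        if (username != null) {
            // Two-argument remove: only unmap if still bound to this channel.
            sessions.remove(username, channel);
        }
    }
}
```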
## Private Messaging
Private messaging requires locating the recipient's channel based on their username.
```java
@ChannelHandler.Sharable
public class PrivateChatHandler extends SimpleChannelInboundHandler<ChatRequestPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext context, ChatRequestPacket packet) {
        String targetUser = packet.getReceiver();
        Channel targetChannel = SessionRegistry.getChannel(targetUser);
        ChatResponsePacket reply;
        if (targetChannel != null && targetChannel.isActive()) {
            // Forward to the recipient (sender/content overload), then
            // acknowledge delivery to the sender (status/message overload).
            targetChannel.writeAndFlush(new ChatResponsePacket(packet.getSender(), packet.getContent()));
            reply = new ChatResponsePacket(true, "Delivered");
        } else {
            reply = new ChatResponsePacket(false, "User offline");
        }
        context.writeAndFlush(reply);
    }
}
```
## Group Chat Implementation
Group management involves maintaining a mapping of group names to member channels.
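The GroupManager used by the handlers below is not shown in full; a minimal sketch, assuming member lists arrive as usernames and are resolved through SessionRegistry:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

import io.netty.channel.Channel;

public class GroupManager {

    // Group name -> member channels. CopyOnWriteArrayList keeps iteration
    // safe while members join and leave concurrently.
    private final ConcurrentHashMap<String, List<Channel>> groups = new ConcurrentHashMap<>();

    public boolean exists(String groupName) {
        return groups.containsKey(groupName);
    }

    public void createGroup(String groupName, List<String> members) {
        List<Channel> channels = new CopyOnWriteArrayList<>();
        for (String member : members) {
            Channel ch = SessionRegistry.getChannel(member);
            if (ch != null) { // skip members who are offline
                channels.add(ch);
            }
        }
        groups.put(groupName, channels);
    }

    public List<Channel> getGroupChannels(String groupName) {
        return groups.get(groupName);
    }
}
```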
### Group Creation
```java
@ChannelHandler.Sharable
public class GroupCreateHandler extends SimpleChannelInboundHandler<GroupCreateRequestPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext context, GroupCreateRequestPacket packet) {
        String groupName = packet.getGroupName();
        GroupManager manager = GroupRegistry.getManager();
        if (manager.exists(groupName)) {
            context.writeAndFlush(new GroupCreateResponsePacket(false, "Group exists"));
            return;
        }
        manager.createGroup(groupName, packet.getMembers());
        context.writeAndFlush(new GroupCreateResponsePacket(true, "Group created"));
        // Notify every member channel that it has been added to the group.
        for (Channel ch : manager.getGroupChannels(groupName)) {
            ch.writeAndFlush(new GroupCreateResponsePacket(true, "Joined " + groupName));
        }
    }
}
```
### Group Messaging
```java
@ChannelHandler.Sharable
public class GroupChatHandler extends SimpleChannelInboundHandler<GroupChatRequestPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext context, GroupChatRequestPacket packet) {
        List<Channel> members = GroupRegistry.getManager().getGroupChannels(packet.getGroupName());
        if (members == null) {
            // Unknown group: drop the message (a production system would
            // send an error response back to the sender here).
            return;
        }
        // Fan the message out to every member, including the sender.
        for (Channel ch : members) {
            ch.writeAndFlush(new GroupChatResponsePacket(packet.getSender(), packet.getContent()));
        }
    }
}
```
## Connection Lifecycle and Heartbeats
Connections may terminate normally or abnormally (e.g., power loss). The server must clean up session data in both cases.
### Cleanup Handler
```java
@ChannelHandler.Sharable
public class ConnectionCleanupHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelInactive(ChannelHandlerContext context) {
        // Fires for both orderly shutdowns and detected link failures.
        SessionRegistry.unbind(context.channel());
        System.out.println("Connection closed: " + context.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
        // Clean up the session, then close the channel that misbehaved.
        SessionRegistry.unbind(context.channel());
        context.close();
    }
}
```
### Idle State Detection
To handle "zombie" connections where the TCP link is broken but the application is unaware, Netty provides IdleStateHandler.
Server Configuration: Close the channel if no read activity occurs for 5 seconds.
```java
pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new ChannelDuplexHandler() {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // No inbound data for 5 seconds: treat the peer as gone.
                ctx.close();
            }
        }
    }
});
```
Client Configuration: Send a heartbeat packet if no write activity occurs for 3 seconds.
```java
pipeline.addLast(new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
pipeline.addLast(new ChannelDuplexHandler() {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                // Nothing written for 3 seconds: keep the link warm so the
                // server's 5-second read-idle timeout never fires.
                ctx.writeAndFlush(new HeartbeatPacket());
            }
        }
    }
});
```
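On the server, the heartbeat only needs to be consumed: merely reading it resets the READER_IDLE timer. A minimal sketch of an assumed server-side consumer:

```java
@ChannelHandler.Sharable
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<HeartbeatPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HeartbeatPacket packet) {
        // Nothing to do: receiving the packet already reset the server's
        // read-idle timer, and SimpleChannelInboundHandler releases it.
    }
}
```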
## Building a Lightweight RPC Framework
Remote Procedure Call (RPC) allows executing methods on a remote server as if they were local. This implementation uses Netty for transport and JDK dynamic proxies for client-side abstraction.
### Message Definitions
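Both packets extend a shared MessagePacket base class. Its exact shape is not shown here; a minimal sketch, assuming it carries the sequence ID used to match responses to requests:

```java
public abstract class MessagePacket {

    // Correlates each response with the request that produced it.
    private int sequenceId;

    public int getSequenceId() { return sequenceId; }

    public void setSequenceId(int sequenceId) { this.sequenceId = sequenceId; }

    // Drives handler routing (e.g. MessageType.RPC_REQUEST vs RPC_RESPONSE).
    public abstract int getMessageType();
}
```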
Request Object:
```java
public class RpcCallPacket extends MessagePacket {

    private String interfaceName;   // fully qualified interface name
    private String methodName;
    private Class<?> returnType;
    private Class<?>[] paramTypes;
    private Object[] paramValues;
    // sequenceId is inherited from MessagePacket
    // Constructor and getters omitted for brevity

    @Override
    public int getMessageType() {
        return MessageType.RPC_REQUEST;
    }
}
```
Response Object:
```java
public class RpcResultPacket extends MessagePacket {

    private Object result;      // return value on success
    private Exception error;    // populated when the invocation failed
    // Constructor and getters omitted for brevity

    @Override
    public int getMessageType() {
        return MessageType.RPC_RESPONSE;
    }
}
```
### Server-Side Processing
The server receives the request, uses reflection to invoke the method, and returns the result.
```java
public class RpcRequestProcessor extends SimpleChannelInboundHandler<RpcCallPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext context, RpcCallPacket packet) {
        RpcResultPacket response = new RpcResultPacket();
        // Echo the sequence ID so the client can match this response.
        response.setSequenceId(packet.getSequenceId());
        try {
            Class<?> serviceClass = Class.forName(packet.getInterfaceName());
            Object service = ServiceLocator.getService(serviceClass);
            Method method = service.getClass().getMethod(packet.getMethodName(), packet.getParamTypes());
            Object result = method.invoke(service, packet.getParamValues());
            response.setResult(result);
        } catch (Exception e) {
            // Propagate only the message, not the full stack trace (see the
            // payload-size note under Implementation Considerations).
            response.setError(new RuntimeException("Invocation failed: " + e.getMessage()));
        }
        context.writeAndFlush(response);
    }
}
```
### Client-Side Proxy
The client uses a dynamic proxy to intercept method calls, serialize them into RPC packets, and wait for the response using a Promise.
```java
import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.Channel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;

public class RpcProxyFactory {

    // Package-private so RpcResponseCollector can complete pending promises.
    static final ConcurrentHashMap<Integer, Promise<Object>> pendingCalls = new ConcurrentHashMap<>();

    private static volatile Channel channel;

    @SuppressWarnings("unchecked")
    public static <T> T createProxy(Class<T> interfaceType) {
        return (T) Proxy.newProxyInstance(
                interfaceType.getClassLoader(),
                new Class[]{interfaceType},
                (proxy, method, args) -> {
                    RpcCallPacket request = new RpcCallPacket(
                            IdGenerator.nextId(),
                            interfaceType.getName(),
                            method.getName(),
                            method.getReturnType(),
                            method.getParameterTypes(),
                            args
                    );
                    Channel currentChannel = getChannel();
                    DefaultPromise<Object> promise = new DefaultPromise<>(currentChannel.eventLoop());
                    // Register the promise before writing; otherwise a fast
                    // response could arrive before anyone is waiting for it.
                    pendingCalls.put(request.getSequenceId(), promise);
                    currentChannel.writeAndFlush(request);
                    promise.await(); // blocks the calling thread, not the event loop
                    if (promise.isSuccess()) {
                        return promise.getNow();
                    } else {
                        throw new RuntimeException("RPC Call Failed", promise.cause());
                    }
                }
        );
    }

    // Channel initialization logic omitted for brevity
    private static Channel getChannel() {
        // Double-checked locking for a lazily connected singleton channel
        if (channel != null) return channel;
        synchronized (RpcProxyFactory.class) {
            if (channel == null) {
                // Initialize Bootstrap and connect
                // ...
            }
            return channel;
        }
    }
}
```
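With the proxy in place, a remote call reads like a local one. A usage sketch, assuming a hypothetical HelloService interface registered on the server:

```java
// Hypothetical contract shared by client and server.
public interface HelloService {
    String hello(String name);
}

// Client side: the proxy serializes the call, blocks on the promise,
// and returns the unwrapped result.
HelloService helloService = RpcProxyFactory.createProxy(HelloService.class);
String greeting = helloService.hello("netty");
System.out.println(greeting);
```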
### Response Collection
The client handler matches incoming responses to pending promises using the sequence ID.
```java
@ChannelHandler.Sharable
public class RpcResponseCollector extends SimpleChannelInboundHandler<RpcResultPacket> {

    @Override
    protected void channelRead0(ChannelHandlerContext context, RpcResultPacket packet) {
        // remove() both fetches and clears the entry, so each response
        // completes its promise exactly once.
        Promise<Object> promise = RpcProxyFactory.pendingCalls.remove(packet.getSequenceId());
        if (promise != null) {
            if (packet.getError() == null) {
                promise.setSuccess(packet.getResult());
            } else {
                promise.setFailure(packet.getError());
            }
        }
    }
}
```
### Implementation Considerations
- Sequence ID Uniqueness: Ensure the ID generator produces unique IDs to correctly match requests and responses; a minimal generator sketch follows this list.
- Synchronous vs Asynchronous: The example uses promise.await(), making the call synchronous. Production systems often prefer asynchronous callbacks or CompletableFuture.
- Payload Size: Custom protocols often have frame size limits. Exception stack traces should be truncated to prevent oversized messages.
- Service Discovery: The example uses a simple factory. Real-world systems typically use registries like Zookeeper or Nacos.
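A minimal generator sketch, assuming uniqueness is only required within a single client process:

```java
import java.util.concurrent.atomic.AtomicInteger;

public final class IdGenerator {

    private static final AtomicInteger counter = new AtomicInteger();

    private IdGenerator() {}

    public static int nextId() {
        // Monotonically increasing and unique per process; distributed
        // clients would need a globally unique scheme instead.
        return counter.incrementAndGet();
    }
}
```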