Server Implementation
Apache Mina provides a robust framework for building network applications. When combined with Spring's dependency injection, it creates a flexible architecture for real-time message delivery and session management.
Data Transfer Objects
The communication layer requires structured message objects for serialization. The incoming message object captures client requests:
package com.messaging.model;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
 * Request envelope a client sends to the server.
 *
 * <p>Holds the requested action name, a string-to-string payload, and a creation
 * timestamp that defaults to the construction time
 * ({@link System#currentTimeMillis()}).
 */
public class IncomingMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    private String action;
    // Typed as Map<String, String> so getData(String) can return a String without a cast.
    private final Map<String, String> payload;
    private long createdAt;

    /** Creates a message with an empty payload, stamped with the current time. */
    public IncomingMessage() {
        this.payload = new HashMap<>();
        this.createdAt = System.currentTimeMillis();
    }

    /** @return the action name, or {@code null} if none has been set */
    public String getAction() {
        return action;
    }

    /** @param action the action name the server should dispatch on */
    public void setAction(String action) {
        this.action = action;
    }

    /**
     * Stores a payload entry, replacing any previous value for the key.
     *
     * @param key   payload key
     * @param value payload value
     */
    public void addData(String key, String value) {
        this.payload.put(key, value);
    }

    /**
     * @param key payload key
     * @return the value stored under {@code key}, or {@code null} if absent
     */
    public String getData(String key) {
        return this.payload.get(key);
    }

    /** @param key payload key to delete; absent keys are ignored */
    public void removeData(String key) {
        this.payload.remove(key);
    }

    /** @return the live payload map (not a copy); changes are visible to this message */
    public Map<String, String> getPayload() {
        return payload;
    }

    /** @return creation time in milliseconds since the epoch */
    public long getCreatedAt() {
        return createdAt;
    }

    /** @param createdAt creation time in milliseconds since the epoch */
    public void setCreatedAt(long createdAt) {
        this.createdAt = createdAt;
    }
}
// The response object formats server replies:
package com.messaging.model;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
 * Reply envelope the server sends to a client.
 *
 * <p>Holds the action the reply relates to, a status code, a human-readable
 * description, a string-to-string payload, and a timestamp fixed at construction
 * time ({@link System#currentTimeMillis()}).
 */
public class OutgoingMessage implements Serializable {
    private static final long serialVersionUID = 1L;

    private String action;
    private String status;
    private String description;
    // Typed as Map<String, String> so getData(String) can return a String without a cast.
    private final Map<String, String> payload;
    private final long timestamp;

    /** Creates a reply with an empty payload, stamped with the current time. */
    public OutgoingMessage() {
        this.payload = new HashMap<>();
        this.timestamp = System.currentTimeMillis();
    }

    /** @return the action name, or {@code null} if none has been set */
    public String getAction() {
        return action;
    }

    /** @param action the action name this reply relates to */
    public void setAction(String action) {
        this.action = action;
    }

    /** @return the status code, or {@code null} if none has been set */
    public String getStatus() {
        return status;
    }

    /** @param status the status code to report */
    public void setStatus(String status) {
        this.status = status;
    }

    /** @return the description text, or {@code null} if none has been set */
    public String getDescription() {
        return description;
    }

    /** @param description free-text explanation accompanying the status */
    public void setDescription(String description) {
        this.description = description;
    }

    /**
     * Stores a payload entry, replacing any previous value for the key.
     *
     * @param key   payload key
     * @param value payload value
     */
    public void addData(String key, String value) {
        this.payload.put(key, value);
    }

    /**
     * @param key payload key
     * @return the value stored under {@code key}, or {@code null} if absent
     */
    public String getData(String key) {
        return this.payload.get(key);
    }

    /** @return the live payload map (not a copy); changes are visible to this message */
    public Map<String, String> getPayload() {
        return payload;
    }

    /** @return construction time in milliseconds since the epoch */
    public long getTimestamp() {
        return timestamp;
    }
}
// Protocol Constants
Define message types and response codes in a centralized constants class:
package com.messaging.model;
/**
 * Shared string constants of the wire protocol: payload keys, heartbeat tokens,
 * status codes and event types.
 *
 * <p>This is a pure constants holder, so it is {@code final} and cannot be instantiated.
 */
public final class ProtocolConstants {
    /** Payload key / session attribute name carrying the user id. */
    public static final String USER_IDENTIFIER = "userId";
    /** Heartbeat request token. */
    public static final String HEARTBEAT_REQ = "ping";
    /** Heartbeat response token. */
    public static final String HEARTBEAT_RESP = "pong";

    private ProtocolConstants() {
        // constants only
    }

    /** Status codes placed in a reply's {@code status} field. */
    public static final class StatusCode {
        public static final String SUCCESS = "200";
        public static final String NOT_FOUND = "404";
        // NOTE(review): the values look HTTP-like, where 403 conventionally means
        // "Forbidden" and 401 "Unauthorized" - confirm the intended meaning with clients
        // before changing it.
        public static final String UNAUTHORIZED = "403";
        public static final String UNKNOWN_ACTION = "405";
        public static final String SERVER_ERROR = "500";

        private StatusCode() {
            // constants only
        }
    }

    /** Server-initiated event identifiers. */
    public static final class EventType {
        public static final String FORCE_DISCONNECT = "force_disconnect";

        private EventType() {
            // constants only
        }
    }
}
// Heartbeat Mechanism
Implement connection health monitoring through a keep-alive factory:
package com.messaging.heartbeat;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.messaging.model.ProtocolConstants;
/**
 * Keep-alive message factory that uses the protocol's plain-string heartbeat tokens:
 * {@link ProtocolConstants#HEARTBEAT_REQ} as the request and
 * {@link ProtocolConstants#HEARTBEAT_RESP} as the response.
 */
public class HeartbeatFactory implements KeepAliveMessageFactory {
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatFactory.class);

    /** @return {@code true} when {@code message} equals the heartbeat request token */
    @Override
    public boolean isRequest(IoSession session, Object message) {
        return ProtocolConstants.HEARTBEAT_REQ.equals(message);
    }

    /** @return {@code true} when {@code message} equals the heartbeat response token */
    @Override
    public boolean isResponse(IoSession session, Object message) {
        return ProtocolConstants.HEARTBEAT_RESP.equals(message);
    }

    /** @return the heartbeat request token to send on {@code session} */
    @Override
    public Object getRequest(IoSession session) {
        LOG.debug("Sending heartbeat request");
        return ProtocolConstants.HEARTBEAT_REQ;
    }

    /** @return the heartbeat response token answering {@code request} */
    @Override
    public Object getResponse(IoSession session, Object request) {
        LOG.debug("Responding to heartbeat: {}", request);
        return ProtocolConstants.HEARTBEAT_RESP;
    }
}
Server Bootstrap Configuration
The server initializes the NIO socket acceptor with filters and handlers:
package com.messaging.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import com.messaging.heartbeat.HeartbeatFactory;
/**
 * Bootstraps the MINA socket acceptor that serves messaging clients.
 *
 * <p>The handler and port are supplied through setters; {@link #start()} then binds
 * the acceptor and {@link #stop()} releases it.
 */
public class MessageServer {
    private IoAcceptor acceptor;
    private IoHandler messageHandler;
    private int serverPort;

    /**
     * Builds the filter chain (executor, logger, codec, heartbeat), installs the
     * handler and binds to {@code serverPort}.
     *
     * <p>If configuration or binding fails, the half-built acceptor is disposed before
     * the exception propagates, so its resources are released.
     *
     * @throws IOException if the port cannot be bound
     */
    public void start() throws IOException {
        IoAcceptor candidate = new NioSocketAcceptor();
        try {
            candidate.getSessionConfig().setReadBufferSize(2048);
            candidate.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);

            candidate.getFilterChain().addLast("executor", new ExecutorFilter());
            candidate.getFilterChain().addLast("logger", new LoggingFilter());
            // NOTE(review): this codec applies Java-native object deserialization to bytes
            // read from the network. That is unsafe for untrusted clients; consider a
            // data-only format (JSON/protobuf) or restricting the accepted classes.
            candidate.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));

            KeepAliveFilter heartbeatFilter = new KeepAliveFilter(
                    new HeartbeatFactory(), IdleStatus.BOTH_IDLE);
            heartbeatFilter.setForwardEvent(true);
            heartbeatFilter.setRequestInterval(30);
            candidate.getFilterChain().addLast("heartbeat", heartbeatFilter);

            candidate.setHandler(messageHandler);
            candidate.bind(new InetSocketAddress(serverPort));
        } catch (IOException | RuntimeException e) {
            candidate.dispose();
            throw e;
        }
        acceptor = candidate;
        System.out.println("Message server started on port: " + serverPort);
    }

    /**
     * Unbinds the acceptor and disposes it so its resources are released.
     * Does nothing if the server was never started.
     */
    public void stop() {
        if (acceptor != null) {
            acceptor.unbind();
            // unbind() alone leaves the acceptor's resources allocated.
            acceptor.dispose();
            acceptor = null;
            System.out.println("Message server stopped");
        }
    }

    /**
     * @return the sessions currently managed by the acceptor, keyed by session id;
     *         an empty map when the server is not running
     */
    public Map<Long, IoSession> getActiveSessions() {
        if (acceptor == null) {
            return java.util.Collections.emptyMap();
        }
        return acceptor.getManagedSessions();
    }

    /** @param messageHandler handler that receives session events and messages */
    public void setMessageHandler(IoHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    /** @param serverPort TCP port to bind in {@link #start()} */
    public void setServerPort(int serverPort) {
        this.serverPort = serverPort;
    }
}
Session Management
Wrap IoSession to track user connections:
package com.messaging.session;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.SocketAddress;
import org.apache.mina.core.session.IoSession;
/**
 * Wrapper around a MINA {@link IoSession} that adds user-level metadata
 * (user id, hosting server, connect and heartbeat timestamps).
 *
 * <p>The wrapped connection is {@code transient}: a deserialized instance has no
 * connection, and every connection-related method then degrades to a no-op,
 * {@code null} or {@code false}.
 *
 * <p>Two wrappers are equal when they wrap the same connection. Handlers create a
 * fresh wrapper per event, so identity comparison would never match an
 * already-registered session for the same connection.
 */
public class UserSession implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient IoSession ioSession;
    private String sessionId;
    private String serverHost;
    private String userId;
    private long connectedAt;
    private long lastHeartbeat;

    /** Creates a detached session with no underlying connection. */
    public UserSession() {}

    /**
     * Wraps a live connection; the session id defaults to the connection's id.
     *
     * @param ioSession the connection to wrap, must not be {@code null}
     */
    public UserSession(IoSession ioSession) {
        this.ioSession = ioSession;
        this.sessionId = String.valueOf(ioSession.getId());
    }

    /** Stores an attribute on the underlying connection; no-op when detached. */
    public void setAttribute(String key, Object value) {
        if (ioSession != null) {
            ioSession.setAttribute(key, value);
        }
    }

    /** @return the connection attribute for {@code key}, or {@code null} when absent or detached */
    public Object getAttribute(String key) {
        return ioSession != null ? ioSession.getAttribute(key) : null;
    }

    /** Removes an attribute from the underlying connection; no-op when detached. */
    public void removeAttribute(String key) {
        if (ioSession != null) {
            ioSession.removeAttribute(key);
        }
    }

    /** @return {@code true} if the underlying connection carries the attribute */
    public boolean hasAttribute(String key) {
        return ioSession != null && ioSession.containsAttribute(key);
    }

    /** Writes {@code message} to the connection; silently dropped when not connected. */
    public void sendMessage(Object message) {
        if (ioSession != null && ioSession.isConnected()) {
            ioSession.write(message);
        }
    }

    /** Closes the underlying connection immediately; no-op when detached. */
    public void disconnect() {
        if (ioSession != null) {
            ioSession.closeNow();
        }
    }

    /** @return {@code true} if a connection is attached and still connected */
    public boolean isConnected() {
        return ioSession != null && ioSession.isConnected();
    }

    /** @return the peer address of the connection, or {@code null} when detached */
    public SocketAddress getRemoteAddress() {
        return ioSession != null ? ioSession.getRemoteAddress() : null;
    }

    /**
     * @return {@code true} if {@code serverHost} equals this machine's local host
     *         address; {@code false} if they differ or the local address cannot be resolved
     */
    public boolean isLocalSession() {
        try {
            String localIp = InetAddress.getLocalHost().getHostAddress();
            return localIp.equals(serverHost);
        } catch (java.net.UnknownHostException ignored) {
            // Local address cannot be resolved, so ownership cannot be confirmed.
            return false;
        }
    }

    public String getSessionId() {
        return sessionId;
    }

    public void setSessionId(String sessionId) {
        this.sessionId = sessionId;
    }

    public String getServerHost() {
        return serverHost;
    }

    public void setServerHost(String serverHost) {
        this.serverHost = serverHost;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public long getConnectedAt() {
        return connectedAt;
    }

    public void setConnectedAt(long connectedAt) {
        this.connectedAt = connectedAt;
    }

    public long getLastHeartbeat() {
        return lastHeartbeat;
    }

    public void setLastHeartbeat(long lastHeartbeat) {
        this.lastHeartbeat = lastHeartbeat;
    }

    /**
     * Sessions are equal when they wrap the same connection. Two detached sessions
     * (no connection on either side) fall back to comparing session ids.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof UserSession)) {
            return false;
        }
        UserSession other = (UserSession) obj;
        if (ioSession != null || other.ioSession != null) {
            return ioSession == other.ioSession;
        }
        return java.util.Objects.equals(sessionId, other.sessionId);
    }

    /** Consistent with {@link #equals(Object)}: connection id when attached, session id otherwise. */
    @Override
    public int hashCode() {
        if (ioSession != null) {
            long id = ioSession.getId();
            return (int) (id ^ (id >>> 32));
        }
        return java.util.Objects.hashCode(sessionId);
    }
}
Create a session manager interface:
package com.messaging.session;
/**
 * Lookup table that maps user ids to their {@link UserSession}.
 */
public interface SessionRegistry {

    /**
     * Associates a session with a user id.
     *
     * @param userId  id of the user owning the session
     * @param session the session to store
     */
    void register(String userId, UserSession session);

    /**
     * Looks up the session stored for a user id.
     *
     * @param userId id of the user to look up
     * @return the stored session
     */
    UserSession find(String userId);

    /**
     * Removes the entry belonging to the given session.
     *
     * @param session the session whose entry should be removed
     */
    void remove(UserSession session);

    /**
     * Removes the entry stored for a user id.
     *
     * @param userId id of the user whose entry should be removed
     */
    void remove(String userId);
}
Implement with thread-safe storage:
package com.messaging.session;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * In-memory {@link SessionRegistry} backed by a {@link ConcurrentHashMap}.
 *
 * <p>{@code connectionCount} mirrors the number of registered users: it goes up only
 * when a new user id is added and down when an entry is actually removed.
 * {@code null} arguments are ignored rather than passed to the map, which rejects
 * {@code null} keys and values.
 */
public class DefaultSessionRegistry implements SessionRegistry {
    private final ConcurrentHashMap<String, UserSession> sessions = new ConcurrentHashMap<>();
    private final AtomicInteger connectionCount = new AtomicInteger(0);

    /**
     * Stores {@code session} under {@code userId}, replacing any previous session.
     * Does nothing if either argument is {@code null}.
     */
    @Override
    public void register(String userId, UserSession session) {
        if (userId == null || session == null) {
            return;
        }
        // Replacing an existing entry must not count as an additional connection.
        if (sessions.put(userId, session) == null) {
            connectionCount.incrementAndGet();
        }
    }

    /** @return the session registered for {@code userId}, or {@code null} if none (or the id is {@code null}) */
    @Override
    public UserSession find(String userId) {
        return userId == null ? null : sessions.get(userId);
    }

    /** Removes the entry stored under the session's user id; ignores {@code null} sessions. */
    @Override
    public void remove(UserSession session) {
        if (session != null) {
            remove(session.getUserId());
        }
    }

    /** Removes the entry for {@code userId}; unknown or {@code null} ids are ignored. */
    @Override
    public void remove(String userId) {
        if (userId != null && sessions.remove(userId) != null) {
            connectionCount.decrementAndGet();
        }
    }
}
Request Handler Interface
Define a strategy pattern for handling different message types:
package com.messaging.handler;
import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.session.UserSession;
/**
 * Strategy for handling one kind of client action. {@code DispatchHandler} selects
 * the processor registered under the request's action name.
 */
public interface MessageProcessor {

    /**
     * Processes a request received on a session.
     *
     * @param session wrapper around the connection the event belongs to
     * @param request the decoded request; {@code DispatchHandler} passes {@code null}
     *                when it invokes the "disconnect" processor for a closed connection
     * @return the reply to send back, or {@code null} when nothing should be sent
     */
    OutgoingMessage handle(UserSession session, IncomingMessage request);
}
Main Handler Implementation
Route incoming messages to appropriate processors:
package com.messaging.handler;
import java.util.HashMap;
import java.util.Map;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.model.ProtocolConstants;
import com.messaging.session.UserSession;
/**
 * MINA handler that routes each {@link IncomingMessage} to the
 * {@link MessageProcessor} registered under the message's action name and writes
 * the processor's reply back on the same connection.
 */
@Component
public class DispatchHandler extends IoHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(DispatchHandler.class);

    // Typed map: a raw Map would make processors.get(action) yield Object, which
    // cannot be assigned to MessageProcessor.
    private Map<String, MessageProcessor> processors = new HashMap<>();

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        LOG.info("New connection from: {}", session.getRemoteAddress());
    }

    /**
     * Dispatches a decoded message. Messages that are not {@link IncomingMessage}
     * instances are ignored. Unknown actions are answered with
     * {@link ProtocolConstants.StatusCode#UNKNOWN_ACTION}. A {@code null} reply from
     * a processor means nothing is sent.
     */
    @Override
    public void messageReceived(IoSession ioSession, Object message) throws Exception {
        if (!(message instanceof IncomingMessage)) {
            return;
        }
        IncomingMessage request = (IncomingMessage) message;
        UserSession userSession = new UserSession(ioSession);
        String action = request.getAction();

        OutgoingMessage response;
        MessageProcessor processor = processors.get(action);
        if (processor == null) {
            response = new OutgoingMessage();
            response.setStatus(ProtocolConstants.StatusCode.UNKNOWN_ACTION);
            response.setDescription("Unknown action: " + action);
        } else {
            response = processor.handle(userSession, request);
        }

        if (response != null) {
            response.setAction(action);
            userSession.sendMessage(response);
        }
    }

    /**
     * Runs the "disconnect" processor (with a {@code null} request) for connections
     * that carry the user-id attribute, then closes the connection.
     */
    @Override
    public void sessionClosed(IoSession ioSession) throws Exception {
        UserSession userSession = new UserSession(ioSession);
        MessageProcessor closeProcessor = processors.get("disconnect");
        if (closeProcessor != null && userSession.hasAttribute(ProtocolConstants.USER_IDENTIFIER)) {
            closeProcessor.handle(userSession, null);
        }
        userSession.disconnect();
        LOG.info("Session closed: {}", ioSession.getRemoteAddress());
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        LOG.error("Session error: {}", session.getRemoteAddress(), cause);
    }

    /**
     * Replaces the action-to-processor table.
     *
     * @param processors processors keyed by action name; {@code null} is treated as
     *                   "no processors" so later dispatching cannot hit a null map
     */
    public void setProcessors(Map<String, MessageProcessor> processors) {
        this.processors = processors != null ? processors : new HashMap<String, MessageProcessor>();
    }
}
Business Logic Processors
Handle user binding when connecting:
package com.messaging.handler;
import java.net.InetAddress;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.model.ProtocolConstants;
import com.messaging.session.DefaultSessionRegistry;
import com.messaging.session.UserSession;
import com.messaging.util.SpringContext;
/**
 * Binds a connection to the user id carried in the request payload and registers
 * it in the session registry. If the user already has a session hosted on this
 * server, that older session is notified and closed first.
 */
public class BindProcessor implements MessageProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(BindProcessor.class);

    /**
     * @param newSession wrapper around the connection requesting the bind
     * @param request    bind request; its payload must contain
     *                   {@link ProtocolConstants#USER_IDENTIFIER}
     * @return a reply with status SUCCESS, or SERVER_ERROR when the user id is
     *         missing or binding fails
     */
    @Override
    public OutgoingMessage handle(UserSession newSession, IncomingMessage request) {
        OutgoingMessage response = new OutgoingMessage();
        DefaultSessionRegistry registry = SpringContext.getBean(DefaultSessionRegistry.class);

        String userId = request != null ? request.getData(ProtocolConstants.USER_IDENTIFIER) : null;
        if (userId == null) {
            // Reject up front instead of letting a null key fail inside the registry.
            response.setStatus(ProtocolConstants.StatusCode.SERVER_ERROR);
            response.setDescription("Missing user identifier");
            return response;
        }

        try {
            newSession.setUserId(userId);
            newSession.setSessionId(UUID.randomUUID().toString());
            newSession.setServerHost(InetAddress.getLocalHost().getHostAddress());
            newSession.setConnectedAt(System.currentTimeMillis());
            newSession.setLastHeartbeat(System.currentTimeMillis());

            UserSession existingSession = registry.find(userId);
            if (existingSession != null && !existingSession.equals(newSession)
                    && existingSession.isLocalSession()) {
                OutgoingMessage disconnectNotice = new OutgoingMessage();
                disconnectNotice.setStatus(ProtocolConstants.EventType.FORCE_DISCONNECT);
                disconnectNotice.addData(ProtocolConstants.USER_IDENTIFIER, userId);
                existingSession.sendMessage(disconnectNotice);
                // Detach the user id from the displaced connection so that its close
                // event does not unregister the session registered below.
                existingSession.removeAttribute(ProtocolConstants.USER_IDENTIFIER);
                existingSession.disconnect();
            }

            // DispatchHandler.sessionClosed and DisconnectProcessor identify the user
            // through this connection attribute; without it the registry entry would
            // never be removed when the connection closes.
            newSession.setAttribute(ProtocolConstants.USER_IDENTIFIER, userId);
            registry.register(userId, newSession);
            response.setStatus(ProtocolConstants.StatusCode.SUCCESS);
            LOG.info("User bound: {}", userId);
        } catch (Exception e) {
            response.setStatus(ProtocolConstants.StatusCode.SERVER_ERROR);
            LOG.error("Bind error", e);
        }
        return response;
    }
}
Handle message forwarding between clients:
package com.messaging.handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.model.ProtocolConstants;
import com.messaging.session.DefaultSessionRegistry;
import com.messaging.session.UserSession;
import com.messaging.util.SpringContext;
/**
 * Relays a request's payload to the user named by
 * {@link ProtocolConstants#USER_IDENTIFIER} in that payload, and reports to the
 * sender whether the relay happened.
 */
public class ForwardProcessor implements MessageProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(ForwardProcessor.class);

    /**
     * @param sender  wrapper around the connection the request arrived on
     * @param request forward request; the payload entry
     *                {@link ProtocolConstants#USER_IDENTIFIER} names the target user
     * @return a reply with status SUCCESS when the message was written to the
     *         target, otherwise UNAUTHORIZED with "Target user not connected"
     */
    @Override
    public OutgoingMessage handle(UserSession sender, IncomingMessage request) {
        OutgoingMessage response = new OutgoingMessage();
        DefaultSessionRegistry registry = SpringContext.getBean(DefaultSessionRegistry.class);

        String targetUserId = request.getData(ProtocolConstants.USER_IDENTIFIER);
        // A request without a target id is treated like an unknown target. Looking up
        // a null key would throw, and the sender would then get no reply at all.
        UserSession targetSession = targetUserId != null ? registry.find(targetUserId) : null;

        if (targetSession != null && targetSession.isConnected()) {
            OutgoingMessage forwardMessage = new OutgoingMessage();
            forwardMessage.setAction(request.getAction());
            forwardMessage.setStatus(ProtocolConstants.StatusCode.SUCCESS);
            forwardMessage.setDescription("Forwarded message");
            forwardMessage.getPayload().putAll(request.getPayload());
            // The routing key is not part of the content delivered to the target.
            forwardMessage.getPayload().remove(ProtocolConstants.USER_IDENTIFIER);
            targetSession.sendMessage(forwardMessage);
            response.setStatus(ProtocolConstants.StatusCode.SUCCESS);
            LOG.info("Message forwarded to: {}", targetUserId);
        } else {
            response.setStatus(ProtocolConstants.StatusCode.UNAUTHORIZED);
            response.setDescription("Target user not connected");
        }
        return response;
    }
}
Handle session cleanup on disconnect:
package com.messaging.handler;
import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.model.ProtocolConstants;
import com.messaging.session.DefaultSessionRegistry;
import com.messaging.session.UserSession;
import com.messaging.util.SpringContext;
/**
 * Removes a user's registry entry, using the user id stored as an attribute on
 * the connection.
 */
public class DisconnectProcessor implements MessageProcessor {

    /**
     * @param session wrapper around the connection being cleaned up
     * @param request not used by this processor
     * @return always {@code null}, so no reply is sent
     */
    @Override
    public OutgoingMessage handle(UserSession session, IncomingMessage request) {
        DefaultSessionRegistry sessionRegistry = SpringContext.getBean(DefaultSessionRegistry.class);
        Object boundUserId = session.getAttribute(ProtocolConstants.USER_IDENTIFIER);
        if (boundUserId == null) {
            // Connection was never bound to a user: nothing to unregister.
            return null;
        }
        sessionRegistry.remove(boundUserId.toString());
        return null;
    }
}
Spring Configuration
Wire components together using XML configuration:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring wiring for the message server: dispatcher, processors, registry and bootstrap. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Registers MINA's property editor for java.net.SocketAddress values. -->
    <bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
        <property name="customEditors">
            <map>
                <entry key="java.net.SocketAddress"
                       value="org.apache.mina.integration.beans.InetSocketAddressEditor"/>
            </map>
        </property>
    </bean>

    <!-- Each key is the action name DispatchHandler looks up for an incoming message;
         "disconnect" is also looked up when a session closes. -->
    <bean id="dispatchHandler" class="com.messaging.handler.DispatchHandler">
        <property name="processors">
            <map>
                <entry key="bind">
                    <bean class="com.messaging.handler.BindProcessor"/>
                </entry>
                <entry key="forward">
                    <bean class="com.messaging.handler.ForwardProcessor"/>
                </entry>
                <entry key="disconnect">
                    <bean class="com.messaging.handler.DisconnectProcessor"/>
                </entry>
            </map>
        </property>
    </bean>

    <!-- The server starts when the bean is initialised and stops when it is destroyed. -->
    <bean id="messageServer" class="com.messaging.server.MessageServer"
          init-method="start" destroy-method="stop">
        <property name="serverPort" value="8888"/>
        <property name="messageHandler" ref="dispatchHandler"/>
    </bean>

    <!-- Looked up by the processors through SpringContext.getBean(...). -->
    <bean id="sessionRegistry" class="com.messaging.session.DefaultSessionRegistry"/>
    <bean id="springContext" class="com.messaging.util.SpringContext"/>
</beans>
Client Implementation
Create a client handler to process server messages:
package com.messaging.client;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.messaging.model.ProtocolConstants;
/**
 * Client-side handler: answers heartbeat requests from the server and prints every
 * other message to standard output.
 */
public class ClientMessageHandler extends IoHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ClientMessageHandler.class);

    /**
     * Replies with the heartbeat response token when the message's string form is the
     * heartbeat request token; otherwise prints the message.
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        final String text = message.toString();
        if (!ProtocolConstants.HEARTBEAT_REQ.equals(text)) {
            System.out.println("Received: " + text);
            return;
        }
        LOG.debug("Heartbeat received");
        session.write(ProtocolConstants.HEARTBEAT_RESP);
    }

    /** Logs the failure; the session is left as it is. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        LOG.error("Client error", cause);
    }
}
Build the client connection:
package com.messaging.client;
import java.net.InetSocketAddress;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import com.messaging.model.IncomingMessage;
import com.messaging.model.ProtocolConstants;
/**
 * Minimal client for the message server: connects, binds a user id, forwards
 * messages to other users, and disconnects.
 */
public class MessageClient {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8888;

    private NioSocketConnector connector;
    private IoSession session;

    /**
     * Connects to the server, waiting until the attempt completes.
     *
     * <p>If the attempt fails, the connector is disposed before the failure reported
     * by the connect future is rethrown, so its resources are not left allocated.
     */
    public void connect() {
        NioSocketConnector candidate = new NioSocketConnector();
        candidate.setConnectTimeoutMillis(30000);
        candidate.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        candidate.setHandler(new ClientMessageHandler());

        ConnectFuture future = candidate.connect(new InetSocketAddress(HOST, PORT));
        future.awaitUninterruptibly();
        try {
            session = future.getSession();
        } catch (RuntimeException e) {
            candidate.dispose();
            throw e;
        }
        connector = candidate;
        System.out.println("Connected to server");
    }

    /**
     * Sends a "bind" request associating this connection with {@code userId}.
     *
     * @throws IllegalStateException if {@link #connect()} has not succeeded yet
     */
    public void bindUser(String userId) {
        IncomingMessage bindMsg = new IncomingMessage();
        bindMsg.setAction("bind");
        bindMsg.addData(ProtocolConstants.USER_IDENTIFIER, userId);
        bindMsg.addData("info", "Test user connection");
        requireSession().write(bindMsg);
        System.out.println("Bind request sent for: " + userId);
    }

    /**
     * Sends a "forward" request asking the server to relay {@code content} to
     * {@code targetUserId}.
     *
     * @throws IllegalStateException if {@link #connect()} has not succeeded yet
     */
    public void forwardMessage(String targetUserId, String content) {
        IncomingMessage forwardMsg = new IncomingMessage();
        forwardMsg.setAction("forward");
        forwardMsg.addData(ProtocolConstants.USER_IDENTIFIER, targetUserId);
        forwardMsg.addData("content", content);
        requireSession().write(forwardMsg);
    }

    /**
     * Closes the session after pending writes are flushed, then releases the
     * connector. Safe to call when not connected.
     */
    public void disconnect() {
        if (session != null) {
            // Actively request the close. Waiting on getCloseFuture() alone would
            // block until the server happens to drop the connection.
            session.closeOnFlush().awaitUninterruptibly();
            session = null;
        }
        if (connector != null) {
            connector.dispose();
            connector = null;
        }
    }

    /** Returns the active session, or fails with a clear message when there is none. */
    private IoSession requireSession() {
        if (session == null) {
            throw new IllegalStateException("Not connected; call connect() first");
        }
        return session;
    }

    public static void main(String[] args) {
        MessageClient client = new MessageClient();
        client.connect();
        client.bindUser("user123");
    }
}
Important Considerations
- Ensure matching codec filters between client and server to avoid `java.nio.charset.MalformedInputException` errors.
- All transfer objects must implement `Serializable` for object serialization.
- Transfer object classes must have identical package names and class names on both client and server to prevent deserialization failures.
- Use `ConcurrentHashMap` for session storage in production environments for thread safety.