Building a Real-Time Messaging System with Spring and Apache Mina

Server Implementation

Apache Mina provides a robust framework for building network applications. When combined with Spring's dependency injection, it creates a flexible architecture for real-time message delivery and session management.

Data Transfer Objects

The communication layer requires structured message objects for serialization. The incoming message object captures client requests:

package com.messaging.model;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * Request envelope sent from a client to the server.
 *
 * <p>Carries an action name, a string-to-string payload and a creation
 * timestamp in epoch milliseconds. Instances are {@link Serializable} so they
 * can be written through an object-serialization codec.
 */
public class IncomingMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Name of the operation the client is requesting. */
    private String action;

    /**
     * Key/value data accompanying the request. The type parameters are
     * required: with a raw {@code Map}, {@link #getData(String)} would yield
     * {@code Object} and not compile against its {@code String} return type.
     */
    private final Map<String, String> payload;

    /** Creation time in milliseconds since the epoch. */
    private long createdAt;

    /** Creates an empty message stamped with the current system time. */
    public IncomingMessage() {
        this.payload = new HashMap<>();
        this.createdAt = System.currentTimeMillis();
    }

    /** @return the requested action name, or {@code null} if none was set */
    public String getAction() {
        return action;
    }

    /** @param action the action name to request */
    public void setAction(String action) {
        this.action = action;
    }

    /**
     * Stores a payload entry, replacing any previous value for the key.
     *
     * @param key   entry name
     * @param value entry value
     */
    public void addData(String key, String value) {
        this.payload.put(key, value);
    }

    /**
     * Looks up a payload entry.
     *
     * @param key entry name
     * @return the stored value, or {@code null} when the key is absent
     */
    public String getData(String key) {
        return this.payload.get(key);
    }

    /** @param key name of the payload entry to delete; absent keys are ignored */
    public void removeData(String key) {
        this.payload.remove(key);
    }

    /**
     * @return the live, mutable payload map (not a copy); changes made through
     *         it are visible in this message
     */
    public Map<String, String> getPayload() {
        return payload;
    }

    /** @return creation time in milliseconds since the epoch */
    public long getCreatedAt() {
        return createdAt;
    }

    /** @param createdAt creation time in milliseconds since the epoch */
    public void setCreatedAt(long createdAt) {
        this.createdAt = createdAt;
    }
}

The response object formats server replies:

package com.messaging.model;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * Reply or notification envelope sent from the server to a client.
 *
 * <p>Carries the action it relates to, a status string, a human-readable
 * description, a string-to-string payload and a creation timestamp in epoch
 * milliseconds.
 */
public class OutgoingMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Action this message relates to. */
    private String action;

    /** Outcome or event marker for the message. */
    private String status;

    /** Optional human-readable explanation of the status. */
    private String description;

    /**
     * Key/value data accompanying the message. The type parameters are
     * required: with a raw {@code Map}, {@link #getData(String)} would yield
     * {@code Object} and not compile against its {@code String} return type.
     */
    private final Map<String, String> payload;

    /** Creation time in milliseconds since the epoch; fixed at construction. */
    private final long timestamp;

    /** Creates an empty message stamped with the current system time. */
    public OutgoingMessage() {
        this.payload = new HashMap<>();
        this.timestamp = System.currentTimeMillis();
    }

    /** @return the action name, or {@code null} if none was set */
    public String getAction() {
        return action;
    }

    /** @param action the action name this message relates to */
    public void setAction(String action) {
        this.action = action;
    }

    /** @return the status value, or {@code null} if none was set */
    public String getStatus() {
        return status;
    }

    /** @param status the status value to report */
    public void setStatus(String status) {
        this.status = status;
    }

    /** @return the description, or {@code null} if none was set */
    public String getDescription() {
        return description;
    }

    /** @param description human-readable detail for the receiver */
    public void setDescription(String description) {
        this.description = description;
    }

    /**
     * Stores a payload entry, replacing any previous value for the key.
     *
     * @param key   entry name
     * @param value entry value
     */
    public void addData(String key, String value) {
        this.payload.put(key, value);
    }

    /**
     * Looks up a payload entry.
     *
     * @param key entry name
     * @return the stored value, or {@code null} when the key is absent
     */
    public String getData(String key) {
        return this.payload.get(key);
    }

    /**
     * @return the live, mutable payload map (not a copy); changes made through
     *         it are visible in this message
     */
    public Map<String, String> getPayload() {
        return payload;
    }

    /** @return creation time in milliseconds since the epoch */
    public long getTimestamp() {
        return timestamp;
    }
}

Protocol Constants

Define message types and response codes in a centralized constants class:

package com.messaging.model;

/**
 * Shared string constants for the messaging protocol: payload keys, heartbeat
 * tokens, status values and event names.
 *
 * <p>This is a pure constants holder and cannot be instantiated.
 */
public final class ProtocolConstants {

    /** Payload and session-attribute key under which a user id is stored. */
    public static final String USER_IDENTIFIER = "userId";

    /** Token sent as a heartbeat request. */
    public static final String HEARTBEAT_REQ = "ping";

    /** Token sent in reply to a heartbeat request. */
    public static final String HEARTBEAT_RESP = "pong";

    private ProtocolConstants() {
        // constants holder; not meant to be instantiated
    }

    /** Status values placed in a reply's status field. */
    public static final class StatusCode {
        public static final String SUCCESS = "200";
        public static final String NOT_FOUND = "404";
        // NOTE(review): the values resemble HTTP status codes, where 403 means
        // "Forbidden" and 401 "Unauthorized"; value kept as-is for wire compatibility.
        public static final String UNAUTHORIZED = "403";
        public static final String UNKNOWN_ACTION = "405";
        public static final String SERVER_ERROR = "500";

        private StatusCode() {
            // constants holder; not meant to be instantiated
        }
    }

    /** Names of server-initiated events. */
    public static final class EventType {
        public static final String FORCE_DISCONNECT = "force_disconnect";

        private EventType() {
            // constants holder; not meant to be instantiated
        }
    }
}

Heartbeat Mechanism

Implement connection health monitoring through a keep-alive factory:

package com.messaging.heartbeat;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.messaging.model.ProtocolConstants;

public class HeartbeatFactory implements KeepAliveMessageFactory {
    
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatFactory.class);
    private static final String PING = ProtocolConstants.HEARTBEAT_REQ;
    private static final String PONG = ProtocolConstants.HEARTBEAT_RESP;
    
    @Override
    public Object getRequest(IoSession session) {
        LOG.debug("Sending heartbeat request");
        return PING;
    }
    
    @Override
    public Object getResponse(IoSession session, Object request) {
        LOG.debug("Responding to heartbeat: {}", request);
        return PONG;
    }
    
    @Override
    public boolean isRequest(IoSession session, Object message) {
        return PING.equals(message);
    }
    
    @Override
    public boolean isResponse(IoSession session, Object message) {
        return PONG.equals(message);
    }
}

Server Bootstrap Configuration

The server initializes the NIO socket acceptor with filters and handlers:

package com.messaging.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import com.messaging.heartbeat.HeartbeatFactory;

/**
 * Owns the MINA socket acceptor: builds the filter chain, binds the listening
 * port on {@link #start()} and releases it on {@link #stop()}.
 *
 * <p>{@code messageHandler} and {@code serverPort} are expected to be injected
 * through their setters before {@link #start()} is called.
 */
public class MessageServer {

    private IoAcceptor acceptor;
    private IoHandler messageHandler;
    private int serverPort;

    /**
     * Creates the acceptor, installs the executor, logging, codec and
     * heartbeat filters (in that order), and binds to the configured port.
     *
     * @throws IOException if the port cannot be bound; the acceptor created
     *         for this attempt is disposed before the exception propagates
     */
    public void start() throws IOException {
        acceptor = new NioSocketAcceptor();
        acceptor.getSessionConfig().setReadBufferSize(2048);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);

        acceptor.getFilterChain().addLast("executor", new ExecutorFilter());
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
        // SECURITY NOTE(review): this codec performs native Java deserialization
        // of whatever bytes a peer sends. Only expose the port to trusted
        // clients, or move to a codec with an explicit wire format.
        acceptor.getFilterChain().addLast("codec",
            new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));

        KeepAliveFilter heartbeatFilter = new KeepAliveFilter(
            new HeartbeatFactory(), IdleStatus.BOTH_IDLE);
        heartbeatFilter.setForwardEvent(true);
        heartbeatFilter.setRequestInterval(30);
        acceptor.getFilterChain().addLast("heartbeat", heartbeatFilter);

        acceptor.setHandler(messageHandler);
        try {
            acceptor.bind(new InetSocketAddress(serverPort));
        } catch (IOException e) {
            // Without this the acceptor's I/O resources outlive the failed start.
            acceptor.dispose();
            acceptor = null;
            throw e;
        }

        System.out.println("Message server started on port: " + serverPort);
    }

    /**
     * Stops listening and releases the acceptor's resources. Does nothing if
     * the server was never started.
     */
    public void stop() {
        if (acceptor != null) {
            acceptor.unbind();
            // unbind() only stops accepting connections; dispose() is what
            // releases the acceptor's I/O resources.
            acceptor.dispose();
            System.out.println("Message server stopped");
        }
    }

    /**
     * @return the sessions currently managed by the acceptor, keyed by session
     *         id; an empty map if the server has not been started
     */
    public Map<Long, IoSession> getActiveSessions() {
        if (acceptor == null) {
            return java.util.Collections.<Long, IoSession>emptyMap();
        }
        return acceptor.getManagedSessions();
    }

    /** @param messageHandler handler that receives session and message events */
    public void setMessageHandler(IoHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    /** @param serverPort TCP port to listen on */
    public void setServerPort(int serverPort) {
        this.serverPort = serverPort;
    }
}

Session Management

Wrap IoSession to track user connections:

package com.messaging.session;

import java.io.Serializable;
import java.net.InetAddress;
import java.net.SocketAddress;
import org.apache.mina.core.session.IoSession;

/**
 * Wrapper around a MINA {@link IoSession} that adds user-level bookkeeping
 * (user id, server host, timestamps).
 *
 * <p>The wrapped {@code IoSession} is {@code transient}, so an instance that
 * has been serialized and read back has no live connection; every
 * connection-related method tolerates that by doing nothing or returning
 * {@code null}/{@code false}.
 */
public class UserSession implements Serializable {

    private static final long serialVersionUID = 1L;

    private transient IoSession ioSession;
    private String sessionId;
    private String serverHost;
    private String userId;
    private long connectedAt;
    private long lastHeartbeat;

    /** Creates a detached session with no underlying connection. */
    public UserSession() {}

    /**
     * Wraps a live connection and takes its MINA session id as the initial
     * {@code sessionId}.
     *
     * @param ioSession the connection to wrap; must not be {@code null}
     */
    public UserSession(IoSession ioSession) {
        this.ioSession = ioSession;
        this.sessionId = String.valueOf(ioSession.getId());
    }

    /** Stores an attribute on the underlying connection, if there is one. */
    public void setAttribute(String key, Object value) {
        if (ioSession != null) {
            ioSession.setAttribute(key, value);
        }
    }

    /** @return the attribute value, or {@code null} if absent or detached */
    public Object getAttribute(String key) {
        return ioSession != null ? ioSession.getAttribute(key) : null;
    }

    /** Removes an attribute from the underlying connection, if there is one. */
    public void removeAttribute(String key) {
        if (ioSession != null) {
            ioSession.removeAttribute(key);
        }
    }

    /** @return {@code true} if the underlying connection holds the attribute */
    public boolean hasAttribute(String key) {
        return ioSession != null && ioSession.containsAttribute(key);
    }

    /** Queues a message for writing; silently dropped when not connected. */
    public void sendMessage(Object message) {
        if (ioSession != null && ioSession.isConnected()) {
            ioSession.write(message);
        }
    }

    /**
     * Closes the underlying connection once already-queued writes have been
     * flushed.
     *
     * <p>{@code closeNow()} would discard pending writes, so a message sent
     * immediately before {@code disconnect()} (such as a final notice to the
     * peer) could be lost.
     */
    public void disconnect() {
        if (ioSession != null) {
            ioSession.closeOnFlush();
        }
    }

    /** @return {@code true} if there is an underlying connection and it is open */
    public boolean isConnected() {
        return ioSession != null && ioSession.isConnected();
    }

    /** @return the peer address, or {@code null} when detached */
    public SocketAddress getRemoteAddress() {
        return ioSession != null ? ioSession.getRemoteAddress() : null;
    }

    /**
     * @return {@code true} if {@code serverHost} equals this machine's local
     *         host address; {@code false} if it differs or the local address
     *         cannot be determined
     */
    public boolean isLocalSession() {
        try {
            String localIp = InetAddress.getLocalHost().getHostAddress();
            return localIp.equals(serverHost);
        } catch (Exception e) {
            // Best effort: an unresolvable local host is treated as "not local".
            return false;
        }
    }

    /**
     * Two sessions are equal when they wrap the same {@link IoSession}
     * instance. If neither has a live connection, they are compared by
     * {@code sessionId}.
     *
     * <p>Wrappers are created per event, so identity comparison alone would
     * never recognise two wrappers of the same connection as equal.
     */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof UserSession)) {
            return false;
        }
        UserSession that = (UserSession) other;
        if (ioSession != null || that.ioSession != null) {
            return ioSession == that.ioSession;
        }
        return sessionId == null ? that.sessionId == null : sessionId.equals(that.sessionId);
    }

    /** Consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        if (ioSession != null) {
            return System.identityHashCode(ioSession);
        }
        return sessionId == null ? 0 : sessionId.hashCode();
    }

    // Getters and setters omitted for brevity
}

Create a session manager interface:

package com.messaging.session;

/**
 * Lookup table of user sessions keyed by user id.
 */
public interface SessionRegistry {

    /**
     * Looks up the session stored for a user.
     *
     * @param userId id the session was registered under
     * @return the matching session; the result for an unknown id is defined by
     *         the implementation
     */
    UserSession find(String userId);

    /**
     * Stores a session under the given user id.
     *
     * @param userId  id to register the session under
     * @param session session to store
     */
    void register(String userId, UserSession session);

    /**
     * Drops whatever session is stored for the given user id.
     *
     * @param userId id whose session should be removed
     */
    void remove(String userId);

    /**
     * Drops the given session from the registry.
     *
     * @param session session to remove
     */
    void remove(UserSession session);
}

Implement with thread-safe storage:

package com.messaging.session;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * In-memory {@link SessionRegistry} backed by a {@link ConcurrentHashMap}
 * keyed by user id, with a counter of currently registered sessions.
 */
public class DefaultSessionRegistry implements SessionRegistry {

    // The type parameters are required: with a raw map, find() would return
    // Object and not compile against its UserSession return type.
    private final ConcurrentHashMap<String, UserSession> sessions = new ConcurrentHashMap<>();

    /** Number of user ids that currently have a session registered. */
    private final AtomicInteger connectionCount = new AtomicInteger(0);

    /**
     * Stores {@code session} under {@code userId}, replacing any session
     * already registered for that id. A {@code null} session is ignored.
     *
     * @throws NullPointerException if {@code userId} is {@code null} and
     *         {@code session} is not
     */
    @Override
    public void register(String userId, UserSession session) {
        if (session == null) {
            return;
        }
        // put() returns the previous mapping; only a new key grows the count,
        // so replacing a user's session does not inflate it.
        if (sessions.put(userId, session) == null) {
            connectionCount.incrementAndGet();
        }
    }

    /**
     * @return the session registered for {@code userId}, or {@code null} if
     *         there is none or {@code userId} is {@code null}
     */
    @Override
    public UserSession find(String userId) {
        // ConcurrentHashMap rejects null keys, so answer "absent" directly.
        return userId == null ? null : sessions.get(userId);
    }

    /**
     * Removes the entry stored under the session's user id. Does nothing for
     * a {@code null} session or a session without a user id.
     */
    @Override
    public void remove(UserSession session) {
        if (session != null) {
            remove(session.getUserId());
        }
    }

    /** Removes the entry stored under {@code userId}, if any. */
    @Override
    public void remove(String userId) {
        // Decrement only when something was actually removed, keeping the
        // counter in step with the map.
        if (userId != null && sessions.remove(userId) != null) {
            connectionCount.decrementAndGet();
        }
    }

    /** @return the number of user ids that currently have a session registered */
    public int getConnectionCount() {
        return connectionCount.get();
    }
}

Request Handler Interface

Define a strategy interface so that each message type can be handled by its own processor:

package com.messaging.handler;

import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.session.UserSession;

/**
 * Handler for one kind of message; the dispatcher selects an implementation
 * by action name.
 */
public interface MessageProcessor {
    /**
     * Processes a request for the given session.
     *
     * @param session wrapper around the connection the event arrived on
     * @param request the received message; the dispatcher passes {@code null}
     *                when invoking the "disconnect" processor on session close
     * @return the reply to send back, or {@code null} to send nothing
     */
    OutgoingMessage handle(UserSession session, IncomingMessage request);
}

Main Handler Implementation

Route incoming messages to appropriate processors:

package com.messaging.handler;

import java.util.HashMap;
import java.util.Map;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.model.ProtocolConstants;
import com.messaging.session.UserSession;

/**
 * MINA I/O handler that routes each {@link IncomingMessage} to the
 * {@link MessageProcessor} registered for its action and writes the
 * processor's reply back to the sender.
 */
@Component
public class DispatchHandler extends IoHandlerAdapter {

    private static final Logger LOG = LoggerFactory.getLogger(DispatchHandler.class);

    /** Action key of the processor invoked when a session closes. */
    private static final String DISCONNECT_ACTION = "disconnect";

    // The type parameters are required: with a raw Map, processors.get(...)
    // returns Object and cannot be assigned to MessageProcessor.
    private Map<String, MessageProcessor> processors = new HashMap<>();

    /** Logs the remote address of a newly created connection. */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        LOG.info("New connection from: {}", session.getRemoteAddress());
    }

    /**
     * Dispatches an {@link IncomingMessage} to its processor and writes the
     * reply, if any, stamped with the request's action. An unregistered action
     * yields an {@code UNKNOWN_ACTION} reply. Messages of any other type are
     * ignored.
     */
    @Override
    public void messageReceived(IoSession ioSession, Object message) throws Exception {
        if (!(message instanceof IncomingMessage)) {
            return;
        }
        IncomingMessage request = (IncomingMessage) message;
        UserSession userSession = new UserSession(ioSession);
        String action = request.getAction();

        OutgoingMessage response;
        MessageProcessor processor = processors.get(action);

        if (processor == null) {
            response = new OutgoingMessage();
            response.setStatus(ProtocolConstants.StatusCode.UNKNOWN_ACTION);
            response.setDescription("Unknown action: " + action);
        } else {
            response = processor.handle(userSession, request);
        }

        // A null reply means the processor has nothing to send back.
        if (response != null) {
            response.setAction(action);
            userSession.sendMessage(response);
        }
    }

    /**
     * Runs the "disconnect" processor (with a {@code null} request) for
     * sessions that carry a user-id attribute, then closes the session.
     */
    @Override
    public void sessionClosed(IoSession ioSession) throws Exception {
        UserSession userSession = new UserSession(ioSession);
        MessageProcessor closeProcessor = processors.get(DISCONNECT_ACTION);
        if (closeProcessor != null && userSession.hasAttribute(ProtocolConstants.USER_IDENTIFIER)) {
            closeProcessor.handle(userSession, null);
        }
        userSession.disconnect();
        LOG.info("Session closed: {}", ioSession.getRemoteAddress());
    }

    /** Logs the failure together with the remote address; the session is left open. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        LOG.error("Session error: {}", session.getRemoteAddress(), cause);
    }

    /**
     * Replaces the action-to-processor table.
     *
     * @param processors processors keyed by action name; {@code null} is
     *                   treated as an empty table so dispatching never fails
     *                   on a missing map
     */
    public void setProcessors(Map<String, MessageProcessor> processors) {
        if (processors == null) {
            this.processors = new HashMap<String, MessageProcessor>();
        } else {
            this.processors = processors;
        }
    }
}

Business Logic Processors

Handle user binding when connecting:

package com.messaging.handler;

import java.net.InetAddress;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.model.ProtocolConstants;
import com.messaging.session.DefaultSessionRegistry;
import com.messaging.session.UserSession;
import com.messaging.util.SpringContext;

/**
 * Handles the "bind" action: associates a connection with a user id,
 * evicting any other session on this server already bound to that user, and
 * registers the new session.
 */
public class BindProcessor implements MessageProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(BindProcessor.class);

    /**
     * Binds {@code newSession} to the user id carried in the request payload.
     *
     * @param newSession session wrapper for the connection that sent the request
     * @param request    bind request; its payload supplies the user id
     * @return a reply with status {@code SUCCESS}, or {@code SERVER_ERROR} if
     *         anything fails while binding (the failure is logged)
     */
    @Override
    public OutgoingMessage handle(UserSession newSession, IncomingMessage request) {
        OutgoingMessage response = new OutgoingMessage();
        DefaultSessionRegistry registry = SpringContext.getBean(DefaultSessionRegistry.class);

        try {
            String userId = request.getData(ProtocolConstants.USER_IDENTIFIER);
            newSession.setUserId(userId);
            newSession.setSessionId(UUID.randomUUID().toString());
            newSession.setServerHost(InetAddress.getLocalHost().getHostAddress());
            newSession.setConnectedAt(System.currentTimeMillis());
            newSession.setLastHeartbeat(System.currentTimeMillis());

            UserSession existingSession = registry.find(userId);

            if (existingSession != null && !existingSession.equals(newSession)) {
                forceDisconnect(existingSession, userId);
            }

            // The dispatcher's close handling and DisconnectProcessor look for
            // this attribute on the connection. setUserId's body is not shown
            // here, so set it explicitly; without it the registry entry is
            // never cleaned up when the connection closes.
            newSession.setAttribute(ProtocolConstants.USER_IDENTIFIER, userId);

            registry.register(userId, newSession);
            response.setStatus(ProtocolConstants.StatusCode.SUCCESS);
            LOG.info("User bound: {}", userId);

        } catch (Exception e) {
            response.setStatus(ProtocolConstants.StatusCode.SERVER_ERROR);
            LOG.error("Bind error", e);
        }

        return response;
    }

    /**
     * Notifies and closes a previously bound session, but only when it lives
     * on this server; sessions hosted elsewhere are left untouched.
     */
    private void forceDisconnect(UserSession existingSession, String userId) {
        if (!existingSession.isLocalSession()) {
            return;
        }

        // Detach the user id first: when the old connection's close event
        // fires it must not remove the registry entry, which by then belongs
        // to the replacement session.
        existingSession.removeAttribute(ProtocolConstants.USER_IDENTIFIER);

        OutgoingMessage disconnectNotice = new OutgoingMessage();
        disconnectNotice.setStatus(ProtocolConstants.EventType.FORCE_DISCONNECT);
        disconnectNotice.addData(ProtocolConstants.USER_IDENTIFIER, userId);

        existingSession.sendMessage(disconnectNotice);
        existingSession.disconnect();
    }
}

Handle message forwarding between clients:

package com.messaging.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.model.ProtocolConstants;
import com.messaging.session.DefaultSessionRegistry;
import com.messaging.session.UserSession;
import com.messaging.util.SpringContext;

/**
 * Handles the "forward" action: relays the request payload to the session
 * registered for the target user id.
 */
public class ForwardProcessor implements MessageProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(ForwardProcessor.class);

    /**
     * Forwards the request payload, minus the target user id entry, to the
     * target user's session.
     *
     * @param sender  session of the client that sent the request (not consulted)
     * @param request forward request; the payload entry
     *                {@link ProtocolConstants#USER_IDENTIFIER} names the target
     * @return a reply with status {@code SUCCESS} when the message was handed
     *         to a connected target session, otherwise {@code NOT_FOUND}
     */
    @Override
    public OutgoingMessage handle(UserSession sender, IncomingMessage request) {
        OutgoingMessage response = new OutgoingMessage();
        DefaultSessionRegistry registry = SpringContext.getBean(DefaultSessionRegistry.class);

        String targetUserId = request.getData(ProtocolConstants.USER_IDENTIFIER);
        // A request without a target id cannot match any session; skip the
        // lookup rather than hand a null key to the registry.
        UserSession targetSession = targetUserId == null ? null : registry.find(targetUserId);

        if (targetSession != null && targetSession.isConnected()) {
            OutgoingMessage forwardMessage = new OutgoingMessage();
            forwardMessage.setAction(request.getAction());
            forwardMessage.setStatus(ProtocolConstants.StatusCode.SUCCESS);
            forwardMessage.setDescription("Forwarded message");
            forwardMessage.getPayload().putAll(request.getPayload());
            // The routing key is of no use to the recipient.
            forwardMessage.getPayload().remove(ProtocolConstants.USER_IDENTIFIER);

            targetSession.sendMessage(forwardMessage);
            response.setStatus(ProtocolConstants.StatusCode.SUCCESS);
            LOG.info("Message forwarded to: {}", targetUserId);
        } else {
            // An absent or offline recipient is a "not found" condition, not
            // an authorization failure.
            response.setStatus(ProtocolConstants.StatusCode.NOT_FOUND);
            response.setDescription("Target user not connected");
        }

        return response;
    }
}

Handle session cleanup on disconnect:

package com.messaging.handler;

import com.messaging.model.IncomingMessage;
import com.messaging.model.OutgoingMessage;
import com.messaging.model.ProtocolConstants;
import com.messaging.session.DefaultSessionRegistry;
import com.messaging.session.UserSession;
import com.messaging.util.SpringContext;

/**
 * Handles the "disconnect" action: removes the registry entry for the user id
 * stored on the session's connection.
 */
public class DisconnectProcessor implements MessageProcessor {

    /**
     * Unregisters the user bound to {@code session}, if any.
     *
     * @param session session being closed; {@code null} is tolerated
     * @param request ignored (the dispatcher passes {@code null} on session close)
     * @return always {@code null}; there is nothing to reply to
     */
    @Override
    public OutgoingMessage handle(UserSession session, IncomingMessage request) {
        if (session == null) {
            return null;
        }
        DefaultSessionRegistry registry = SpringContext.getBean(DefaultSessionRegistry.class);

        // Read the attribute once: checking it and then fetching it again
        // could dereference null if it is removed in between.
        Object boundUser = session.getAttribute(ProtocolConstants.USER_IDENTIFIER);
        if (boundUser != null) {
            registry.remove(boundUser.toString());
        }
        return null;
    }
}

Spring Configuration

Wire components together using XML configuration:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Wires the dispatch handler, its processors, the session registry and the
     MINA server that is started and stopped with the application context. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Registers MINA's property editor so SocketAddress-typed properties can
         be written as strings. -->
    <bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
        <property name="customEditors">
            <map>
                <entry key="java.net.SocketAddress"
                       value="org.apache.mina.integration.beans.InetSocketAddressEditor"/>
            </map>
        </property>
    </bean>

    <!-- NOTE(review): DispatchHandler is also annotated @Component. If
         component scanning covers its package, a second instance without
         processors would be registered; confirm only one definition is active. -->
    <bean id="dispatchHandler" class="com.messaging.handler.DispatchHandler">
        <property name="processors">
            <!-- Keys are the action names sent by clients. -->
            <map>
                <entry key="bind">
                    <bean class="com.messaging.handler.BindProcessor"/>
                </entry>
                <entry key="forward">
                    <bean class="com.messaging.handler.ForwardProcessor"/>
                </entry>
                <entry key="disconnect">
                    <bean class="com.messaging.handler.DisconnectProcessor"/>
                </entry>
            </map>
        </property>
    </bean>

    <!-- The init-method binds the port immediately. The processors look up the
         registry through SpringContext at request time, so both beans must be
         initialized before the server starts accepting connections. -->
    <bean id="messageServer" class="com.messaging.server.MessageServer"
          init-method="start" destroy-method="stop"
          depends-on="springContext,sessionRegistry">
        <property name="serverPort" value="8888"/>
        <property name="messageHandler" ref="dispatchHandler"/>
    </bean>

    <bean id="sessionRegistry" class="com.messaging.session.DefaultSessionRegistry"/>
    <bean id="springContext" class="com.messaging.util.SpringContext"/>

</beans>

Client Implementation

Create a client handler to process server messages:

package com.messaging.client;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.messaging.model.ProtocolConstants;

/**
 * Client-side I/O handler: replies to heartbeat requests and prints every
 * other received message to standard output.
 */
public class ClientMessageHandler extends IoHandlerAdapter {

    private static final Logger LOG = LoggerFactory.getLogger(ClientMessageHandler.class);

    /** Logs any error raised on the client connection. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        LOG.error("Client error", cause);
    }

    /**
     * Classifies the message by its string form: the heartbeat request token
     * is answered with the heartbeat response token; anything else is printed.
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        final String text = message.toString();
        final boolean heartbeat = ProtocolConstants.HEARTBEAT_REQ.equals(text);

        if (!heartbeat) {
            System.out.println("Received: " + text);
            return;
        }

        LOG.debug("Heartbeat received");
        session.write(ProtocolConstants.HEARTBEAT_RESP);
    }
}

Build the client connection:

package com.messaging.client;

import java.net.InetSocketAddress;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import com.messaging.model.IncomingMessage;
import com.messaging.model.ProtocolConstants;

/**
 * Minimal client: connects to the message server, sends bind and forward
 * requests, and shuts the connection down on request.
 */
public class MessageClient {

    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8888;

    private NioSocketConnector connector;
    private IoSession session;

    /**
     * Opens a connection to the server and waits until the attempt completes.
     * If the attempt fails, the connector is disposed and the failure raised
     * by the connect future is rethrown.
     */
    public void connect() {
        connector = new NioSocketConnector();
        connector.setConnectTimeoutMillis(30000);
        connector.getFilterChain().addLast("codec",
            new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        connector.setHandler(new ClientMessageHandler());

        ConnectFuture future = connector.connect(new InetSocketAddress(HOST, PORT));
        future.awaitUninterruptibly();
        try {
            session = future.getSession();
        } catch (RuntimeException e) {
            // Release the connector's resources instead of leaking them when
            // the connection could not be established.
            connector.dispose();
            connector = null;
            throw e;
        }

        System.out.println("Connected to server");
    }

    /**
     * Sends a "bind" request associating this connection with {@code userId}.
     *
     * @throws IllegalStateException if {@link #connect()} has not succeeded
     */
    public void bindUser(String userId) {
        requireConnected();
        IncomingMessage bindMsg = new IncomingMessage();
        bindMsg.setAction("bind");
        bindMsg.addData(ProtocolConstants.USER_IDENTIFIER, userId);
        bindMsg.addData("info", "Test user connection");
        session.write(bindMsg);
        System.out.println("Bind request sent for: " + userId);
    }

    /**
     * Sends a "forward" request asking the server to relay {@code content} to
     * {@code targetUserId}.
     *
     * @throws IllegalStateException if {@link #connect()} has not succeeded
     */
    public void forwardMessage(String targetUserId, String content) {
        requireConnected();
        IncomingMessage forwardMsg = new IncomingMessage();
        forwardMsg.setAction("forward");
        forwardMsg.addData(ProtocolConstants.USER_IDENTIFIER, targetUserId);
        forwardMsg.addData("content", content);
        session.write(forwardMsg);
    }

    /**
     * Closes the connection after pending writes are flushed, then disposes
     * the connector. Safe to call when not connected.
     */
    public void disconnect() {
        if (session != null) {
            // Waiting on getCloseFuture() alone never returns unless the peer
            // closes first; the close has to be initiated here.
            session.closeOnFlush().awaitUninterruptibly();
            session = null;
        }
        if (connector != null) {
            connector.dispose();
            connector = null;
        }
    }

    /** Fails fast with a clear message when no connection has been established. */
    private void requireConnected() {
        if (session == null) {
            throw new IllegalStateException("Not connected; call connect() first");
        }
    }

    /** Connects and binds a sample user; the connection is then left open. */
    public static void main(String[] args) {
        MessageClient client = new MessageClient();
        client.connect();
        client.bindUser("user123");
    }
}

Important Considerations

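  • Install the same codec filter on both client and server (here, ObjectSerializationCodecFactory on each side); a mismatch between the two can surface as decoding errors such as java.nio.charset.MalformedInputException.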
  • All transfer objects must implement Serializable for object serialization.
  • Transfer object classes must have identical package names and class names on both client and server to prevent deserialization failures.
  • Use ConcurrentHashMap for session storage in production environments for thread safety.

Tags: java Apache Mina Spring Framework Real-Time Messaging Network Programming

Posted on Sat, 23 May 2026 18:54:07 +0000 by yarons