Building a Message Queue with Redis Streams in Go

This article demonstrates how to implement a message queue using Redis Streams in Go. We will examine the core architecture, focusing on the Redis client configuration and connection management logic.

The project is structured into several key components:

  • redis: A package for configuring and connecting to Redis, including connection pooling.
  • producer: Logic for sending messages to the queue.
  • consumer: Logic for receiving and processing messages.
  • dead_letter: A mechanism for handling failed message delivery.
  • config: Configuration files for the application.
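
The producer component is not shown in this article. As a rough sketch of the idea, and assuming it appends entries with the Redis XADD command, the helper below builds the argument list such a call would take. The name xaddArgs, the stream name "orders", and the approximate MAXLEN trimming are illustrative assumptions, not code from the project.

```go
package main

import (
	"fmt"
	"sort"
)

// xaddArgs is a hypothetical helper (not from the article) that builds the
// arguments for: XADD <stream> [MAXLEN ~ <maxLen>] * field value [field value ...]
// The "*" asks Redis to generate the entry ID.
func xaddArgs(stream string, maxLen int, fields map[string]string) []interface{} {
	args := []interface{}{stream}
	if maxLen > 0 {
		// "~" requests approximate trimming of the stream length.
		args = append(args, "MAXLEN", "~", maxLen)
	}
	args = append(args, "*")

	// Sort the keys so the argument order is deterministic.
	keys := make([]string, 0, len(fields))
	for k := range fields {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		args = append(args, k, fields[k])
	}
	return args
}

func main() {
	// "orders" and the field names are made-up example values.
	args := xaddArgs("orders", 1000, map[string]string{"id": "42", "status": "new"})
	fmt.Println(args...)
}
```

With redigo, a slice like this could be passed as conn.Do("XADD", args...) on a connection obtained from the pool described in the next section.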

Redis Client Configuration

The foundation of our message queue is a pooled Redis client built on the redigo library. The following code defines its configuration and connection logic.

package redis

import (
	"fmt"
	"time"

	"github.com/gomodule/redigo/redis"
)

// Default configuration constants for the Redis connection pool.
const (
	defaultIdleTimeout = 10 * time.Second
	defaultMaxActive   = 100
	defaultMaxIdle     = 20
)

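// RedisConfig holds the parameters for connecting to a Redis server.
type RedisConfig struct {
	Network  string // network type passed to redis.Dial, e.g. "tcp"
	Address  string // server address, e.g. "localhost:6379"
	Password string // optional; AUTH is skipped when empty

	MaxIdle     int           // maximum number of idle connections kept in the pool
	IdleTimeout time.Duration // idle connections are closed after this duration
	MaxActive   int           // maximum number of connections the pool allocates at once
	Wait        bool          // if true, Get waits for a free connection once MaxActive is reached
}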

// applyDefaults ensures all configuration fields have valid values.
func applyDefaults(cfg *RedisConfig) {
	if cfg.MaxIdle <= 0 {
		cfg.MaxIdle = defaultMaxIdle
	}
	if cfg.IdleTimeout <= 0 {
		cfg.IdleTimeout = defaultIdleTimeout
	}
	if cfg.MaxActive <= 0 {
		cfg.MaxActive = defaultMaxActive
	}
}

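// Client wraps the Redis connection pool. This minimal definition is an
// assumption; the listing only shows that it holds a pool field.
type Client struct {
	pool *redis.Pool
}

// NewRedisClient creates and returns a new Redis client with a configured connection pool.
func NewRedisClient(network, address, password string, opts ...func(*RedisConfig)) *Client {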
	config := &RedisConfig{
		Network:  network,
		Address:  address,
		Password: password,
		// Set some initial defaults
		MaxIdle:     defaultMaxIdle,
		IdleTimeout: defaultIdleTimeout,
		MaxActive:   defaultMaxActive,
	}

	// Apply any custom options
	for _, opt := range opts {
		opt(config)
	}

	// Ensure all values are valid
	applyDefaults(config)

	// Create the connection pool
	pool := createConnectionPool(config)

	return &Client{
		pool: pool,
	}
}

// createConnectionPool initializes and returns a Redis connection pool.
func createConnectionPool(config *RedisConfig) *redis.Pool {
	return &redis.Pool{
		MaxIdle:     config.MaxIdle,
		IdleTimeout: config.IdleTimeout,
		MaxActive:   config.MaxActive,
		Wait:        config.Wait,
		Dial: func() (redis.Conn, error) {
			conn, err := redis.Dial(config.Network, config.Address)
			if err != nil {
				return nil, fmt.Errorf("failed to dial redis: %w", err)
			}
			// Authenticate if a password is provided
			if config.Password != "" {
				if _, err := conn.Do("AUTH", config.Password); err != nil {
					conn.Close()
					return nil, fmt.Errorf("authentication failed: %w", err)
				}
			}
			return conn, nil
		},
	}
}
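
NewRedisClient accepts optional func(*RedisConfig) arguments, but the listing does not show any. The sketch below illustrates how such options might look and how they interact with applyDefaults. It copies a trimmed RedisConfig so that it runs without a Redis server; WithMaxActive, WithWait, and buildConfig are hypothetical names introduced here for illustration.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultIdleTimeout = 10 * time.Second
	defaultMaxActive   = 100
	defaultMaxIdle     = 20
)

// RedisConfig mirrors only the pool-related fields of the listing above.
type RedisConfig struct {
	MaxIdle     int
	IdleTimeout time.Duration
	MaxActive   int
	Wait        bool
}

// WithMaxActive is a hypothetical option that sets the pool's connection limit.
func WithMaxActive(n int) func(*RedisConfig) {
	return func(c *RedisConfig) { c.MaxActive = n }
}

// WithWait is a hypothetical option that sets the Wait flag.
func WithWait(wait bool) func(*RedisConfig) {
	return func(c *RedisConfig) { c.Wait = wait }
}

// applyDefaults replaces non-positive values with the defaults, as in the listing.
func applyDefaults(cfg *RedisConfig) {
	if cfg.MaxIdle <= 0 {
		cfg.MaxIdle = defaultMaxIdle
	}
	if cfg.IdleTimeout <= 0 {
		cfg.IdleTimeout = defaultIdleTimeout
	}
	if cfg.MaxActive <= 0 {
		cfg.MaxActive = defaultMaxActive
	}
}

// buildConfig (hypothetical) repeats the option-handling steps of
// NewRedisClient without creating a pool or dialing Redis.
func buildConfig(opts ...func(*RedisConfig)) *RedisConfig {
	cfg := &RedisConfig{}
	for _, opt := range opts {
		opt(cfg)
	}
	applyDefaults(cfg)
	return cfg
}

func main() {
	// The invalid MaxActive value is corrected by applyDefaults.
	cfg := buildConfig(WithMaxActive(-5), WithWait(true))
	fmt.Printf("%+v\n", *cfg)
}
```

Because applyDefaults runs after the options, a non-positive value such as WithMaxActive(-5) falls back to the default of 100, while a valid value passed by the caller is kept.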

Tags: Redis Go MessageQueue Streams

Posted on Wed, 27 May 2026 17:24:19 +0000 by UVL