This article demonstrates how to implement a message queue using Redis Streams in Go. We will examine the core architecture, focusing on the Redis client configuraton and connection management logic.
The project is strcutured into several key components:
redis: A package for configuring and connecting to Redis, including connection pooling.producer: Logic for sending messages to the queue.consumer: Logic for receiving and processing messages.dead_letter: A mechanism for handling failed message delivery.config: Configuration files for the application.
Redis Client Configuration
The foundation of our message queue is a robust Redis client. The following code defines the configuratoin and connection logic.
package redis
import (
"fmt"
"time"
"github.com/gomodule/redigo/redis"
)
// Default configuration constants for the Redis connection pool.
const (
defaultIdleTimeout = 10 * time.Second
defaultMaxActive = 100
defaultMaxIdle = 20
)
// RedisConfig holds the parameters for connecting to a Redis server.
type RedisConfig struct {
Network string
Address string
Password string
MaxIdle int
IdleTimeout time.Duration
MaxActive int
Wait bool
}
// applyDefaults ensures all configuration fields have valid values.
func applyDefaults(cfg *RedisConfig) {
if cfg.MaxIdle <= 0 {
cfg.MaxIdle = defaultMaxIdle
}
if cfg.IdleTimeout <= 0 {
cfg.IdleTimeout = defaultIdleTimeout
}
if cfg.MaxActive <= 0 {
cfg.MaxActive = defaultMaxActive
}
}
// NewRedisClient creates and returns a new Redis client with a configured connection pool.
func NewRedisClient(network, address, password string, opts ...func(*RedisConfig)) *Client {
config := &RedisConfig{
Network: network,
Address: address,
Password: password,
// Set some initial defaults
MaxIdle: defaultMaxIdle,
IdleTimeout: defaultIdleTimeout,
MaxActive: defaultMaxActive,
}
// Apply any custom options
for _, opt := range opts {
opt(config)
}
// Ensure all values are valid
applyDefaults(config)
// Create the connection pool
pool := createConnectionPool(config)
return &Client{
pool: pool,
}
}
// createConnectionPool initializes and returns a Redis connection pool.
func createConnectionPool(config *RedisConfig) *redis.Pool {
return &redis.Pool{
MaxIdle: config.MaxIdle,
IdleTimeout: config.IdleTimeout,
MaxActive: config.MaxActive,
Wait: config.Wait,
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial(config.Network, config.Address)
if err != nil {
return nil, fmt.Errorf("failed to dial redis: %w", err)
}
// Authenticate if a password is provided
if config.Password != "" {
if _, err := conn.Do("AUTH", config.Password); err != nil {
conn.Close()
return nil, fmt.Errorf("authentication failed: %w", err)
}
}
return conn, nil
},
}
}