Implementing Task Queues in Go Using Goroutines and Channels

Go excels at building asynchronous systems through lightweight goroutines and channels, offering simpler constructs than many languages' async/await models. Below are practical patterns for implementing task queues.

Direct Asynchronous Execution

For trivial cases, firing off a task asynchronously requires no queue:

go handleTask(item)

This suffices for fire-and-forget operations such as sending an email from an HTTP handler. As the workload grows, however, spawning a goroutine per task gives no control over concurrency or backpressure, which is what a structured queue provides.

Basic Single-Worker Queue

A minimal design uses a buffered channel and a looping worker:

func consumeTasks(tasks <-chan Task) {
    for t := range tasks {
        handleTask(t)
    }
}

// Buffered channel holds up to 100 pending tasks
taskStream := make(chan Task, 100)

// Launch worker goroutine
go consumeTasks(taskStream)

// Enqueue a task
taskStream <- newTask

No explicit locks are needed, because channel sends and receives are safe for concurrent use. Once the buffer is full, further sends block, which naturally throttles producers.
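The fragments above can be assembled into a small runnable program. This is a minimal sketch, assuming a hypothetical Task type with an ID field (the article leaves Task undefined) and a helper named processAll that exists only for illustration:

```go
package main

import "fmt"

// Task is a hypothetical payload type; the article does not define it.
type Task struct{ ID int }

// processAll pushes n tasks through a buffered channel to a single worker
// and returns the IDs in the order the worker handled them.
func processAll(n int) []int {
	tasks := make(chan Task, 100)
	handled := make(chan []int)

	// Single worker: receives until the channel is closed and drained.
	go func() {
		var ids []int
		for t := range tasks {
			ids = append(ids, t.ID)
		}
		handled <- ids
	}()

	for i := 1; i <= n; i++ {
		tasks <- Task{ID: i} // blocks only while 100 tasks are pending
	}
	close(tasks)
	return <-handled
}

func main() {
	fmt.Println(processAll(3)) // prints [1 2 3]
}
```

With one producer and one worker, tasks are handled in the order they were sent, since a channel is a FIFO queue.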

Producer Throttling via Backpressure

When the channel reaches its capacity, a send blocks:

taskStream <- newTask // blocks if 100 tasks already queued

Blocking callers bounds the number of pending tasks and prevents unbounded memory growth. Where waiting is not acceptable, for example in an HTTP endpoint, the handler can instead detect a full queue without waiting and respond with 503 Service Unavailable, signaling clients to retry later.
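A middle ground between blocking forever and failing immediately is to wait for a bounded time. The sketch below is one possible way to do that, not something the article prescribes; enqueueWithTimeout and fillDemo are hypothetical names, and Task is an assumed placeholder type:

```go
package main

import (
	"fmt"
	"time"
)

// Task is a hypothetical payload type; the article does not define it.
type Task struct{ ID int }

// enqueueWithTimeout waits up to limit for room in the queue and reports
// whether the task was accepted.
func enqueueWithTimeout(queue chan<- Task, t Task, limit time.Duration) bool {
	timer := time.NewTimer(limit)
	defer timer.Stop() // release the timer if the send wins
	select {
	case queue <- t:
		return true
	case <-timer.C:
		return false
	}
}

// fillDemo enqueues two tasks into a queue of capacity 1 that has no
// consumer, so the second attempt has to time out.
func fillDemo() (first, second bool) {
	queue := make(chan Task, 1)
	first = enqueueWithTimeout(queue, Task{ID: 1}, 20*time.Millisecond)
	second = enqueueWithTimeout(queue, Task{ID: 2}, 20*time.Millisecond)
	return first, second
}

func main() {
	fmt.Println(fillDemo()) // prints true false
}
```

A caller that gets false back can translate it into whatever rejection its protocol uses, such as a 503 response.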

Non-Blocking Enqueue Attempt

To attempt insertion without waiting, use a select with a default branch:

func attemptInsert(t Task, queue chan<- Task) bool {
    select {
    case queue <- t:
        return true
    default:
        return false
    }
}

Usage in an HTTP handler:

if !attemptInsert(reqTask, taskStream) {
    http.Error(w, "capacity exceeded", http.StatusServiceUnavailable)
    return
}
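To see the handler behaviour end to end, the following sketch drives it with net/http/httptest. It assumes a hypothetical Task type carrying the request path, a hypothetical enqueueHandler constructor, and a 202 Accepted reply on success, none of which come from the article:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Task is a hypothetical payload type; the article does not define it.
type Task struct{ Path string }

// attemptInsert is the non-blocking enqueue from the article.
func attemptInsert(t Task, queue chan<- Task) bool {
	select {
	case queue <- t:
		return true
	default:
		return false
	}
}

// enqueueHandler (hypothetical) accepts work with 202 or rejects it with 503.
func enqueueHandler(queue chan<- Task) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !attemptInsert(Task{Path: r.URL.Path}, queue) {
			http.Error(w, "capacity exceeded", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	}
}

// statusCodes sends n requests to a handler whose queue has the given
// capacity and no consumer, and collects the response codes.
func statusCodes(capacity, n int) []int {
	queue := make(chan Task, capacity)
	h := enqueueHandler(queue)
	codes := make([]int, 0, n)
	for i := 0; i < n; i++ {
		rec := httptest.NewRecorder()
		req := httptest.NewRequest(http.MethodPost, "/tasks", nil)
		h(rec, req)
		codes = append(codes, rec.Code)
	}
	return codes
}

func main() {
	fmt.Println(statusCodes(2, 3)) // prints [202 202 503]
}
```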

Graceful Worker Shutdown

Closing the channel stops iteration over it:

close(taskStream)

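Because range on a channel exits only once the channel is closed and its buffer is empty, all queued items are processed before the worker terminates. Only the sending side should close the channel, since a send on a closed channel panics.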
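The drain-then-stop behaviour can be checked directly with the two-value receive form, which is what range uses under the hood. A small sketch, with drainClosed as a hypothetical helper name:

```go
package main

import "fmt"

// drainClosed buffers the given values, closes the channel, and then
// receives until the channel reports that it is closed and empty.
func drainClosed(values ...int) (received []int, stillOpen bool) {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch) // buffered values remain readable after close

	for {
		v, ok := <-ch
		if !ok {
			// ok is false only after every buffered value was delivered.
			return received, ok
		}
		received = append(received, v)
	}
}

func main() {
	got, open := drainClosed(1, 2, 3)
	fmt.Println(got, open) // prints [1 2 3] false
}
```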

Waiting for Completion

close returns immediately; it does not wait for the worker to finish. To wait, use sync.WaitGroup:

var wg sync.WaitGroup

func consumeTasks(tasks <-chan Task) {
    defer wg.Done()
    for t := range tasks {
        handleTask(t)
    }
}

wg.Add(1)
go consumeTasks(taskStream)

close(taskStream)
wg.Wait()

Increment the counter before launching the goroutine; defer wg.Done() ensures it decrements on exit.
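The same pattern also works with a WaitGroup scoped to a function rather than declared at package level, which tends to be easier to test. A minimal sketch under that assumption; drainAndWait and the Task type are hypothetical:

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a hypothetical payload type; the article does not define it.
type Task struct{ ID int }

// drainAndWait queues n tasks for a single worker, closes the channel,
// and waits for the worker before returning how many tasks it handled.
func drainAndWait(n int) int {
	tasks := make(chan Task, n)
	var wg sync.WaitGroup
	handled := 0

	wg.Add(1) // increment before the goroutine starts
	go func() {
		defer wg.Done()
		for range tasks {
			handled++ // only this goroutine writes handled
		}
	}()

	for i := 0; i < n; i++ {
		tasks <- Task{ID: i}
	}
	close(tasks)
	wg.Wait() // after Wait returns, reading handled is safe
	return handled
}

func main() {
	fmt.Println(drainAndWait(5)) // prints 5
}
```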

Timed Wait for Completion

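To avoid blocking indefinitely, wrap the wait in a helper that gives up after a deadline. Note that on timeout the helper goroutine remains blocked in wg.Wait() until the workers eventually finish: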

func waitWithTimeout(wg *sync.WaitGroup, limit time.Duration) bool {
    done := make(chan struct{})
    go func() {
        wg.Wait()
        close(done)
    }()
    select {
    case <-done:
        return true
    case <-time.After(limit):
        return false
    }
}

waitWithTimeout(&wg, 5*time.Second)
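Both outcomes of the timed wait can be exercised with a worker that simply sleeps. In this sketch, finishesWithin is a hypothetical helper and the durations are arbitrary values chosen to make each result unambiguous:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout is the helper from the article.
func waitWithTimeout(wg *sync.WaitGroup, limit time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(limit):
		return false
	}
}

// finishesWithin starts one worker that sleeps for workMs milliseconds
// and reports whether it completed within limitMs milliseconds.
func finishesWithin(workMs, limitMs int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()
	return waitWithTimeout(&wg, time.Duration(limitMs)*time.Millisecond)
}

func main() {
	fmt.Println(finishesWithin(10, 1000)) // prints true
	fmt.Println(finishesWithin(500, 20))  // prints false
}
```

A false result only means the caller stopped waiting; the worker itself keeps running until it returns on its own.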

Immediate Cancellation Without Draining

Using context.Context, a worker can abort promptly:

ctx, stop := context.WithCancel(context.Background())
go worker(ctx, taskStream)

func worker(ctx context.Context, tasks <-chan Task) {
    for {
        select {
        case <-ctx.Done():
            return
        case t, ok := <-tasks:
            if !ok {
                return // channel closed and drained
            }
            handleTask(t)
        }
    }
}

stop() // sends cancellation signal

stop() returns immediately and does not wait for an in-flight task to finish. Combine it with a wait group if the caller needs to synchronize after cancelling.

Care is required: if tasks remain in the channel when cancellation happens, both select cases are ready and Go chooses between them at random, so the worker may process further tasks before it observes the cancellation. One approach checks the context error after handling each task:

case t := <-tasks:
    handleTask(t)
    if ctx.Err() != nil {
        return
    }

Alternatively, set an atomic flag before calling stop() and check it after task execution.
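Another variant of the same guard is to test for cancellation before each receive, so a pending cancel always wins over queued tasks. The sketch below is one way to express that, not the article's exact code; the handle callback, the cancelAfterFirst helper, and the Task type are assumptions made for the demo:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Task is a hypothetical payload type; the article does not define it.
type Task struct{ ID int }

// worker checks ctx before every receive so that queued tasks cannot
// win the select once cancellation has been requested.
func worker(ctx context.Context, tasks <-chan Task, handle func(Task)) {
	for {
		select {
		case <-ctx.Done():
			return // priority check: stop even if tasks are waiting
		default:
		}
		select {
		case <-ctx.Done():
			return
		case t, ok := <-tasks:
			if !ok {
				return // channel closed and drained
			}
			handle(t)
		}
	}
}

// cancelAfterFirst queues n tasks (n must be at least 1), cancels from
// inside the first handled task, and returns how many were handled.
func cancelAfterFirst(n int) int {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()
	tasks := make(chan Task, n)
	for i := 0; i < n; i++ {
		tasks <- Task{ID: i}
	}
	handled := 0
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		worker(ctx, tasks, func(Task) {
			handled++
			stop() // cancel while tasks are still queued
		})
	}()
	wg.Wait()
	return handled
}

func main() {
	fmt.Println(cancelAfterFirst(5)) // prints 1
}
```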

Cancel Channel Instead of Context

A plain channel can mimic cancellation:

cancelSig := make(chan struct{})
go worker(taskStream, cancelSig)

func worker(tasks <-chan Task, stop <-chan struct{}) {
    for {
        select {
        case <-stop:
            return
        case t, ok := <-tasks:
            if !ok {
                return // channel closed and drained
            }
            handleTask(t)
        }
    }
}

close(cancelSig)

The same caveat applies: while tasks remain queued, select may keep choosing the task case, so processing can continue after close(cancelSig) unless additional guards are added.
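One practical difference from context is that closing a plain channel twice panics, whereas a context's cancel function may be called repeatedly. If several code paths might trigger cancellation, the close can be guarded with sync.Once. A sketch, where the canceller type and its methods are hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

// canceller (hypothetical) wraps a stop channel so that Cancel is safe
// to call more than once.
type canceller struct {
	once sync.Once
	done chan struct{}
}

func newCanceller() *canceller {
	return &canceller{done: make(chan struct{})}
}

// Cancel closes the channel on the first call and does nothing afterwards.
func (c *canceller) Cancel() {
	c.once.Do(func() { close(c.done) })
}

// Done returns the channel that workers select on.
func (c *canceller) Done() <-chan struct{} {
	return c.done
}

// cancelTwice calls Cancel twice and reports whether Done is closed.
func cancelTwice() bool {
	c := newCanceller()
	c.Cancel()
	c.Cancel() // no panic: the close ran only once
	select {
	case <-c.Done():
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(cancelTwice()) // prints true
}
```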

Multiple Workers for Parallelism

Run several consumers on one shared channel:

for i := 0; i < workerCount; i++ {
    go consumeTasks(taskStream)
}

Each worker competes for tasks; exactly one receives each item. To wait for all workers:

for i := 0; i < workerCount; i++ {
    wg.Add(1)
    go consumeTasks(taskStream)
}
close(taskStream)
wg.Wait()
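Put together, a pool that shares one channel and is awaited with a WaitGroup might look like the sketch below. The summing workload, the mutex-protected total, and the names sumWithWorkers and Task are illustrative assumptions:

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a hypothetical payload type; the article does not define it.
type Task struct{ Value int }

// sumWithWorkers distributes values across workerCount goroutines that
// share one channel, waits for all of them, and returns the total.
func sumWithWorkers(values []int, workerCount int) int {
	tasks := make(chan Task, len(values))
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex // guards total, which all workers update
		total int
	)

	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks { // each task goes to exactly one worker
				mu.Lock()
				total += t.Value
				mu.Unlock()
			}
		}()
	}

	for _, v := range values {
		tasks <- Task{Value: v}
	}
	close(tasks) // lets every worker's range loop end after draining
	wg.Wait()
	return total
}

func main() {
	fmt.Println(sumWithWorkers([]int{1, 2, 3, 4, 5}, 3)) // prints 15
}
```

The order in which workers pick up tasks is not deterministic, but because each task is delivered exactly once, the total is.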

Broadcast cancellation to all workers via a shared cancel channel:

cancelSig := make(chan struct{})
for i := 0; i < workerCount; i++ {
    go worker(taskStream, cancelSig)
}
close(cancelSig)

Tags: Go Task Queue goroutine channel Concurrency

Posted on Sat, 04 Jul 2026 17:32:35 +0000 by Lee