Go excels at building asynchronous systems through lightweight goroutines and channels, offering simpler constructs than many languages' async/await models. Below are practical patterns for implementing task queues.
Direct Asynchronous Execution
For trivial cases, firing off a task asynchronously requires no queue:
go handleTask(item)
This suffices for fire-and-forget operations such as sending an email from an HTTP handler. Once you need to bound concurrency or apply backpressure, a structured queue is the better tool.
Basic Single-Worker Queue
A minimal design uses a buffered channel and a looping worker:
func consumeTasks(tasks <-chan Task) {
	for t := range tasks {
		handleTask(t)
	}
}
// Buffered channel holds up to 100 pending tasks
taskStream := make(chan Task, 100)
// Launch worker goroutine
go consumeTasks(taskStream)
// Enqueue a task
taskStream <- newTask
Channels are safe for concurrent use, so no explicit locks are needed. When the buffer is full, sends block, which naturally throttles producers.
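To see the pieces working together, here is a minimal runnable sketch. The Task struct with an ID field and the runQueue helper are assumptions made for illustration; the article does not define either. The sketch also closes the channel and waits on a done channel so that the result can be read safely.

```go
package main

import "fmt"

// Task is a hypothetical unit of work; the article leaves its shape open.
type Task struct {
	ID int
}

// runQueue enqueues n tasks on a buffered channel, lets a single worker
// consume them, and returns the IDs in the order they were handled.
func runQueue(n int) []int {
	tasks := make(chan Task, 100)
	done := make(chan struct{})
	var handled []int

	// Single worker: the only goroutine that appends to handled.
	go func() {
		defer close(done)
		for t := range tasks {
			handled = append(handled, t.ID)
		}
	}()

	for i := 1; i <= n; i++ {
		tasks <- Task{ID: i}
	}
	close(tasks) // no more tasks; the worker drains the buffer and exits
	<-done       // wait for the worker before reading handled
	return handled
}

func main() {
	fmt.Println(runQueue(5)) // [1 2 3 4 5]
}
```

With one worker and one producer, tasks are handled in the order they were sent, because a channel is a FIFO queue.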
Producer Throttling via Backpressure
When the channel reaches its capacity, a send blocks:
taskStream <- newTask // blocks if 100 tasks already queued
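Blocking callers bounds the number of queued tasks, which prevents unbounded memory growth and helps enforce service-level agreements. A plain blocking send cannot report the overload by itself, though: if an HTTP endpoint should respond with 503 Service Unavailable when the queue is full, signaling clients to retry later, it needs a non-blocking or time-limited send.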
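One way to bound how long a producer waits is to race the send against a timer. The sketch below is an illustration rather than part of the original design: enqueueWithin, fillAndOverflow, and the Task type are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// Task is a hypothetical unit of work.
type Task struct{ ID int }

// enqueueWithin blocks for at most wait while trying to send t,
// and reports whether the send succeeded.
func enqueueWithin(queue chan<- Task, t Task, wait time.Duration) bool {
	timer := time.NewTimer(wait)
	defer timer.Stop()
	select {
	case queue <- t:
		return true
	case <-timer.C:
		return false
	}
}

// fillAndOverflow fills a queue that has no consumer, then attempts
// one extra send that cannot succeed.
func fillAndOverflow(capacity int) (accepted int, overflowOK bool) {
	queue := make(chan Task, capacity)
	for i := 0; i < capacity; i++ {
		if enqueueWithin(queue, Task{ID: i}, 50*time.Millisecond) {
			accepted++
		}
	}
	overflowOK = enqueueWithin(queue, Task{ID: capacity}, 50*time.Millisecond)
	return accepted, overflowOK
}

func main() {
	accepted, overflowOK := fillAndOverflow(3)
	fmt.Println(accepted, overflowOK) // 3 false
}
```

The caller decides what a false result means, for example returning an error to the client instead of queuing indefinitely.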
Non-Blocking Enqueue Attempt
To attempt insertion without waiting, use a select with a default branch:
func attemptInsert(t Task, queue chan<- Task) bool {
	select {
	case queue <- t:
		return true
	default:
		return false
	}
}
Usage in an HTTP handler:
if !attemptInsert(reqTask, taskStream) {
	http.Error(w, "capacity exceeded", http.StatusServiceUnavailable)
	return
}
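The handler fragment can be exercised end to end with the standard net/http/httptest package. In this sketch the Task shape, the enqueueHandler constructor, the statusCodes helper, and the 202 Accepted success response are assumptions added for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Task is a hypothetical unit of work carrying the request path.
type Task struct{ Path string }

func attemptInsert(t Task, queue chan<- Task) bool {
	select {
	case queue <- t:
		return true
	default:
		return false
	}
}

// enqueueHandler returns a handler that queues one task per request
// and answers 503 when the queue is full.
func enqueueHandler(queue chan<- Task) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !attemptInsert(Task{Path: r.URL.Path}, queue) {
			http.Error(w, "capacity exceeded", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	}
}

// statusCodes sends n requests to a handler backed by a queue of the
// given capacity with no consumer, and collects the response codes.
func statusCodes(capacity, n int) []int {
	h := enqueueHandler(make(chan Task, capacity))
	codes := make([]int, 0, n)
	for i := 0; i < n; i++ {
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/tasks", nil))
		codes = append(codes, rec.Code)
	}
	return codes
}

func main() {
	fmt.Println(statusCodes(2, 3)) // [202 202 503]
}
```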
Graceful Worker Shutdown
Closing the channel stops iteration over it:
close(taskStream)
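A range loop over a channel exits only once the channel is closed and its buffer is empty, so all queued items are processed before termination. Only the sending side should close the channel, because a send on a closed channel panics.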
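The draining behavior can be checked in isolation. This small sketch uses a hypothetical drainAfterClose helper: it closes the channel before any receive happens and still gets every buffered value back.

```go
package main

import "fmt"

// drainAfterClose buffers n values, closes the channel, and only then
// starts receiving. It returns how many values were still delivered and
// the ok flag of a receive performed after the buffer is empty.
func drainAfterClose(n int) (delivered int, okAfterDrain bool) {
	ch := make(chan int, n)
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch)

	// range keeps yielding buffered values and stops once they run out.
	for range ch {
		delivered++
	}
	// A receive on a closed, empty channel returns immediately with ok == false.
	_, okAfterDrain = <-ch
	return delivered, okAfterDrain
}

func main() {
	fmt.Println(drainAfterClose(3)) // 3 false
}
```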
Waiting for Completion
Closing the channel returns immediately; it does not wait for the worker to finish. Use sync.WaitGroup to wait:
var wg sync.WaitGroup
func consumeTasks(tasks <-chan Task) {
	defer wg.Done()
	for t := range tasks {
		handleTask(t)
	}
}
wg.Add(1)
go consumeTasks(taskStream)
close(taskStream)
wg.Wait()
Increment the counter before launching the goroutine; defer wg.Done() ensures it decrements on exit.
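The snippet above relies on a package-level wg. A variant that passes the WaitGroup by pointer keeps the worker free of globals; the sketch below assumes a hypothetical Task type, a handle callback, and a processAll helper that are not part of the original.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a hypothetical unit of work.
type Task struct{ ID int }

// consumeTasks receives the WaitGroup as a parameter instead of
// reading a package-level variable.
func consumeTasks(wg *sync.WaitGroup, tasks <-chan Task, handle func(Task)) {
	defer wg.Done()
	for t := range tasks {
		handle(t)
	}
}

// processAll queues n tasks, closes the queue, waits for the worker
// to exit, and returns how many tasks were handled.
func processAll(n int) int {
	var wg sync.WaitGroup
	tasks := make(chan Task, n)
	handled := 0

	wg.Add(1) // increment before starting the goroutine
	go consumeTasks(&wg, tasks, func(Task) { handled++ })

	for i := 0; i < n; i++ {
		tasks <- Task{ID: i}
	}
	close(tasks)
	wg.Wait() // the worker has exited, so reading handled is safe
	return handled
}

func main() {
	fmt.Println(processAll(10)) // 10
}
```

A WaitGroup must not be copied after first use, which is why it is passed as *sync.WaitGroup.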
Timed Wait for Completion
To avoid indefinite blocking, wrap the wait:
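func waitWithTimeout(wg *sync.WaitGroup, limit time.Duration) bool {
	done := make(chan struct{})
	// This helper goroutine keeps waiting even after a timeout;
	// it exits only when the wait group counter reaches zero.
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true // all workers finished in time
	case <-time.After(limit):
		return false // timed out; workers may still be running
	}
}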
waitWithTimeout(&wg, 5*time.Second)
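Both outcomes of the timed wait can be observed with a short experiment. The finishesInTime helper and its millisecond parameters are hypothetical; the durations are chosen far enough apart that scheduling jitter should not change the result.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func waitWithTimeout(wg *sync.WaitGroup, limit time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(limit):
		return false
	}
}

// finishesInTime starts one goroutine that sleeps for workMs milliseconds
// and reports whether it completed within limitMs milliseconds.
func finishesInTime(workMs, limitMs int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()
	return waitWithTimeout(&wg, time.Duration(limitMs)*time.Millisecond)
}

func main() {
	fmt.Println(finishesInTime(10, 1000)) // true
	fmt.Println(finishesInTime(500, 20))  // false
}
```

A false result only means the deadline passed. The worker is not stopped by the timeout and still runs to completion in the background.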
Immediate Cancellation Without Draining
Using context.Context, a worker can abort promptly:
ctx, stop := context.WithCancel(context.Background())
go worker(ctx, taskStream)
func worker(ctx context.Context, tasks <-chan Task) {
	for {
		select {
		case <-ctx.Done():
			return
		case t, ok := <-tasks:
			if !ok {
				return // channel closed and drained; avoid spinning on zero values
			}
			handleTask(t)
		}
	}
}
stop() // sends cancellation signal
Calling stop() does not wait for an in-flight task to finish. Combine it with a wait group if you need to synchronize after cancelling.
Care is required: if tasks remain in the channel when cancellation happens, both select cases are ready and Go picks one at random, so the worker may keep processing queued tasks for a while. One approach checks the context error after handling each task:
case t, ok := <-tasks:
	if !ok {
		return // channel closed and drained
	}
	handleTask(t)
	if ctx.Err() != nil {
		return
	}
Alternatively, set an atomic flag before calling stop() and check it after task execution.
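Putting cancellation, the post-task check, and a wait group together might look like the following sketch. It checks ctx.Err() at the top of each iteration, which has the same effect as checking right after a task. The Task type, the handle callback, and the cancelDuringFirst helper are assumptions for illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Task is a hypothetical unit of work.
type Task struct{ ID int }

// worker stops at the next loop iteration after cancellation,
// even if tasks are still queued.
func worker(ctx context.Context, wg *sync.WaitGroup, tasks <-chan Task, handle func(Task)) {
	defer wg.Done()
	for {
		// select alone picks randomly among ready cases, so test
		// for cancellation explicitly before receiving again.
		if ctx.Err() != nil {
			return
		}
		select {
		case <-ctx.Done():
			return
		case t, ok := <-tasks:
			if !ok {
				return // channel closed and drained
			}
			handle(t)
		}
	}
}

// cancelDuringFirst queues n tasks, cancels while the first one is being
// handled, and returns how many tasks were handled in total.
func cancelDuringFirst(n int) int {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	tasks := make(chan Task, n)
	for i := 0; i < n; i++ {
		tasks <- Task{ID: i}
	}
	close(tasks)

	handled := 0
	var wg sync.WaitGroup
	wg.Add(1)
	go worker(ctx, &wg, tasks, func(Task) {
		handled++
		stop() // simulate cancellation arriving mid-task
	})
	wg.Wait()
	return handled
}

func main() {
	fmt.Println(cancelDuringFirst(5)) // 1
}
```

Four tasks are left unprocessed in the channel, which is the intended trade-off of cancelling without draining.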
Cancel Channel Instead of Context
A plain channel can mimic cancellation:
cancelSig := make(chan struct{})
go worker(taskStream, cancelSig)
func worker(tasks <-chan Task, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case t, ok := <-tasks:
			if !ok {
				return // channel closed and drained; avoid spinning on zero values
			}
			handleTask(t)
		}
	}
}
close(cancelSig)
The same caveat applies: while tasks remain queued, select may still pick the task case after the cancel channel is closed, so processing can continue unless additional guards are added.
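One detail the plain-channel approach leaves open is what happens if close(cancelSig) runs twice: closing an already closed channel panics. A possible guard, sketched here with a hypothetical canceller type, wraps the close in sync.Once.

```go
package main

import (
	"fmt"
	"sync"
)

// canceller is a hypothetical wrapper around a cancel channel.
type canceller struct {
	once sync.Once
	sig  chan struct{}
}

func newCanceller() *canceller {
	return &canceller{sig: make(chan struct{})}
}

// Cancel closes the signal channel exactly once; later calls are no-ops.
func (c *canceller) Cancel() {
	c.once.Do(func() { close(c.sig) })
}

// Done returns the channel that workers select on.
func (c *canceller) Done() <-chan struct{} {
	return c.sig
}

// cancelled reports, without blocking, whether Cancel has been called.
func (c *canceller) cancelled() bool {
	select {
	case <-c.sig:
		return true
	default:
		return false
	}
}

func main() {
	c := newCanceller()
	fmt.Println(c.cancelled()) // false
	c.Cancel()
	c.Cancel() // safe: the second call does nothing
	fmt.Println(c.cancelled()) // true
}
```

This is roughly the convenience that context.Context provides out of the box, where calling the cancel function repeatedly is also safe.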
Multiple Workers for Parallelism
Run several consumers on one shared channel:
for i := 0; i < workerCount; i++ {
	go consumeTasks(taskStream)
}
Each worker competes for tasks; exactly one receives each item. To wait for all workers:
for i := 0; i < workerCount; i++ {
	wg.Add(1)
	go consumeTasks(taskStream)
}
close(taskStream)
wg.Wait()
Broadcast cancellation to all workers via a shared cancel channel:
cancelSig := make(chan struct{})
for i := 0; i < workerCount; i++ {
	go worker(taskStream, cancelSig)
}
close(cancelSig)
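The claim that exactly one worker receives each item can be verified with a small pool. The runPool helper, the Task type, and the mutex-protected counter map are assumptions made for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a hypothetical unit of work.
type Task struct{ ID int }

// runPool starts workerCount workers on one shared channel, feeds them
// taskCount tasks, and returns how many times each task ID was handled.
func runPool(workerCount, taskCount int) map[int]int {
	tasks := make(chan Task)
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen = make(map[int]int)
	)

	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks {
				mu.Lock() // the map is shared, so guard it
				seen[t.ID]++
				mu.Unlock()
			}
		}()
	}

	for id := 0; id < taskCount; id++ {
		tasks <- Task{ID: id}
	}
	close(tasks) // every worker's range loop ends once the channel drains
	wg.Wait()
	return seen
}

func main() {
	seen := runPool(4, 100)
	fmt.Println(len(seen)) // 100
}
```

The channel itself needs no lock; the mutex exists only because the workers write to a shared map. With several workers, the order in which tasks complete is no longer guaranteed.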