Introduction
This article explores key concurrency primitives in Go, including synchronization mechanisms, condition variables, atomic operations, and the context package. We'll examine how these tools help manage concurrent operations and shared resources in Go programs.
Synchronization with the sync Package
The sync package provides fundamental synchronization primitives for managing concurrent access to shared resources. Let's explore the most commonly used types:
Mutex and RWMutex
Mutex (mutual exclusion) ensures that only one goroutine can access a shared resource at a time. RWMutex (read-write mutex) allows multiple readers or a single writer to access a resource.
package main
import (
"fmt"
"sync"
)
var (
accountBalance int
balanceMutex sync.Mutex
)
func deposit(amount int, wg *sync.WaitGroup) {
balanceMutex.Lock()
defer balanceMutex.Unlock()
defer wg.Done()
fmt.Printf("Depositing %d. Current balance: %d\n", amount, accountBalance)
accountBalance += amount
}
func main() {
accountBalance = 1000
var wg sync.WaitGroup
wg.Add(2)
go deposit(200, &wg)
go deposit(150, &wg)
wg.Wait()
fmt.Printf("Final balance: %d\n", accountBalance)
}
WaitGroup
WaitGroup helps synchronize multiple goroutines by waiting for them to complete their execution.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(500 * time.Millisecond)
fmt.Printf("Worker %d completed\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 4; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
fmt.Println("All workers finished")
}
Condition Variables with sync.Cond
Condition variables allow goroutines to wait for specific condisions to be met before proceeding. They work in conjunction with mutexes.
Basic Usage
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mutex sync.Mutex
condition := sync.NewCond(&mutex)
queue := make([]int, 0, 5)
removeFromQueue := func(delay time.Duration) {
time.Sleep(delay)
mutex.Lock()
queue = queue[1:]
fmt.Println("Item removed from queue")
condition.Signal()
mutex.Unlock()
}
for i := 0; i < 5; i++ {
mutex.Lock()
for len(queue) == 2 {
condition.Wait()
}
fmt.Println("Adding item to queue")
queue = append(queue, i)
go removeFromQueue(1 * time.Second)
mutex.Unlock()
}
}
Producer-Consumer Pattern
Condition variables are particularly useful in producer-consumer scenarios.
package main
import (
"fmt"
"sync"
"time"
)
type Product struct {
id int
}
func producer(buffer *[]Product, lock *sync.Mutex, cond *sync.Cond) {
for i := 0; i < 8; i++ {
lock.Lock()
item := Product{id: i}
*buffer = append(*buffer, item)
fmt.Printf("Produced item %d\n", item.id)
lock.Unlock()
cond.Signal()
time.Sleep(300 * time.Millisecond)
}
}
func consumer(buffer *[]Product, lock *sync.Mutex, cond *sync.Cond) {
for {
lock.Lock()
for len(*buffer) == 0 {
cond.Wait()
}
item := (*buffer)[0]
*buffer = (*buffer)[1:]
fmt.Printf("Consumed item %d\n", item.id)
lock.Unlock()
}
}
func main() {
var mutex sync.Mutex
condition := sync.NewCond(&mutex)
buffer := make([]Product, 0, 10)
go producer(&buffer, &mutex, condition)
go consumer(&buffer, &mutex, condition)
time.Sleep(5 * time.Second)
}
Broadcast Method
The Broadcast() method wakes up all waiting goroutines, unlike Signal() which wakes up just one.
package main
import (
"fmt"
"sync"
"time"
)
func task(id int, cond *sync.Cond) {
cond.L.Lock()
cond.Wait()
fmt.Printf("Task %d started\n", id)
cond.L.Unlock()
}
func main() {
var wg sync.WaitGroup
var mutex sync.Mutex
condition := sync.NewCond(&mutex)
for i := 0; i < 6; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
task(id, condition)
}(i)
}
fmt.Println("All tasks ready...")
time.Sleep(1 * time.Second)
condition.Broadcast()
wg.Wait()
}
Atomic Operations
The sync/atomic package provides low-level atomic memory operations for safe concurrent access to shared variables.
Atomic Counters
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var counter int64
func increment(wg *sync.WaitGroup) {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Printf("Final counter value: %d\n", counter)
}
Compare and Swap
Atomic operations include Compare and Swap (CAS) for conditional updates.
package main
import (
"fmt"
"sync/atomic"
)
func main() {
var value int64 = 42
swapped := atomic.CompareAndSwapInt64(&value, 42, 100)
fmt.Printf("Swap successful: %v, New value: %d\n", swapped, value)
swapped = atomic.CompareAndSwapInt64(&value, 50, 200)
fmt.Printf("Swap successful: %v, Value remains: %d\n", swapped, value)
}
Context Package
The context package provides a way to carry deadlines, cancellation signals, and request-scoped values across API boundaries.
Cancellation with Context
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context) {
select {
case <-time.After(2 * time.Second):
fmt.Println("Task completed")
case <-ctx.Done():
fmt.Println("Task cancelled:", ctx.Err())
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go longRunningTask(ctx)
time.Sleep(1 * time.Second)
cancel()
time.Sleep(1 * time.Second)
}
Timeout with Context
package main
import (
"context"
"fmt"
"time"
)
func timedOperation(ctx context.Context) {
select {
case <-time.After(1 * time.Second):
fmt.Println("Operation completed")
case <-ctx.Done():
fmt.Println("Operation timed out:", ctx.Err())
}
}
func main() {
ctx, _ := context.WithTimeout(context.Background(), 500 * time.Millisecond)
go timedOperation(ctx)
time.Sleep(1 * time.Second)
}
Passing Values with Context
Context can carry request-scoped values across function calls.
package main
import (
"context"
"fmt"
"net/http"
)
func authMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), "userID", "user-123")
next.ServeHTTP(w, r.WithContext(ctx))
})
}
func handler(w http.ResponseWriter, r *http.Request) {
userID := r.Context().Value("userID").(string)
fmt.Fprintf(w, "Welcome, %s!", userID)
}
func main() {
http.Handle("/", authMiddleware(http.HandlerFunc(handler)))
http.ListenAndServe(":8080", nil)
}
Context Hierarchy
Contexts can be derived to create hierarchical cancellation relationships.
package main
import (
"context"
"fmt"
"time"
)
func childOperation(ctx context.Context) {
select {
case <-time.After(1 * time.Second):
fmt.Println("Child operation completed")
case <-ctx.Done():
fmt.Println("Child operation cancelled:", ctx.Err())
}
}
func parentOperation(ctx context.Context) {
childCtx, cancel := context.WithTimeout(ctx, 500 * time.Millisecond)
defer cancel()
go childOperation(childCtx)
select {
case <-time.After(1 * time.Second):
fmt.Println("Parent operation completed")
case <-ctx.Done():
fmt.Println("Parent operation cancelled:", ctx.Err())
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 300 * time.Millisecond)
defer cancel()
go parentOperation(ctx)
time.Sleep(1 * time.Second)
}
Conclusion
Go's concurrency primitives provide powerful tools for building safe and efficient concurrrent applications. Understanding when to use mutexes, condition variables, atomic operations, or context is crucial for writing robust concurrent code.