Loop with Select
A frequent idiom in Go for managing concurrent workflows is an infinite loop wrapped around a select statement. This pattern lets a goroutine monitor multiple channels simultaneously and react to whichever becomes ready first.
for {
	select {
	case msg := <-inputCh:
		_ = msg // handle incoming message
	case outputCh <- processedData:
		// send data if a receiver is ready
	case <-ctx.Done():
		// exit on cancellation
		return
	default:
		// optional non-blocking fallback; with no other work here,
		// this turns the loop into a busy spin
	}
}
Key characteristics:
- Multiple channel operations are monitored at once; if several are ready simultaneously, one is chosen at random.
- The default case prevents blocking when no channel is ready.
- Context cancellation or sentinel values typically trigger loop termination.
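Putting these pieces together, here is a minimal runnable sketch of the pattern (the channel name and timings are illustrative, and the default branch is omitted so the loop blocks rather than spins):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	inputCh := make(chan string)
	go func() {
		for i := 0; ; i++ {
			inputCh <- fmt.Sprintf("msg-%d", i)
			time.Sleep(100 * time.Millisecond)
		}
	}()

	for {
		select {
		case msg := <-inputCh:
			fmt.Println("received:", msg)
		case <-ctx.Done():
			fmt.Println("stopping:", ctx.Err())
			return
		}
	}
}

The loop prints a few messages and then exits when the context deadline expires.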
Timeout Handling with Select
To avoid indefinite blocking, a timeout can be integrated into a select using time.After, so a waiting operation gives up after a bounded interval.
select {
case result := <-workCh:
	fmt.Println("Got:", result)
case <-time.After(1 * time.Second):
	fmt.Println("Timed out")
}
time.After(d) returns a channel that emits the current time after duration d. If the primary channel doesn't deliver within that window, the timeout case executes.
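A self-contained version of the snippet above (slowWork is a hypothetical producer introduced for illustration) deliberately takes the timeout branch:

package main

import (
	"fmt"
	"time"
)

func slowWork() <-chan string {
	ch := make(chan string, 1) // buffered so the worker can finish even after a timeout
	go func() {
		time.Sleep(2 * time.Second) // slower than the 1s timeout below
		ch <- "done"
	}()
	return ch
}

func main() {
	select {
	case result := <-slowWork():
		fmt.Println("Got:", result)
	case <-time.After(1 * time.Second):
		fmt.Println("Timed out") // this branch wins
	}
}

Note the one-slot buffer: with an unbuffered channel, the worker goroutine would block forever on its send once the timeout fires, since nothing would ever receive from it.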
Pipeline Architecture
Go pipelines decompose data processing into sequential stages connected by channels. Each stage runs in its own goroutine, enabling parallelism and modularity.
// source emits each input value on a channel, then closes it.
func source(vals []int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// transform squares every value it receives.
func transform(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// sink drains the stream, printing each value.
func sink(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}

// Usage: prints 1, 4, 9, 16.
sink(transform(source([]int{1, 2, 3, 4})))
Benefits include natural backpressure (an unbuffered send blocks until the next stage is ready to receive), composability, and a clear separation of concerns across stages.
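Composability means a new stage can be spliced in without touching its neighbors. As a sketch, this hypothetical filter stage (the name and predicate are assumptions, not part of the original pipeline) keeps only values matching a condition:

// filter passes along only the values for which keep returns true,
// preserving the stage shape: receive-only in, receive-only out, close on exit.
func filter(in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// Usage: prints the squares of the even inputs, 4 and 16.
sink(transform(filter(source([]int{1, 2, 3, 4}), func(v int) bool { return v%2 == 0 })))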
Fan-Out and Fan-In
Fan-out distributes work across multiple workers; fan-in aggregates their results into a single stream.
type Job struct{ ID int }
type Result struct{ Value string }

// process is one worker: it drains jobs until the channel closes.
func process(jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		results <- Result{Value: fmt.Sprintf("processed-%d", job.ID)}
	}
}

func orchestrate(tasks []Job, workerCount int) []Result {
	jobs := make(chan Job, len(tasks))
	results := make(chan Result, len(tasks)) // buffered so workers never block on send
	var wg sync.WaitGroup

	// Fan-out: start workerCount workers reading from the shared jobs channel.
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go process(jobs, results, &wg)
	}

	for _, t := range tasks {
		jobs <- t
	}
	close(jobs) // lets each worker's range loop terminate

	wg.Wait()
	close(results)

	// Fan-in: collect everything the workers produced into one slice.
	var outcomes []Result
	for r := range results {
		outcomes = append(outcomes, r)
	}
	return outcomes
}
This pattern spreads work across a fixed pool of goroutines while consolidating their outputs into a single collection.
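A minimal driver might look like this (the job and worker counts are arbitrary; result order is nondeterministic, since workers finish in whatever order the scheduler allows):

func main() {
	tasks := make([]Job, 8)
	for i := range tasks {
		tasks[i] = Job{ID: i}
	}
	// Four workers drain eight jobs concurrently.
	for _, r := range orchestrate(tasks, 4) {
		fmt.Println(r.Value)
	}
}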
Futures via Channels
A future represents a value that will be available later. In Go, this is naturally modeled with a channel that eventually receives a result.
func asyncCompute() <-chan string {
	done := make(chan string)
	go func() {
		defer close(done)
		time.Sleep(500 * time.Millisecond) // simulate slow work
		done <- "completed"
	}()
	return done
}

func main() {
	future := asyncCompute()
	fmt.Println("Working...") // executes while the computation is still in flight
	result := <-future        // blocks only here, at the point of consumption
	fmt.Println(result)
}
The calling code proceeds immediately after launching the goroutine and only blocks when explicitly reading from the future channel. This decouples initiation from consumption, enhancing responsiveness.
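Real computations can fail, and a future channel can carry that information too. The following sketch (the computeResult type and its fields are assumptions, not a standard API) pairs the value with an error:

// computeResult bundles a value with any error, so the consumer
// receives both through a single future channel.
type computeResult struct {
	Value string
	Err   error
}

func asyncComputeErr() <-chan computeResult {
	out := make(chan computeResult, 1) // buffered: the goroutine can finish even if the caller never reads
	go func() {
		defer close(out)
		time.Sleep(500 * time.Millisecond) // simulate slow work
		out <- computeResult{Value: "completed"}
	}()
	return out
}

The caller then receives once, as in res := <-asyncComputeErr(), and checks res.Err before using res.Value. The one-slot buffer is a deliberate design choice: the producing goroutine can complete and exit even if the consumer abandons the future.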