Managing Cancellation and Deadlines in Go Concurrent Workflows

In Go, the context package is essential for controlling the lifetime of operations such as requests and the goroutines that serve them. A context.Context carries deadlines, cancellation signals, and request-scoped values across API boundaries. Concurrent code, particularly code that performs I/O, must honor these signals to avoid leaking goroutines and to keep the system responsive.

The following performRequest function simulates fetching data from a remote endpoint. Since network operations are unpredictable, they are ideal candidates for context-based timeouts.

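// performRequest simulates an I/O operation that respects context cancellation.
func performRequest(ctx context.Context, endpoint string) ([]byte, error) {
    // Simulated latency (100ms is arbitrary); a real implementation would pass
    // ctx to the network call, e.g. via http.NewRequestWithContext.
    select {
    case <-ctx.Done():
        // The deadline passed or the caller cancelled: return immediately.
        return nil, ctx.Err()
    case <-time.After(100 * time.Millisecond):
        return []byte("response from " + endpoint), nil
    }
}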
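To make the timeout behavior concrete, here is a minimal, self-contained sketch of a caller imposing a time budget on such a call. The function slowFetch, the helper timedOut, and all durations are hypothetical stand-ins chosen for illustration, not part of the original code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical durations chosen only for this demonstration.
const (
	slowLatency = 2 * time.Second
	fastLatency = time.Millisecond
	tightBudget = 20 * time.Millisecond
	looseBudget = 2 * time.Second
)

// slowFetch is a stand-in for performRequest: it waits for latency to
// simulate a network round trip, unless ctx ends first.
func slowFetch(ctx context.Context, latency time.Duration) ([]byte, error) {
	timer := time.NewTimer(latency)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		// ctx.Err() reports why the context ended.
		return nil, ctx.Err()
	case <-timer.C:
		return []byte("ok"), nil
	}
}

// timedOut reports whether a fetch with the given latency fails with
// context.DeadlineExceeded when the caller allows only budget.
func timedOut(latency, budget time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	defer cancel() // always release the timer held by the context

	_, err := slowFetch(ctx, latency)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(slowLatency, tightBudget)) // prints true: budget runs out first
	fmt.Println(timedOut(fastLatency, looseBudget)) // prints false: fetch finishes in time
}
```

Comparing with errors.Is rather than == keeps the check working if a caller later wraps the error with additional detail.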

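When multiple endpoints must be queried within a specific time window, errgroup provides a synchronization mechanism that propagates cancellation. errgroup.WithContext returns a derived context that is cancelled as soon as any function passed to Go returns a non-nil error, including an error caused by the parent deadline expiring. The remaining requests observe the cancellation through that context, and Wait returns the first error.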

// aggregateRequests fetches data from all endpoints concurrently.
func aggregateRequests(ctx context.Context, endpoints []string) ([][]byte, error) {
    buffer := make([][]byte, len(endpoints))
    
    group, ctx := errgroup.WithContext(ctx)
    
    for i := range endpoints {
        idx := i // copy the loop variable; required before Go 1.22
        group.Go(func() error {
            payload, err := performRequest(ctx, endpoints[idx])
            if err != nil {
                return err
            }
            buffer[idx] = payload
            return nil
        })
    }
    
    if err := group.Wait(); err != nil {
        return nil, err
    }
    
    return buffer, nil
}
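The propagation that errgroup performs can be approximated with the standard library alone. The sketch below is a simplified illustration of the idea, not errgroup's actual implementation; firstErrorGroup, newGroup, runDemo, and errBoom are hypothetical names introduced here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errBoom = errors.New("endpoint failed")

// firstErrorGroup is a simplified stand-in for the behavior of
// errgroup.WithContext: the first error cancels a shared context.
type firstErrorGroup struct {
	wg     sync.WaitGroup
	once   sync.Once
	err    error
	cancel context.CancelFunc
}

func newGroup(parent context.Context) (*firstErrorGroup, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &firstErrorGroup{cancel: cancel}, ctx
}

// Go runs fn in its own goroutine and records only the first error.
func (g *firstErrorGroup) Go(fn func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := fn(); err != nil {
			g.once.Do(func() {
				g.err = err
				g.cancel() // tell every sibling to stop
			})
		}
	}()
}

// Wait blocks until all goroutines finish and returns the first error.
func (g *firstErrorGroup) Wait() error {
	g.wg.Wait()
	g.cancel() // release the context's resources
	return g.err
}

// runDemo starts one failing task and one slow task. It reports whether the
// slow task observed the cancellation, plus the group's error.
func runDemo() (bool, error) {
	g, ctx := newGroup(context.Background())
	sawCancel := false

	g.Go(func() error { return errBoom })
	g.Go(func() error {
		select {
		case <-ctx.Done():
			sawCancel = true
			return ctx.Err()
		case <-time.After(5 * time.Second):
			return nil
		}
	})

	err := g.Wait()
	return sawCancel, err
}

func main() {
	saw, err := runDemo()
	fmt.Println(saw, err) // prints: true endpoint failed
}
```

The slow task returns long before its five-second timer because the failing sibling cancels the shared context, which is the same effect aggregateRequests relies on.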

Once data is acquired, we often need to apply business logic, such as validation or filtering. These functions are typically CPU-bound and may not inherently involve I/O or context propagation.

// matchesRule checks if the data satisfies a specific condition.
func matchesRule(data []byte, rule func([]byte) bool) bool {
    return rule(data)
}

While matchesRule is pure logic, running it concurrently on a large dataset might still exceed the parent context's deadline. A naive concurrent implementation would simply run the logic without checking if the operation should still proceed.

To ensure the filtering process respects the deadline, we must explicitly check ctx.Err() before or during the computation. The following implementation of applyFilterParallel performs the check once per item, immediately before the rule runs.

// applyFilterParallel processes data concurrently while respecting context cancellation.
func applyFilterParallel(ctx context.Context, items [][]byte, rule func([]byte) bool) ([][]byte, error) {
    type outcome struct {
        payload []byte
        keep    bool
    }

    outcomes := make([]outcome, len(items))
    
    group, ctx := errgroup.WithContext(ctx)
    
    for i := range items {
        idx := i // copy the loop variable; required before Go 1.22
        group.Go(func() error {
            // Explicitly check if the context is already cancelled before starting work
            if err := ctx.Err(); err != nil {
                return err
            }
            
            isValid := matchesRule(items[idx], rule)
            outcomes[idx] = outcome{payload: items[idx], keep: isValid}
            return nil
        })
    }
    
    if err := group.Wait(); err != nil {
        return nil, err
    }
    
    var filtered [][]byte
    for _, res := range outcomes {
        if res.keep {
            filtered = append(filtered, res.payload)
        }
    }
    
    return filtered, nil
}

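Because each goroutine checks the context before calling the rule, items whose work has not yet started are skipped once the parent is cancelled, even though matchesRule itself knows nothing about contexts. The check cannot interrupt a rule that is already running, so a long-running rule needs its own checks inside the computation.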
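For work that takes long enough that a single check up front is insufficient, the context can also be polled during the computation. The following is a minimal sketch under stated assumptions: countMatches, sampleItems, runWith, and the checkEvery interval are hypothetical and would need tuning for a real workload.

```go
package main

import (
	"context"
	"fmt"
)

// checkEvery is an assumed tuning knob: how many items to process between
// context checks. Smaller values react faster but add overhead.
const checkEvery = 1024

// countMatches counts the items that satisfy rule, polling ctx once every
// checkEvery items so a cancelled parent stops the loop early.
func countMatches(ctx context.Context, items [][]byte, rule func([]byte) bool) (int, error) {
	count := 0
	for i, item := range items {
		if i%checkEvery == 0 {
			if err := ctx.Err(); err != nil {
				return count, err
			}
		}
		if rule(item) {
			count++
		}
	}
	return count, nil
}

// sampleItems returns n items where every even index holds non-empty data.
func sampleItems(n int) [][]byte {
	items := make([][]byte, n)
	for i := range items {
		if i%2 == 0 {
			items[i] = []byte("x")
		}
	}
	return items
}

// runWith filters 4000 sample items and cancels the context while the
// cancelAt-th item is being evaluated (never, if cancelAt is negative).
func runWith(cancelAt int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	seen := 0
	rule := func(b []byte) bool {
		seen++
		if seen == cancelAt {
			cancel() // simulate the parent giving up mid-computation
		}
		return len(b) > 0
	}
	return countMatches(ctx, sampleItems(4000), rule)
}

func main() {
	fmt.Println(runWith(-1))   // prints: 2000 <nil>
	fmt.Println(runWith(1500)) // prints: 1024 context canceled
}
```

In the second call the loop notices the cancellation at the next polling point (index 2048) rather than instantly, which illustrates the trade-off between responsiveness and the cost of checking.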

The final pipeline combines the fetching and filtering stages. If the fetching stage fails due to a timeout, the filtering stage is skipped. If fetching succeeds but the deadline expires during filtering, applyFilterParallel will catch it.

// executePipeline orchestrates the workflow of fetching and processing data.
func executePipeline(ctx context.Context, endpoints []string) ([][]byte, error) {
    rawData, err := aggregateRequests(ctx, endpoints)
    if err != nil {
        return nil, err
    }
    
    // validationRule is assumed to be a func([]byte) bool defined elsewhere.
    return applyFilterParallel(ctx, rawData, validationRule)
}
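The stage-skipping behavior can be observed with a small, self-contained sketch. The stage functions, the stageLog type, and the durations below are hypothetical stand-ins for the real fetch and filter stages; wrapping each error with %w is one possible way to let the caller see which stage hit the deadline.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical durations chosen only for this demonstration.
const (
	fetchLatency = 200 * time.Millisecond
	tightBudget  = 10 * time.Millisecond
	looseBudget  = 5 * time.Second
)

// stageLog records which stages were entered.
type stageLog struct{ fetched, filtered bool }

// fetchStage simulates slow I/O that gives up when ctx ends.
func fetchStage(ctx context.Context, trace *stageLog) error {
	trace.fetched = true
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(fetchLatency):
		return nil
	}
}

// filterStage simulates CPU-bound work that checks ctx before starting.
func filterStage(ctx context.Context, trace *stageLog) error {
	trace.filtered = true
	return ctx.Err()
}

// runPipeline runs the stages in order and stops at the first failure.
func runPipeline(ctx context.Context, trace *stageLog) error {
	if err := fetchStage(ctx, trace); err != nil {
		return fmt.Errorf("fetch: %w", err)
	}
	if err := filterStage(ctx, trace); err != nil {
		return fmt.Errorf("filter: %w", err)
	}
	return nil
}

// runWithBudget runs the pipeline under a timeout and returns the trace.
func runWithBudget(budget time.Duration) (stageLog, error) {
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	defer cancel()

	var trace stageLog
	err := runPipeline(ctx, &trace)
	return trace, err
}

// isTimeout reports whether err was caused by an expired deadline.
func isTimeout(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	trace, err := runWithBudget(tightBudget)
	fmt.Println(trace.filtered, isTimeout(err), err)
	// prints: false true fetch: context deadline exceeded
}
```

Because the wrapped error still satisfies errors.Is, callers can test for context.DeadlineExceeded while the message identifies the stage that failed.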

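Verifying this behavior requires a test that forces a timeout. A 1-nanosecond timeout on its own is racy, because a goroutine may check the context before the timer fires. The following test therefore waits for the context to expire before calling applyFilterParallel, then asserts that the specific deadline error is propagated.

func TestTimeoutHandling(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
    defer cancel()
    <-ctx.Done() // wait until the deadline has definitely passed

    // generateTestPayload and validationRule are assumed to be defined
    // elsewhere; the payload must contain at least one item.
    dummyData := generateTestPayload()

    _, err := applyFilterParallel(ctx, dummyData, validationRule)
    if !errors.Is(err, context.DeadlineExceeded) {
        t.Errorf("expected context.DeadlineExceeded, got %v", err)
    }
}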
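Tests of this kind are easier to keep deterministic when the context is already finished before the code under test runs. The sketch below shows two ways to build such a context with the standard library and how their errors differ; canceledContext, expiredContext, and classify are hypothetical helper names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// canceledContext returns a context that has already been cancelled.
func canceledContext() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return ctx
}

// expiredContext returns a context whose deadline is already in the past,
// so it is expired from the moment it is created.
func expiredContext() (context.Context, context.CancelFunc) {
	return context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
}

// classify names the state a context error represents.
func classify(err error) string {
	switch {
	case err == nil:
		return "active"
	case errors.Is(err, context.DeadlineExceeded):
		return "deadline exceeded"
	case errors.Is(err, context.Canceled):
		return "canceled"
	default:
		return "other"
	}
}

func main() {
	expired, cancel := expiredContext()
	defer cancel()

	fmt.Println(classify(context.Background().Err())) // prints: active
	fmt.Println(classify(canceledContext().Err()))    // prints: canceled
	fmt.Println(classify(expired.Err()))              // prints: deadline exceeded
}
```

Either helper can be passed to a function such as applyFilterParallel to assert on context.Canceled or context.DeadlineExceeded without depending on timer scheduling.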

Tags: Go Concurrency context errgroup

Posted on Mon, 01 Jun 2026 17:52:15 +0000 by McChicken