Queue: The Foundation
The basic Queue interface defines the fundamental contract for processing items. It supports adding items, retrieving them, and marking them as complete.
The concrete implementation of this interface maintains three internal data structures to ensure processing order and deduplication:
- orderList: A slice that preserves the sequence in which items should be processed.
- pendingSet: A set (implemented as a map with empty struct values) tracking all items that currently need processing. This ensures uniqueness.
- activeSet: A set tracking items that are currently being processed. This prevents the same item from being re-added to the processing list until the current work is finished.
Here is a simplified definition of the queue structure:
type WorkQueue struct {
orderList []interface{}
pendingSet map[interface{}]struct{}
activeSet map[interface{}]struct{}
cond *sync.Cond
isShuttingDown bool
}
type empty struct{}
Core Methods
The logic for managing these sets is encapsulated in the core methods:
Add(item): This method ensures an item is queued for processing.
- If the queue is shutting down, it returns immediately.
- If the item already exists in
pendingSet, it is ignored (deduplication). - The item is added to
pendingSet. - If the item is not currently in
activeSet, it is appended toorderListand the condition varible is signaled to wake up any waiting consumers.
Get(): This method retrieves the next item for processing.
- It waits on the condition variable if
orderListis empty and the queue is not shutting down. - It pops the first item from
orderList. - The item is removed from
pendingSetand inserted intoactiveSet.
Done(item): This method marks an item as finished.
- The item is removed from
activeSet. - If the item still exists in
pendingSet(meaning it was re-added while being processed), it is appended back toorderListto be processed again.
DelayingQueue: Handling Time
The DelayingQueue extends the basic Queue by adding the ability to postpone the addition of an element until a specified duration has passed. This is crucial for implementing retry logic with backoff.
The interface simply adds an AddAfter method. The implementation relies on a min-heap (priority queue) to manage items waiting to be added to the main queue.
type delayedItem struct {
value interface{}
readyAt time.Time
}
type delayQueue struct {
WorkQueue
waitingForAddCh chan *delayedItem
heartbeat clock.Ticker
}
The Waiting Loop
The core logic resides in waitingLoop, a background goroutine started during initialization.
- It maintains a
waitForPriorityQueue(the heap) sorted byreadyAttime. - It listens on several channels: a stop channel, a heartbeat channel (to prevent the loop from stalling), a timer for the next ready item, and the
waitingForAddCh. - When an item is received on
waitingForAddCh, if the duration has not elapsed, it is pushed onto the heap. - If the current time passes the
readyAttime of the earliest item in the heap, that item is popped and added to the underlyingWorkQueue.
The AddAfter method acts as a producer for this loop. If the delay is zero or negative, it adds the item immediately. Otherwise, it creates a delayedItem and pushes it into the waitingForAddCh.
RateLimitingQueue: Controlling Flow
The RateLimitingQueue is the top-level abstraction, extending DelayingQueue. It combines delayed insertion with a rate-limiting strategy to control how often items are re-queued after failures.
The interface adds methods like AddRateLimited and Forget. The behavior is determined by the RateLimiter interface.
type RateLimiter interface {
When(item interface{}) time.Duration
Forget(item interface{})
NumRequeues(item interface{}) int
}
RateLimiter Implementations
There are several strategies implementing the RateLimiter interface:
BucketRateLimiter: Uitlizes the standard golang.org/x/time/rate package. It implements a token bucket algorithm, allowing a steady rate of items with a specific burst capacity.
ItemExponentialFailureRateLimiter: Calculates the wait time as baseDelay * 2^<retries>. This provides an exponential backoff, ensuring that repeatedly failing items are retried less frequently.
ItemFastSlowRateLimiter: Uses a shorter delay for the first few retries (fast) and a longer delay for subsequent ones (slow). This is useful when you expect transient errors initially but want to back off significantly if the problem persists.
MaxOfRateLimiter: Holds multiple limiters and returns the maximum delay calculated among them. This allows combining different strategies, such as per-item limits and global limits.
WithMaxWaitRateLimiter: Wraps another limiter but enforces a maximum delay cap. If the calculated delay exceeds the cap, the cap value is returned instead.
Integration
The RateLimitingQueue uses these limiters to determine the delay duration. When AddRateLimited is called, it queries the RateLimiter.When() for the delay and calls AddAfter with that duration. The Forget method resets the retry counter for a specific item, typically called upon successful processing.