Coordinator Wrapper Structure
The foundational component is a gRPC wrapper that manages lifecycle, external dependencies, and inter-service communication. Its fields hold the internal coordinator instance, the RPC server, an error-propagation channel, and client proxies for data and query routing.
type GrpcCoordinator struct {
	// Core coordinator logic, hidden behind an interface.
	coordinatorCore RootCoordComponent
	// RPC server and the channel used to propagate its errors.
	rpcServer       *grpc.Server
	terminationChan chan error
	// Lifecycle management.
	waitGroup       sync.WaitGroup
	lifecycleCtx    context.Context
	lifecycleCancel context.CancelFunc
	nodeIdentifier  atomic.Int64
	// Storage backends.
	kvBackend  *clientv3.Client
	txnBackend *txnkv.Client
	// Proxies to the peer coordinators and the callbacks that build them.
	dataCoordProxy   DataCoordClient
	queryCoordProxy  QueryCoordClient
	buildDataClient  func(addr string, kv *clientv3.Client) DataCoordClient
	buildQueryClient func(addr string, kv *clientv3.Client) QueryCoordClient
}
Dependency injection for coordinatorCore, dataCoordProxy, and queryCoordProxy occurs during the instantiation phase. The coordinatorCore field adheres to the RootCoordComponent contract, abstracting the underlying API implementations.
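The injection described above can be illustrated with a minimal sketch. The one-method interfaces, the wrapper type, and the fake implementations below are hypothetical stand-ins and are not taken from the real code base; the only point is that a wrapper holding interfaces and builder callbacks can be wired with test doubles.

```go
package main

import "fmt"

// RootCoordComponent is a hypothetical, cut-down stand-in for the contract
// named in the text; the real interface is assumed to be much larger.
type RootCoordComponent interface {
	Init() error
}

// DataCoordClient is likewise a hypothetical one-method stand-in.
type DataCoordClient interface {
	Ping() string
}

// wrapper mirrors the shape of GrpcCoordinator: it depends only on
// interfaces and on a builder callback, never on concrete types.
type wrapper struct {
	core            RootCoordComponent
	dataCoord       DataCoordClient
	buildDataClient func(addr string) DataCoordClient
}

// connect resolves the data-coordinator client through the injected builder
// and then initializes the injected core.
func (w *wrapper) connect(addr string) error {
	if w.buildDataClient == nil {
		return fmt.Errorf("no data client builder registered")
	}
	w.dataCoord = w.buildDataClient(addr)
	return w.core.Init()
}

// fakeCore and fakeDataCoord are test doubles.
type fakeCore struct{ initialized bool }

func (c *fakeCore) Init() error { c.initialized = true; return nil }

type fakeDataCoord struct{ addr string }

func (d fakeDataCoord) Ping() string { return "pong from " + d.addr }

func main() {
	core := &fakeCore{}
	w := &wrapper{
		core: core,
		buildDataClient: func(addr string) DataCoordClient {
			return fakeDataCoord{addr: addr}
		},
	}
	if err := w.connect("localhost:13333"); err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	fmt.Println(core.initialized, w.dataCoord.Ping())
}
```

Because the wrapper never names a concrete type, a test can swap either the core or the client builder without touching the wrapper itself.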
Initialization Sequence
The startup process is triggered through a role launcher that delegates to a generic component runner. This runner accepts a factory function responsible for constructing the coordinator instance.
func (r *RoleLauncher) StartRootCoord(ctx context.Context, isolated bool, wg *sync.WaitGroup) Service {
	wg.Add(1) // balanced by the wg.Done() call inside LaunchServiceComponent
	return LaunchServiceComponent(ctx, isolated, wg, BuildCoordinator, metrics.RegisterCoordinator)
}
The BuildCoordinator function acts as the primary constructor. It delegates to the internal server initializer and wraps the result in a higher-level service object.
func BuildCoordinator(ctx context.Context, factory dependency.Factory) (*CoordinatorService, error) {
	instance, err := InitializeCoordinator(ctx, factory)
	if err != nil {
		return nil, err
	}
	return &CoordinatorService{
		executionCtx: ctx,
		server:       instance,
	}, nil
}
Inside InitializeCoordinator, the execution context is scoped, error channels are buffered, and client generation callbacks are registered. The core logic is instantiated via corepkg.InitializeCore, which returns a concrete Core object satisfying the RootCoordComponent interface. This confirms that Core serves as the operational backbone.
func InitializeCoordinator(ctx context.Context, factory dependency.Factory) (*GrpcCoordinator, error) {
	// Derive a child context so the wrapper can be stopped independently of its parent.
	scopedCtx, cancel := context.WithCancel(ctx)
	svc := &GrpcCoordinator{
		lifecycleCtx:    scopedCtx,
		lifecycleCancel: cancel,
		// Capacity 1 lets one error be sent without a waiting receiver.
		terminationChan: make(chan error, 1),
	}
	svc.registerClientFactories()
	var initErr error
	svc.coordinatorCore, initErr = corepkg.InitializeCore(ctx, factory)
	if initErr != nil {
		cancel() // release the scoped context; the wrapper is discarded
		return nil, initErr
	}
	return svc, nil
}
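The two choices made in this constructor, a context derived with context.WithCancel and an error channel with a buffer of one, can be shown in isolation. The following is a minimal sketch; the server type and its serve and demo helpers are hypothetical reductions written for this illustration, not part of the original code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// server is a hypothetical reduction of GrpcCoordinator to the fields this
// sketch is about.
type server struct {
	ctx     context.Context
	cancel  context.CancelFunc
	errChan chan error
}

func newServer(parent context.Context) *server {
	ctx, cancel := context.WithCancel(parent)
	return &server{ctx: ctx, cancel: cancel, errChan: make(chan error, 1)}
}

// serve simulates a listener that fails immediately. Because the channel has
// capacity 1, the send completes even though nobody is receiving yet.
func (s *server) serve(err error) {
	s.errChan <- err
}

// demo exercises the server once and reports what it observed.
func demo() (reported string, childErr, parentErr error) {
	parent := context.Background()
	s := newServer(parent)

	s.serve(errors.New("listen failed")) // would deadlock on an unbuffered channel
	reported = (<-s.errChan).Error()

	s.cancel() // cancels the scoped context only
	return reported, s.ctx.Err(), parent.Err()
}

func main() {
	reported, childErr, parentErr := demo()
	fmt.Println("reported:", reported)
	fmt.Println("child:", childErr, "parent:", parentErr)
}
```

Cancelling the derived context leaves the parent untouched, which is what lets the wrapper shut itself down without affecting whatever else shares the parent context.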
Execution and Lifecycle Management
Once the wrapper is constructed, the generic launcher drives initialization and the transition to runtime. LaunchServiceComponent does its work in a goroutine but blocks the caller until that goroutine finishes: it creates a dependency factory, invokes the builder, sets the execution mode according to the deployment topology, calls Init, registers metrics, and finally starts the blocking Run method in a goroutine of its own.
func LaunchServiceComponent[T Service](ctx context.Context, isolated bool, wg *sync.WaitGroup, builder func(context.Context, dependency.Factory) (T, error), registerMetrics func(prometheus.Registerer)) Service {
	var instance T
	readySignal := make(chan struct{})
	go func() {
		// Closing the channel on every exit path unblocks the caller.
		defer close(readySignal)
		depFactory := dependency.NewFactory(isolated)
		var buildErr error
		instance, buildErr = builder(ctx, depFactory)
		if buildErr != nil {
			return // error is dropped; instance keeps its zero value
		}
		if isolated {
			config.SetExecutionMode(typeutil.StandaloneMode)
		} else {
			config.SetExecutionMode(typeutil.ClusterMode)
		}
		if err := instance.Init(); err != nil {
			return // error is dropped; the caller still receives instance
		}
		// prometheus.DefaultRegisterer has type prometheus.Registerer,
		// so the callback takes that interface rather than *prometheus.Registry.
		registerMetrics(prometheus.DefaultRegisterer)
		go instance.Run()
	}()
	<-readySignal
	wg.Done()
	return instance
}
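The deferred close of readySignal guarantees that the caller does not return until construction and initialization have finished, whether they succeeded or failed. On the success path, metrics are registered and Run is started in its own goroutine, moving the component from a configured state to active request processing. On a build or Init failure the goroutine returns early without reporting the error, so the caller cannot tell from the return value alone that startup failed.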
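The readiness handshake can be sketched on its own. The version below is a variant written for illustration, not the launcher shown above: it assumes a hypothetical two-method Service interface, drops the dependency factory and metrics, and returns the startup error to the caller instead of discarding it.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Service is a hypothetical minimal contract; the source does not show it.
type Service interface {
	Init() error
	Run()
}

// launch builds and initializes a component in a goroutine, blocks until that
// work has finished, and reports any failure to the caller.
func launch[T Service](wg *sync.WaitGroup, build func() (T, error)) (T, error) {
	var (
		instance T
		err      error
	)
	ready := make(chan struct{})
	go func() {
		defer close(ready) // runs on every exit path, success or failure
		if instance, err = build(); err != nil {
			return
		}
		if err = instance.Init(); err != nil {
			return
		}
		go instance.Run()
	}()
	<-ready // the close is synchronized before this receive returns
	wg.Done()
	return instance, err
}

// demoSvc is a test double whose Init can be made to fail.
type demoSvc struct {
	failInit bool
	started  chan struct{}
}

func (d *demoSvc) Init() error {
	if d.failInit {
		return errors.New("init failed")
	}
	return nil
}

// Run signals that the service reached its running phase.
func (d *demoSvc) Run() { close(d.started) }

func newDemo(failInit bool) func() (*demoSvc, error) {
	return func() (*demoSvc, error) {
		return &demoSvc{failInit: failInit, started: make(chan struct{})}, nil
	}
}

// runDemo launches one healthy and one failing service.
func runDemo() (first, second error) {
	var wg sync.WaitGroup
	wg.Add(2)
	var svc *demoSvc
	svc, first = launch(&wg, newDemo(false))
	<-svc.started // Run was started in the background
	_, second = launch(&wg, newDemo(true))
	wg.Wait()
	return first, second
}

func main() {
	first, second := runDemo()
	fmt.Println("healthy launch error:", first)
	fmt.Println("failing launch error:", second)
}
```

Writing instance and err inside the goroutine and reading them after the receive is safe because closing a channel is synchronized before a receive that returns due to that close.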