Coordinating Goroutine Listeners
We use Golang to build DoltDB, a version-controlled SQL database. In the process we frequently run into language peculiarities and trade-offs. The Go runtime thread manager makes many things possible, but also sometimes gives us too many options.
When redesigning table statistics, we were faced with many ways to coordinate between background worker threads and responsive user threads. For context, statistics collection runs as a background thread and balances (1) keeping estimates fresh, (2) rate limiting resource use relative to user queries, and (3) being responsive to user requests. The statistics themselves are protected from concurrent SQL engine and worker access with a simple mutex. But clients can also request changes to the worker state. We discussed the front-facing stats architecture a couple of weeks ago. Today we focus on coordinating requests specifically to the worker state.
For example, a client might request a stats GC by calling dolt_stats_gc(). For performance reasons, we would rather have the worker thread coordinate GC execution than allow client threads to amplify background work.
The worker is intentionally asynchronous and rate-limited because CPU
resources are better spent responding to SQL queries (regular
reads/writes).
How client calls communicate with the background update process is a key question that hinges on the choice of Golang primitive. I was surprised by the number of options and behavioral differences. Here we'll consider different ways for listener threads to communicate with and wait on a worker thread, from spin locking to broadcasts to listener queues.
Worker state
The examples here reference a StatsController type:
type StatsController struct {
    mu     sync.Mutex
    genCnt atomic.Uint64
}
In DoltDB, the stats controller is run in a background routine like so:
go func() {
    // ctx here is the server's background context
    sc.run(ctx)
}()
It loops, grabbing the latest database state and updating statistics estimates:
func (sc *StatsController) run(ctx context.Context) error {
    for {
        genStart := sc.genCnt.Load()
        latestCtx, err := sc.contextGen(ctx)
        if err != nil {
            return err
        }
        newStats, err := sc.newStatsForRoot(latestCtx)
        if err != nil {
            return err
        }
        if _, err := sc.trySwap(genStart, newStats); err != nil {
            return err
        }
    }
}
sc.genCnt is an atomic.Uint64 that avoids read version contention and allows the atomic swap below:
func (sc *StatsController) trySwap(prevGen uint64, newStats *rootStats) (ok bool, err error) {
    if sc.genCnt.CompareAndSwap(prevGen, prevGen+1) {
        ...
This safety mechanism simplifies the lifecycle of the update thread, which we will not focus on here but is relevant to our coordination options.
The listeners we discuss next are user requests indirectly interacting with the worker thread.
Do extra work
The simplest way to block user calls on work being performed is to duplicate work. A user request waiting on a stats update can launch its own concurrent update with sc.run().
The simplicity is offset by the resource inefficiency and churn of in-progress work. You have to choose between workers preempting each other, delaying updates, or letting concurrent workers repeat work.
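Here is a minimal sketch of the idea, assuming the helpers used in run above are safe to call from multiple goroutines. waitByWorking is a hypothetical name; the CompareAndSwap in trySwap discards whichever concurrent update loses the race:
func (sc *StatsController) waitByWorking(ctx context.Context) error {
    // Hypothetical sketch: perform one update cycle ourselves instead of
    // waiting on the background worker.
    genStart := sc.genCnt.Load()
    latestCtx, err := sc.contextGen(ctx)
    if err != nil {
        return err
    }
    newStats, err := sc.newStatsForRoot(latestCtx)
    if err != nil {
        return err
    }
    _, err = sc.trySwap(genStart, newStats)
    return err
}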
Monitor shared state (spin-lock)
Instead of preempting the main thread, listeners can monitor
for changes. For example, the genCnt
variable below indicates
how many updates have happened. When the count increases, we've finished
at least one iteration:
func (sc *StatsController) wait() {
    start := sc.genCnt.Load()
    for {
        if sc.genCnt.Load() > start {
            return
        }
    }
}
Spinning like this is not particularly efficient. The number of updates that complete between entering and exiting the wait loop is also unpredictable.
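If we accept polling, one small mitigation (a sketch, not what DoltDB does) is to back off between checks so the waiter is not burning a core:
func (sc *StatsController) waitPolling() {
    // Variant sketch: sleep between polls instead of spinning hot. Still
    // polling, and still unpredictable about how many updates slip by.
    start := sc.genCnt.Load()
    for sc.genCnt.Load() <= start {
        time.Sleep(time.Millisecond)
    }
}
This trades latency for CPU, and it still tells us nothing about which event woke us.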
Track update context (ctx.Done)
Message passing is the common alternative to sharing data, and the standard way for goroutines to signal liveness and cancellation in Go is the context. So the first way I tried to get around spin-locks was to wait on an update context.
The function below starts a new activeThreadCtx
at the beginning of a
loop, invalidating the previous cycle in the process:
func (sc *StatsController) newThreadCtx(ctx context.Context) context.Context {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    if sc.activeThreadCancel != nil {
        sc.activeThreadCancel()
    }
    sc.activeThreadCtx, sc.activeThreadCancel = context.WithCancel(ctx)
    return sc.activeThreadCtx
}
User threads use the most recent active thread context to wait:
func (sc *StatsController) wait() {
    waitCtx := func() context.Context {
        sc.mu.Lock()
        defer sc.mu.Unlock()
        return sc.activeThreadCtx
    }()
    <-waitCtx.Done()
}
When the cycle finalizes, we invalidate the previous context and release any waiters:
func (sc *StatsController) trySwap(prevGen uint64, newStats *rootStats) (ok bool, err error) {
    if sc.genCnt.CompareAndSwap(prevGen, prevGen+1) {
        sc.mu.Lock()
        defer sc.mu.Unlock()
        if sc.activeThreadCancel != nil {
            defer sc.activeThreadCancel()
        }
        ...
Contexts can be cancelled for many reasons, and it is difficult for the listener to understand why the context was cancelled.
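Concretely, a context created with context.WithCancel only ever reports context.Canceled after it fires, so a waiter cannot tell a finished cycle from a shutdown or an unrelated teardown. A small sketch:
func cycleFinished(waitCtx context.Context) bool {
    <-waitCtx.Done()
    // Sketch: every cancellation reason collapses into context.Canceled;
    // a finished cycle, a controller shutdown, and a cancelled parent
    // context all look identical here.
    return errors.Is(waitCtx.Err(), context.Canceled)
}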
Broadcast (sync.Cond)
Instead of abusing the context model, we can use Golang's formal broadcast primitive, sync.Cond. We instantiate a condition with an associated mutex:
sc.cond = sync.NewCond(&sc.mu)
The Wait below cedes the goroutine to the runtime scheduler and releases the associated lock, which must be the same lock the condition was constructed with. Note that the counter is a plain int now, protected by the mutex:
func (sc *StatsController) wait() {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    start := sc.genCnt
    for sc.genCnt <= start {
        sc.cond.Wait()
    }
}
When the worker thread runs sc.cond.Broadcast(), the waiting thread wakes, reacquires the lock, and checks whether our condition is satisfied. Below we've modified the swap code to trigger a broadcast:
func (sc *StatsController) trySwap(prevGen uint64, newStats *rootStats) (ok bool, err error) {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    if sc.genCnt == prevGen {
        sc.genCnt++
        defer sc.cond.Broadcast()
        ...
The broadcast is elegant but inflexible. We need secondary variables to track whether the thing we are waiting for has completed. And each additional event type we want to track requires its own condition and counter.
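For instance, also tracking GC completion would mean growing the controller with a second counter and wait loop. A hypothetical sketch (these fields are illustrative, not the real DoltDB struct):
type StatsController struct {
    mu     sync.Mutex
    cond   *sync.Cond
    genCnt int // bumped when a stats swap completes
    gcCnt  int // bumped when a GC cycle completes
}

func (sc *StatsController) waitForGc() {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    start := sc.gcCnt
    for sc.gcCnt <= start {
        sc.cond.Wait()
    }
}
The worker bumps whichever counter applies and broadcasts, so every waiter wakes up just to re-check a counter that may not concern it.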
Broadcasts also mesh poorly with contexts. A thread paused on a condition cannot be preempted if its own context is cancelled. For example, we cannot do something like the following:
func wait(ctx context.Context, cond *sync.Cond) {
    select {
    case <-ctx.Done():
    case <-cond.Wait(): // does not compile: Wait() returns nothing to select on
    }
}
Instead, user queries ignore SIGINT requests and block until the condition is signaled. We need a more expressive coordination primitive.
Listening queue
A queue of custom listeners lets us signal waiting threads selectively based on the types of event they are waiting for. Most of this section is implementation, but the goal is for waiters to block on a channel that the worker selectively fires:
func wait(ctx context.Context) {
    listen, _ := addListener(leSwap)
    select {
    case <-ctx.Done():
    case <-listen:
    }
}
This maintains most of the elegance of the broadcast. The waiter can be dumb and trust the worker to let it know when something has happened.
Here is our listener object:
type listenNode struct {
e listenerEvent
c chan listenerEvent
n *listenNode
}
The e listenerEvent is a bitmask of the event types that alert this listener. The c channel communicates which specific event triggered the listener. And n is a link to the next node in the list.
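The event type itself is just a small bitmask. The definitions below are an assumption that matches how leSwap and leStop are used in the snippets here (leGc is purely illustrative):
// Assumed definitions: a bitmask lets one listener subscribe to several
// event types at once.
type listenerEvent uint16

const (
    leSwap listenerEvent = 1 << iota // a stats swap completed
    leGc                             // a GC cycle completed (illustrative)
    leStop                           // the controller is shutting down
)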
A user thread adds a listener to the queue:
func (sc *StatsController) addListener(e listenerEvent) (chan listenerEvent, error) {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    l := &listenNode{e: e, c: make(chan listenerEvent, 1)}
    if sc.listeners != nil {
        l.n = sc.listeners
    }
    sc.listeners = l
    return l.c, nil
}
And then blocks on the listener signaling or the user cancelling the request:
func (sc *StatsController) wait(ctx context.Context) error {
    listen, err := sc.addListener(leSwap)
    if err != nil {
        return err
    }
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-listen:
        return nil
    }
}
When the worker thread sends a signal, we walk the linked list and
finalize listeners whose bitmask intersects the incoming event. All
listeners are closed on a stop signal leStop
. The remaining listeners
circle back into the queue.
func (sc *StatsController) signalListener(s listenerEvent) {
    var root, keep *listenNode
    n := sc.listeners
    for n != nil {
        if (n.e|leStop)&s > 0 {
            // the listener's mask matches the event (or we are stopping):
            // deliver the event and close the channel
            n.c <- s
            close(n.c)
        } else if root == nil {
            root = n
            keep = n
        } else {
            // keep this listener in the queue for a future event
            keep.n = n
            keep = n
        }
        n = n.n
    }
    if keep != nil {
        keep.n = nil
    }
    sc.listeners = root
}
Listeners can easily distinguish between events, and they shut down gracefully if the update thread stops or their own context is cancelled.
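For example, a waiter can subscribe to several events at once and branch on whichever one arrives; on the worker side, trySwap would presumably call sc.signalListener(leSwap) after a successful swap. A sketch with a hypothetical waitForSwap helper:
func (sc *StatsController) waitForSwap(ctx context.Context) error {
    listen, err := sc.addListener(leSwap | leGc)
    if err != nil {
        return err
    }
    select {
    case <-ctx.Done():
        return ctx.Err()
    case e := <-listen:
        // the value received tells us which event actually fired
        if e == leStop {
            return errors.New("stats controller stopped before the swap")
        }
        return nil // leSwap or leGc fired
    }
}
Because each listener channel is buffered with capacity one and closed after the single send, a listener that has already been signalled never blocks the worker.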
Conclusion
We looked at a few ways for waiter threads to coordinate with worker threads in Golang. Each option has pros and cons and is suited to different use cases.
There are ways to compose even more elaborate schemes than those here. For example, we could mix the benefits of listening and broadcasting by extending the worker context example to "snoop". Embedding different error types in the snooped context lets waiting threads distinguish between error types. If the context was closed without our expected event, we can fetch the latest context and wait again.
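A sketch of that snooping idea using context.WithCancelCause (Go 1.20+), where the worker cancels the cycle context with a sentinel error and waiters inspect context.Cause. The names below are assumptions for illustration, not DoltDB code:
// Hypothetical sentinel the worker attaches when a cycle completes a swap.
var errSwapComplete = errors.New("stats swap complete")

// Worker side: sc.activeThreadCancel is assumed to be a
// context.CancelCauseFunc created by context.WithCancelCause.
func (sc *StatsController) finishCycle() {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    if sc.activeThreadCancel != nil {
        sc.activeThreadCancel(errSwapComplete)
    }
}

// Waiter side: snoop on the cause; if the context died for any other
// reason, the caller can fetch the latest context and wait again.
func sawSwap(waitCtx context.Context) bool {
    <-waitCtx.Done()
    return errors.Is(context.Cause(waitCtx), errSwapComplete)
}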
I also think it would be interesting to investigate the performance differences between these options as a way to better understand the Go runtime scheduler. However, I can only do so much in one weekday! Maybe next time.
If you have any questions about Dolt, databases, or Golang performance reach out to us on Twitter, Discord, and GitHub!