Coordinating Goroutine Listeners


We use Golang to build DoltDB, a version-controlled SQL database. In the process we frequently run into language peculiarities and trade-offs. The Go runtime thread manager makes many things possible, but also sometimes gives us too many options.

When redesigning table statistics we were faced with many ways to coordinate between background worker threads and responsive user threads. For context, statistics run as a background thread and balance (1) keeping estimates fresh, (2) rate limiting resource use relative to user queries, and (3) being responsive to user requests. The statistics themselves are protected from concurrent SQL engine and worker access with a simple mutex. But clients can also request changes to the worker state. We discussed the front-facing stats architecture a couple weeks ago. Today we focus on coordinating requests specifically to the worker state.

For example, a client might request a stats GC: call dolt_stats_gc(). For performance reasons, we would rather have the worker thread coordinate GC execution than allow client threads to amplify background work. The worker is intentionally asynchronous and rate-limited because CPU resources are better spent responding to SQL queries (regular reads/writes).

How client calls communicate with the background update process is a key question that hinges on the choice of Golang primitive. I was surprised by the number of options and behavioral differences. Here we'll consider different ways for listener threads to communicate with and wait on a worker thread, from spin locking to broadcasts to listener queues.

Worker state

The examples here reference a StatsController type:

type StatsController struct {
    mu     sync.Mutex
    genCnt atomic.Uint64
}
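
The later examples also reference fields added along the way (an update context, a condition variable, a listener queue). As a rough sketch, the fuller struct might look like the following, where everything beyond mu and genCnt is an assumption drawn from the snippets below:

type StatsController struct {
    mu     sync.Mutex
    genCnt atomic.Uint64 // the sync.Cond variant later swaps this for a plain int

    // assumed fields, referenced by the later examples
    activeThreadCtx    context.Context    // context for the current update cycle
    activeThreadCancel context.CancelFunc // cancels/finalizes the current cycle
    cond               *sync.Cond         // broadcast variant
    listeners          *listenNode        // head of the listener queue variant
}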

In DoltDB, the stats controller is run in a background routine like so:

go func() {
    sc.run(ctx)
}()

It then loops, grabbing the latest database state and updating the statistics estimates:

func (sc *StatsController) run(ctx context.Context) error {
    for {
        genStart := sc.genCnt.Load()
        latestCtx, err := sc.contextGen(ctx)
        if err != nil {
            return err
        }
        newStats, err := sc.newStatsForRoot(latestCtx)
        if err != nil {
            return err
        }
        if _, err := sc.trySwap(genStart, newStats); err != nil {
            return err
        }
    }
}

sc.genCnt is an atomic.Uint64 that avoids contention on the version counter and enables the compare-and-swap below:

func (sc *StatsController) trySwap(prevGen uint64, newStats *rootStats) (ok bool, err error) {
	if sc.genCnt.CompareAndSwap(prevGen, prevGen+1) {
...

This safety mechanism simplifies the lifecycle of the update thread, which we will not focus on here but is relevant for our coordination options.
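
For completeness, here is a hedged sketch of what the elided body might look like, assuming the controller keeps its current estimates in a stats field guarded by mu (that field is an assumption, not shown in this post):

func (sc *StatsController) trySwap(prevGen uint64, newStats *rootStats) (ok bool, err error) {
    // only publish if no other update bumped the generation since this cycle started
    if sc.genCnt.CompareAndSwap(prevGen, prevGen+1) {
        sc.mu.Lock()
        defer sc.mu.Unlock()
        sc.stats = newStats // hypothetical field holding the current estimates
        return true, nil
    }
    // another writer won the race; the caller recomputes against the newer root
    return false, nil
}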

The listeners we discuss next are user requests indirectly interacting with the worker thread.

Do extra work

The simplest way to block user calls on work being performed is to duplicate the work. A user request waiting on a stats update can launch its own concurrent update with sc.run().

The simplicity is offset by the resource inefficiency and churn of in-progress work. You have to choose between workers preempting each other, delaying updates, or letting concurrent workers repeat work.
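
As a minimal sketch of this approach, the waiter performs one update cycle inline (a single iteration of run() rather than the full loop), reusing the worker's helpers; if the background worker swaps first, the duplicated work is simply thrown away:

func (sc *StatsController) waitByWorking(ctx context.Context) error {
    genStart := sc.genCnt.Load()
    latestCtx, err := sc.contextGen(ctx)
    if err != nil {
        return err
    }
    newStats, err := sc.newStatsForRoot(latestCtx)
    if err != nil {
        return err
    }
    // if the background worker already swapped, this work is discarded
    _, err = sc.trySwap(genStart, newStats)
    return err
}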

Monitor shared state (spin-lock)

Instead of preempting the main thread, listeners can monitor for changes. For example, the genCnt variable below indicates how many updates have happened. When the count increases, we've finished at least one iteration:

func (sc *StatsController) wait() {
	start := sc.genCnt.Load()
	for {
		if sc.genCnt.Load() > start {
			return
		}
	}
}

Spinning like this is not particularly efficient. The number of updates performed between entering and exiting the wait is also unpredictable.
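
One small mitigation, not shown above, is to yield to the runtime scheduler on each pass so the worker goroutine can make progress; the busy-wait itself remains:

func (sc *StatsController) waitSpin() {
    start := sc.genCnt.Load()
    for sc.genCnt.Load() <= start {
        runtime.Gosched() // yield this goroutine so the worker can run
    }
}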

Track update context (ctx.Done)

Message passing is the common alternative to sharing data, and the standard way for goroutines to signal liveness and cancellation in Go is the context. So the first way I tried to get around spin-locks was to wait on an update context.

The function below starts a new activeThreadCtx at the beginning of a loop, invalidating the previous cycle in the process:

func (sc *StatsController) newThreadCtx(ctx context.Context) context.Context {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.activeThreadCancel != nil {
		sc.activeThreadCancel()
	}
	sc.activeThreadCtx, sc.activeThreadCancel = context.WithCancel(ctx)
	return sc.activeThreadCtx
}

User threads use the most recent active thread context to wait:

func (sc *StatsController) wait() {
	start := sc.genCnt.Load()
	for sc.genCnt.Load() <= start {
		// grab the most recent cycle's context under the lock
		waitCtx := func() context.Context {
			sc.mu.Lock()
			defer sc.mu.Unlock()
			return sc.activeThreadCtx
		}()
		// block until the worker cancels this cycle's context, then re-check
		<-waitCtx.Done()
	}
}

When the cycle finalizes, we invalidate the previous context and release any waiters:

func (sc *StatsController) trySwap(prevGen uint64, newStats *rootStats) (ok bool, err error) {
	if sc.genCnt.CompareAndSwap(prevGen, prevGen+1) {
        sc.mu.Lock()
        defer sc.mu.Unlock()

		if sc.activeThreadCancel != nil {
			defer sc.activeThreadCancel()
		}
...

Contexts can be cancelled for many reasons, though, and it is difficult for a listener to know why a given context was cancelled.
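
One way to soften this, and a preview of the "snooping" idea in the conclusion, is Go 1.20's context.WithCancelCause, which lets the worker attach a reason that waiters inspect with context.Cause. This is a sketch, not DoltDB's implementation; waitWithCause and the two sentinel errors are names introduced here, and activeThreadCtx is assumed to be created with context.WithCancelCause:

// Sentinels the worker would pass to its CancelCauseFunc; both names are assumptions.
var (
    errSwapComplete   = errors.New("stats swap complete")
    errCyclePreempted = errors.New("update cycle preempted")
)

func (sc *StatsController) waitWithCause(ctx context.Context) error {
    start := sc.genCnt.Load()
    for sc.genCnt.Load() <= start {
        waitCtx := func() context.Context {
            sc.mu.Lock()
            defer sc.mu.Unlock()
            return sc.activeThreadCtx // assumed created with context.WithCancelCause
        }()
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-waitCtx.Done():
            switch cause := context.Cause(waitCtx); {
            case errors.Is(cause, errSwapComplete):
                return nil // the worker finished a swap
            case errors.Is(cause, errCyclePreempted):
                // a new cycle replaced this one; loop and wait on the fresh context
            default:
                return cause // shutdown or another unexpected cancellation
            }
        }
    }
    return nil
}

On the worker side, trySwap would cancel the active context with errSwapComplete and newThreadCtx would cancel the outgoing context with errCyclePreempted, so completion and preemption look different to waiters.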

Broadcast (sync.Cond)

Instead of abusing the context model, we can use Golang's formal broadcast primitive sync.Cond. We instantiate a condition with an associated mutex:

sc.cond = sync.NewCond(&sc.mu)

The Wait below cedes the goroutine to the runtime manager and releases the associated lock, which must be the same lock the condition was created with. Note that the counter is a plain int in this variant rather than an atomic:

func (sc *StatsController) wait() {
    sc.mu.Lock()
    defer sc.mu.Unlock()
	start := sc.genCnt
	for sc.genCnt <= start {
		sc.cond.Wait()
	}
}

When the worker thread runs sc.cond.Broadcast(), the waiting thread wakes up, acquires the lock, and checks whether our condition is satisfied. Below we've modified the swap code to trigger a broadcast:

func (sc *StatsController) trySwap(prevGen uint64, newStats *rootStats) (ok bool, err error) {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    if sc.genCnt == prevGen {
        sc.genCnt++
		defer sc.cond.Broadcast()
...

The broadcast is elegant but inflexible. We need secondary variables to track whether the thing we are waiting for has completed. And each additional event type we want to track requires its own condition and counter.

Broadcasts also mesh poorly with contexts. A goroutine paused on a condition cannot be preempted if its own context is cancelled. For example, we cannot do something like the following:

// not valid Go: cond.Wait() blocks and does not return a channel to select on
func wait(ctx context.Context, cond *sync.Cond) {
    select {
    case <-ctx.Done():
    case <-cond.Wait():
    }
}

Instead, user queries ignore SIGINT requests and block until the condition is signaled. We need a more expressive coordination primitive.

Listening queue

A queue of custom listeners lets us signal waiting threads selectively based on the types of event they are waiting for. Most of this section is implementation, but the goal is for waiters to block on a channel that the worker selectively fires:

func (sc *StatsController) wait(ctx context.Context) {
    listen, _ := sc.addListener(leSwap)
    select {
    case <-ctx.Done():
    case <-listen:
    }
}

This maintains most of the elegance of the broadcast. The waiter can be dumb and trust the worker to let it know when something has happened.

Here is our listener object:

type listenNode struct {
	e listenerEvent
	c chan listenerEvent
	n *listenNode
}

The e listenerEvent is a bitmask of event types that alert this listener. The c channel communicates which specific event triggered the listener. And n is a link to the next node in the list.
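
The listenerEvent type itself is not shown in this post; a plausible sketch of the bitmask, where leSwap and leStop appear in the surrounding code and any other values are assumptions:

type listenerEvent uint64

const (
    leSwap listenerEvent = 1 << iota // a stats swap completed
    leGc                             // hypothetical: a GC cycle finished
    leStop                           // the worker is shutting down
)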

A user thread adds a listener to the queue:

func (sc *StatsController) addListener(e listenerEvent) (chan listenerEvent, error) {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	l := &listenNode{e: e, c: make(chan listenerEvent, 1)}
	if sc.listeners != nil {
		l.n = sc.listeners
	}
	sc.listeners = l
	return l.c, nil
}

And then blocks on the listener signaling or the user cancelling the request:

func (sc *StatsController) wait(ctx context.Context) {
    listen, _ := sc.addListener(leSwap)
    select {
    case <-ctx.Done():
    case <-listen:
    }
}

When the worker thread sends a signal, we walk the linked list and finalize listeners whose bitmask intersects the incoming event. All listeners are closed on a stop signal leStop. The remaining listeners circle back into the queue.

func (sc *StatsController) signalListener(s listenerEvent) {
	var root, keep *listenNode
	n := sc.listeners
	for n != nil {
		if (n.e|leStop)&s > 0 {
			n.c <- s
			close(n.c)
		} else if root == nil {
			root = n
			keep = n
		} else {
			keep.n = n
			keep = n
		}
		n = n.n
	}
	if keep != nil {
		keep.n = nil
	}
	sc.listeners = root
}
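
To tie it together, here is a hedged sketch of how the worker side might fire these signals, assuming signalListener is called with sc.mu held; the stop hook and exact call sites are assumptions:

func (sc *StatsController) trySwap(prevGen uint64, newStats *rootStats) (ok bool, err error) {
    if sc.genCnt.CompareAndSwap(prevGen, prevGen+1) {
        sc.mu.Lock()
        defer sc.mu.Unlock()
        defer sc.signalListener(leSwap) // wake anyone waiting on a completed swap
        // ... publish newStats ...
        return true, nil
    }
    return false, nil
}

func (sc *StatsController) stop() {
    sc.mu.Lock()
    defer sc.mu.Unlock()
    sc.signalListener(leStop) // release every listener on shutdown
}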

Listeners can easily distinguish between events, and they shut down gracefully if the update thread stops or their own context is cancelled.

Conclusion

We looked at a few ways for waiter threads to coordinate with worker threads in Golang. Each option has pros and cons and suits different use cases.

(Figure: comparison of worker-waiter coordination options.)

There are also ways to compose more elaborate schemes than those shown here. For example, we could mix the benefits of listening and broadcasting by extending the worker context example to "snoop" on cancellations. Embedding different error types in the snooped context lets waiting threads distinguish between causes. If the context was cancelled without our expected event, we fetch the latest context and wait again.

I also think it would be interesting to investigate the performance differences between these options as a way to better understand the Go runtime scheduler. However, I can only do so much in one weekday! Maybe next time.

If you have any questions about Dolt, databases, or Golang performance reach out to us on Twitter, Discord, and GitHub!
