Go channels to solve interface impedance mismatch


We're hard at work on compatibility for Doltgres, the world's first and only version-controlled Postgres-compatible SQL database. This means getting all the incredibly complex queries on the pg_catalog tables issued by various tools returning correct results in a reasonable amount of time. To this end, for the last week I've been busy implementing index support for our virtual pg_catalog tables. Without indexes, some queries involving joins on five or more of these tables (surprisingly common in the Postgres library world) simply won't return, so it's table stakes to have them.

I chose Google's btree package to store and query index information in memory. Performance is good, and the package is easy enough to use, but I ran into a problem during integration.

The btree package scans ranges of elements with methods that look like this:

func (t *BTreeG[T]) AscendRange(greaterOrEqual, lessThan T, iterator ItemIteratorG[T])

iterator is a callback function that gets called for every element in the range provided. A single call to AscendRange will call iterator N times, once for every element found in the range provided.

Meanwhile, our SQL engine interfaces rely on a more classical iterator pattern, one which returns a single element every time a Next() method is called. The interface looks like this:

// RowIter is an iterator that produces rows.
type RowIter interface {
	// Next retrieves the next row. It will return io.EOF if it's the last row.
	// After retrieving the last row, Close will be called automatically.
	Next(ctx *Context) (Row, error)
	Closer
}

The problem in front of me is a form of impedance mismatch between these two interfaces, specifically relating to their periodicity. The btree package's iteration methods have a natural period of every element in the range, whereas go-mysql-server's interfaces have a natural period of a single element. How can I get them to agree, so that I can scan a range during a call to Next() but return only a single element, saving the rest for subsequent calls to Next()?

There are two basic techniques to achieve this that I'm aware of.

The first is to store the results of the AscendRange() call in a slice, then return one element every time the iterator is called. This works fine, but it's wasteful: not only do you have to materialize the entire result set up front for tables that can have very many elements, but simply allocating and collecting those slices puts significant pressure on the garbage collector, dragging down performance for the entire process.

The second technique is to fix the impedance mismatch with Go channels.

Go channels to the rescue

Like many of you, I read the Go tour when I was learning the language. When I got to the chapter on channels, I encountered this example, which seemed very silly to me.

package main

import (
	"fmt"
)

func fibonacci(n int, c chan int) {
	x, y := 0, 1
	for i := 0; i < n; i++ {
		c <- x
		x, y = y, x+y
	}
	close(c)
}

func main() {
	c := make(chan int, 10)
	go fibonacci(cap(c), c)
	for i := range c {
		fmt.Println(i)
	}
}

Why bother with concurrency at all here? Sure, this is a toy example for a tutorial, but isn't it pretty wasteful to allocate a channel and a new goroutine when the loop could be run in place? Won't this give newcomers the wrong idea about what channels are for?

As it turns out, there is a perfectly good reason to pursue this pattern of iteration, and it's to solve the exact problem in front of me. A channel is an easy, efficient way to bridge two interfaces of differing periods.

Let's look at how this works.

First, we have a struct which we'll use to implement the sql.RowIter interface. It will have some accessor fields that give it access to a *btree.BTreeG for its range scan, as well as some sql engine bookkeeping.

// inMemIndexScanIter is a sql.RowIter that uses an in-memory btree index to satisfy index lookups
// on pg_catalog tables.
type inMemIndexScanIter[T any] struct {
	lookup         sql.IndexLookup
	rangeConverter RangeConverter[T]
	btreeAccess    BTreeIndexAccess[T]
	rowConverter   rowConverter[T]
	rangeIdx       int
	nextChan       chan T
}

The magic happens in the nextItem() method, where the btree data structure is queried. Note that this method is parameterized so it can be reused for the in-memory storage of all the different pg_catalog tables we need it for. I've heavily annotated this logic with comments for readers unfamiliar with our domain.

// nextItem returns the next item from the index lookup, or io.EOF if there are no more items.
// Needs to return a pointer to T so that we can return nil for EOF.
func (l *inMemIndexScanIter[T]) nextItem() (*T, error) {
	if l.rangeIdx >= l.lookup.Ranges.Len() {
		return nil, io.EOF
	}

	// If we have a channel established, a scan is in progress,
	// so read from it if we can
	if l.nextChan != nil {
		next, ok := <-l.nextChan
		// If we failed to read, the channel was closed, so try again with the next range
		if !ok {
			l.nextChan = nil
			l.rangeIdx++
			return l.nextItem()
		}
		return &next, nil
	}

	// Start a new goroutine for the next index scan
	l.nextChan = make(chan T)
	rng := l.lookup.Ranges.ToRanges()[l.rangeIdx]
	go func() {
		// When iteration is done, close this channel to signal to the
		// channel reader that this range is exhausted
		defer func() {
			close(l.nextChan)
		}()

		// Fetch the bounds for the range scan
		gte, hasLowerBound, lt, hasUpperBound := l.rangeConverter.getIndexScanRange(rng, l.lookup.Index)

		// Our iterator function just sends elements to the channel
		// (and will block until the channel is read)
		itr := func(item T) bool {
			l.nextChan <- item
			return true
		}

		// idx is a btree.BTreeG
		idx := l.btreeAccess.getIndex(l.lookup.Index.(pgCatalogInMemIndex).name)

		// Now just iterate the range requested with the iterator
		// that sends on the channel
		if hasLowerBound && hasUpperBound {
			idx.AscendRange(gte, lt, itr)
		} else if hasLowerBound {
			idx.AscendGreaterOrEqual(gte, itr)
		} else if hasUpperBound {
			idx.AscendLessThan(lt, itr)
		} else {
			return
		}
	}()

	// Now that we have a channel with elements waiting on it,
	// try this call again to read from it
	return l.nextItem()
}

Let's go over the channel part in more detail to see what it's doing. First let's look at the most important bit, the callback we pass to the B-tree scan.

	// Our iterator function just sends elements to the channel
	// (and will block until the channel is read)
	itr := func(item T) bool {
		l.nextChan <- item
		return true
	}

This is the function that will get called for every element in range in the B-tree we're scanning, and all it does is send that element to the channel we created and stored in the iterator struct. This is the part that bridges the time gap between the lifecycles of AscendRange() and Next(). The index scan happens in the background, and queues up a single element to be read by Next(). That happens here:

	// If we have a channel established, a scan is in progress,
	// so read from it if we can
	if l.nextChan != nil {
		next, ok := <-l.nextChan
		// If we failed to read, the channel was closed, so try again with the next range
		if !ok {
			l.nextChan = nil
			l.rangeIdx++
			return l.nextItem()
		}
		return &next, nil
	}

Reading elements from the channel is obvious, but the behavior of next, ok might not be: ok will only be false when the channel has been closed, which signals to the reader that we should advance to the next range and keep going.

It's also interesting to note that in a generic function like this one, there's no way to return nil for an arbitrary type T, and T's zero value may be a perfectly valid element. Declaring the return type as *T rather than T lets us use a nil pointer to signal the absence of a value.

Conclusion

This was a surprising solution to me, one of the few times in years of Go development that I've used channels this way, for a use case that isn't really about concurrency. The technique is generally useful, and because goroutines and channels are cheap, it's viable even on the hot path of an application. One obvious extension would be to buffer the channel so the scanning goroutine can run ahead of the consumer, but I haven't profiled that yet to see how much it matters in practice.

Questions about Go channels or Doltgres? Come by our Discord to talk to our engineering team and meet other Doltgres users.
