Building my own distributed Key/Value Store

24 Jul 2022

Building a distributed, fault-tolerant, and robust key/value store seemed like a super fun challenge to me, especially considering I don’t work on distributed systems at my job, nor do I have any prior experience in building them. While the whole thing was challenging - to the point where I sometimes thought a problem was impossible to solve - it was also a lot of fun, and I thoroughly enjoyed the entire development process. Here are a few challenges I faced (and you probably will too, if you decide to build your own distributed database!).

What this post is not:

What this post assumes:

Architecture

Linearizability

The first challenge when creating such a database is offering linearizability. Put simply, linearizability guarantees that once a request returns successfully, all clients immediately observe the results of that request. And for concurrent requests, linearizability guarantees that the requests take effect in one specific order that every client observes. For example:

Linearizability is easy to guarantee if there’s a single server. However, when there are multiple servers, and especially when the network is unreliable and servers can crash, we need to ensure that all servers agree on the same execution order, whatever it is. So in our case of “it can be either 50 or 60”, the result can be one of the two. Not both. If one server settles on the order that yields 50, then ALL servers must settle on that same order.

How linearizability is guaranteed

For multiple servers, guaranteeing linearizability is easy to explain, but hard to implement. The idea is basically: accept a command through an RPC, replicate that command across multiple servers, and once a majority of the servers have stored that command, return success to that RPC.
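The "majority of the servers" part of that idea can be sketched in a few lines of Go. This is only a minimal sketch of the counting rule, not Raft's full commit rule (Raft adds further conditions, such as only counting replicas for entries from the leader's current term). The names `quorum` and `committed` are made up for illustration.

```go
package main

// quorum returns the smallest number of servers that forms a majority
// of an n-server cluster (hypothetical helper, not part of Raft's API).
func quorum(n int) int {
	return n/2 + 1
}

// committed reports whether a command stored on `stored` servers
// (counting the leader itself) has reached a majority of n servers.
func committed(stored, n int) bool {
	return stored >= quorum(n)
}
```

With five servers, `quorum(5)` is 3, so the RPC can return once the leader and two followers have stored the command, and the cluster can lose two servers without losing that command.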

Where’s the hard part then?

When a kv server starts, it starts a thread that continuously listens on a Go channel (called the apply channel) for commands that Raft has successfully replicated. However, this leads to a problem: the thread that sent the command to Raft to replicate (the one that accepted the Clerk’s RPC and called Start(), called the RPC thread henceforth) is different from the thread that listens on the apply channel, called the applier thread. To guarantee linearizability, the RPC thread needs to somehow wait for the applier thread before it can return successfully to the clerk.
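To make the two roles concrete, here is a rough sketch of what the applier thread could look like. The types `Op`, `ApplyMsg`, and `KVServer`, their fields, and the "Put"/"Append" operation names are simplified stand-ins assumed for this sketch, not the actual definitions from my code or from Raft.

```go
package main

import "sync"

// Op and ApplyMsg are simplified, hypothetical stand-ins for the command
// and message types that Raft delivers on the apply channel.
type Op struct {
	Kind  string // "Put" or "Append" (assumed operation names)
	Key   string
	Value string
}

type ApplyMsg struct {
	CommandIndex int
	Command      Op
}

type KVServer struct {
	mu          sync.Mutex
	data        map[string]string
	lastApplied int // log index of the most recently applied command
}

// applier is the long-running applier thread: it drains applyCh and applies
// each replicated command to the in-memory store, in log order.
func (kv *KVServer) applier(applyCh <-chan ApplyMsg) {
	for msg := range applyCh {
		kv.mu.Lock()
		switch msg.Command.Kind {
		case "Put":
			kv.data[msg.Command.Key] = msg.Command.Value
		case "Append":
			kv.data[msg.Command.Key] += msg.Command.Value
		}
		kv.lastApplied = msg.CommandIndex
		kv.mu.Unlock()
	}
}
```

In a server this would be started once with `go kv.applier(applyCh)`. Notice that nothing here tells a waiting RPC thread that its command has been applied.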

How should we implement such a wait?

Solution 1: Use Go channels. The RPC thread waits on a channel that the applier thread sends to. However, this doesn’t work when multiple clients are calling the RPC (note that a separate thread is created automatically for each RPC). Since multiple RPC threads will be sending their clients’ values to Raft, they will all be waiting on the same channel. When the applier thread sends a value on the channel, which thread should wake up? Go’s channels do not let you specify which waiting thread wakes up and consumes the value. Any one of them might, and we want only the thread that sent a specific value to wake up and return to the client when that specific value surfaces on the apply channel.

Solution 2: Use a shared variable to communicate to waiting threads that a value was applied to the database and that the RPC thread that started that specific command can successfully return to the client. This seems promising, but if multiple values are committed at the same time, we would get multiple values on the applier thread rapidly, and we could overwrite our shared variable before the RPC thread that the update was meant for even gets a chance to read it. Then the thread will wait forever and never return to the client.

Solution 3: It’s clear that both the RPC thread and the applier thread need to wait for each other - the RPC thread needs to wait for a new value to be produced by the applier thread, and the applier thread needs to wait for the produced value to be consumed by the correct RPC thread before producing another value. Anyone who has taken an OS course will quickly recognize this as the producer/consumer problem. The applier thread is the producer, and the RPC threads that are waiting to return to the clients are the consumers. Thus, we have one producer and multiple consumers - but with a small caveat (discussed after the interim).

Interim - Thread communication

Go and Rust’s philosophy for thread communication is a little different from the traditional way of thread communication. Traditionally, you have a lock, and a shared variable between threads, and threads “pass” messages to each other using this variable. However, for these languages, you’ve likely heard at least someone say: “Don’t communicate by sharing memory; instead, share memory by communicating”.

Unfortunately, while the sentence advocates not using shared variables for interthread communication, I just could not come up with a way to use channels to accomplish what I wanted. Using a single channel was not enough, and creating multiple channels would mean that the applier thread somehow receives the sending end of a channel dynamically - whenever a new RPC was received. I considered sending the channel itself as part of the command. However, that’s pretty murky - all servers would receive the channel when the message comes up on the apply channel, regardless of whether they are the leader or not.

I concluded that the only way to achieve this was to communicate by sharing memory - that is, implementing the producer/consumer problem.

Hiccups with producer/consumer

Implementing producer/consumer should be pretty simple: When a value appears on the apply channel, the applier thread must wait for the RPC thread that started it to read it before producing a new value. The applier thread can modify a shared variable to indicate which index a command appeared on, broadcast to wake up all consumers, and when they wake up, they check which value was applied last.

However, when a value is produced, a random consumer can’t consume it. Only the consumer that started the specific command that was produced can consume it. This can be solved quite easily - when all the consumers wake up, they all check if the command that was applied has the same index that Start() promised them. Only the thread that correctly matches this consumes the value, signals the producer, and returns to the client. The rest all go back to sleep.

The next hiccup is with the fundamental assumption that the traditional producer/consumer problem makes: that there’s always a consumer to consume the value that the producer produces. And so, the producer blocks until the value is consumed. But what if there is no consumer at all, or the previous consumer left/died? Since the kv store + Raft is a dynamic system that evolves to adapt to failures and partitions, we can’t guarantee that there’s an RPC thread available to consume the value that the applier thread produces. The applier thread too exists and runs on all the kv servers, not just the one that’s the leader. However, only the leader accepts RPCs and thus has RPC consumer threads. The follower kv servers can’t run a simple producer/consumer emulation and block on consumers to consume the values they produce, since there are no consumers at all! It’s also not enough to only block if the server is the leader and not block otherwise - new servers could get elected in the middle of replicating a request.

Another problem that’s not necessarily a hiccup with the producer/consumer problem - but a challenge while implementing the kv store - is that with an unreliable network, clients can think an RPC failed when it actually succeeded. In this case, clients will retry that same request, and the kv store will treat it as a new request, appending it to Raft’s log and executing it on the database once it is replicated successfully. Handling this should be pretty simple - just maintain an in-memory map from each client ID to the last request that client sent, so a retry of that same request can be recognized and skipped. We don’t need to remember anything older: when a new request arrives, we can assume the client observed the result of its previous one, since a particular client waits for a request to succeed before sending the next.

Finally, a leader could also crash and reboot in the middle of processing (AKA replicating) a request. Since the leader will read its persisted (to disk) state on boot, it will retain its leadership status. However, any waiting RPC threads will have died. Handling this case took me quite some thought. I was, in essence, trying to prevent duplicate requests from clients, since clients immediately start retrying when a server fails. Since we can no longer rely on an in-memory map of which client sent which request (we can only persist Raft state, not the kv store state), we need another solution. It turns out we can just send the duplicate requests to Raft to replicate. Since replicated commands pop up one by one on the apply channel, we can simply rebuild the set of requests we have seen so far as they appear - and ignore the duplicates.
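Here is a rough sketch of that idea: duplicates are allowed into the log and filtered out at apply time. It assumes each command carries a client ID and a request number, and that committed commands are replayed from the start of the log after a reboot. The types `requestID` and `command` and the function `applyAll` are hypothetical, and only an append operation is modeled.

```go
package main

// requestID identifies one client request. Both fields are hypothetical
// tags that the clerk would attach to every command.
type requestID struct {
	ClientID int64
	Seq      int64
}

type command struct {
	ID    requestID
	Key   string
	Value string // value to append to the key
}

// applyAll applies committed append commands in log order and skips any
// request it has already seen. The seen set is rebuilt from the commands
// themselves, so it does not need to be persisted separately.
func applyAll(log []command) map[string]string {
	data := make(map[string]string)
	seen := make(map[requestID]struct{})
	for _, cmd := range log {
		if _, dup := seen[cmd.ID]; dup {
			continue // a retried request that Raft replicated a second time
		}
		seen[cmd.ID] = struct{}{}
		data[cmd.Key] += cmd.Value
	}
	return data
}
```

If a client's append of "a" ends up in the log twice because of a retry, the second copy is skipped and the key only gets one "a".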

Mental blocks and impossible puzzles

During the development of the store, I faced a lot of challenges that simply seemed impossible to solve. For example, in the situation where the leader crashes and reboots, I was so fixated on the thought that “you just can’t send a duplicate request to Raft to replicate” that I never stopped to think “well, what if you do?”. Getting past such mental blocks and somehow breaking out of your tunnel vision is what I truly love about software engineering. The fact that you are forced, so frequently, to question your fundamental assumptions really improves your overall problem-solving skills.

Final thoughts