Building a distributed, fault-tolerant, and robust key/value store seemed like a super fun challenge to me, especially considering I don’t work on distributed systems at my job, nor do I have any prior experience in building them. While the whole thing was super challenging - to the point where sometimes I thought a problem was impossible to solve - it was also super fun. I thoroughly enjoyed the entire process of development. Here are a few challenges I faced (and you probably will too, if you decide to build your own distributed database!).
What this post is not:
- A guide to building your own kv store.
- A thorough list of all the pitfalls you’re likely to fall into.
What this post assumes:
- You have a basic, rough understanding of what Raft is, and why it’s needed to build a kv store.
- You are in the process of, or have tried to, build your kv store.
Architecture
- The system comprises multiple servers that can all talk to each other, but no assumptions are made regarding the reliability of the network, order of message deliveries, or availability of servers (they can crash/fail at any time).
- Each server runs Raft
- Each server runs the kv store code that talks to the “underlying” Raft algorithm. Raft’s Start() function is called to send a new message/command to replicate on servers. Start() is a non-blocking call and returns immediately. It returns the index of the log where the command is expected to appear.
- Clients send requests to “clerks”, whose job is to send RPCs to the kv servers, and retry if they think a message failed to send.
- Commands that the Raft code finishes replicating and committing successfully appear on a Go channel for the kv server to apply to the database - for example, an Append(X,50) command will cause the kv server to append the value 50 to the key X in its internal database.
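To make the moving parts concrete, here is a minimal sketch of the kv side of this architecture. The names Op, ApplyMsg, Replicator, and KVServer are assumptions made for illustration, and Start() is simplified to return only the log index; a real Raft layer may return more than that.

```go
package main

// Op is a hypothetical command that the kv server asks Raft to replicate.
type Op struct {
	Kind  string // "Put" or "Append"
	Key   string
	Value string
}

// ApplyMsg is what the Raft layer is assumed to deliver on the apply channel
// once a command has been replicated and committed.
type ApplyMsg struct {
	Index   int // log index the command was committed at
	Command Op
}

// Replicator is the small slice of the Raft API the kv server relies on here.
type Replicator interface {
	// Start hands a command to Raft without blocking and returns the log
	// index where the command is expected to appear.
	Start(cmd Op) (index int)
}

// KVServer holds the in-memory database that committed commands are applied to.
type KVServer struct {
	db map[string]string
}

func newKVServer() *KVServer {
	return &KVServer{db: make(map[string]string)}
}

// apply executes one committed command against the database.
func (kv *KVServer) apply(op Op) {
	switch op.Kind {
	case "Put":
		kv.db[op.Key] = op.Value
	case "Append":
		kv.db[op.Key] += op.Value
	}
}

// runApplier drains the apply channel until it is closed, applying each
// committed command in the order Raft delivers it.
func (kv *KVServer) runApplier(applyCh <-chan ApplyMsg) {
	for msg := range applyCh {
		kv.apply(msg.Command)
	}
}
```

In a running server, runApplier would be started in its own goroutine at boot, while the RPC handlers call Start() on the Raft layer.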
Linearizability
The first challenge when creating such a database is offering linearizability. Put simply, linearizability offers a guarantee that once a request returns successfully, all clients immediately observe the results of that request. And for concurrent requests, linearizability guarantees that the requests execute in some specific, deterministic order. For example:
- Client X sends Put(balance, 5), Client Y sends Put(balance, 6), and X sends Append(balance, 0). In this case, should the value for balance be 50 or 60? Can it be 6? It all depends on several factors. If the first call returned before Y sent its Put, the only possibility is that balance is 60 (note that X will wait for its own call to return before sending append 0). However, if the call does not return, it can be either 50 or 60.
Linearizability is easy to guarantee if there’s a single server. However, when there are multiple servers, and especially when the network is unreliable and servers can crash, we need to ensure that all servers apply commands in the same order, whatever that order is. So in our case of “it can be either 50 or 60”, the result is one of the two, not both. If one server settles on the order that produces 50, then ALL servers must settle on that same order.
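The balance example can be sketched as a tiny replay function. This is only an illustration: the Op type and replay are hypothetical names, and the log is a plain slice rather than a real Raft log.

```go
package main

// Op is a hypothetical replicated command.
type Op struct {
	Kind, Key, Value string
}

// replay applies a log of commands, in order, to an empty database and
// returns the resulting state. Every server that replays the same log
// ends up with the same state.
func replay(log []Op) map[string]string {
	db := make(map[string]string)
	for _, op := range log {
		switch op.Kind {
		case "Put":
			db[op.Key] = op.Value
		case "Append":
			db[op.Key] += op.Value
		}
	}
	return db
}
```

Replaying Put(balance, 5), Put(balance, 6), Append(balance, 0) leaves balance at 60, while swapping the two Puts leaves it at 50. Either log is acceptable on its own; what matters is that every server replays the same one.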
How linearizability is guaranteed
For multiple servers, guaranteeing linearizability is easy to explain, but hard to implement. The idea is basically: accept a command through an RPC, replicate that command across multiple servers, and once a majority of the servers have stored that command, return success to that RPC.
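The “majority” condition itself is a one-liner. A minimal sketch, with hasMajority as a hypothetical helper name (in Raft the leader counts its own copy of the entry as one of the acknowledgements):

```go
package main

// hasMajority reports whether acks servers out of clusterSize form a strict
// majority, which is the point at which a command can be treated as committed.
func hasMajority(acks, clusterSize int) bool {
	return acks > clusterSize/2
}
```

With five servers, three acknowledgements are enough and two are not; with four servers, two are still not enough.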
Where’s the hard part then?
- A request RPC (from client to a key/value server) fails but the replication (from kv server to multiple kv servers through Raft) succeeds, so the client retries the request and now we have duplicate executions for the same command (for example, duplicate appends)
- Network partitions could arise such that a leader accepts a command, but then loses leadership and no longer tries to replicate the command.
- A leader could accept a request, replicate it successfully, but fail just before responding to the client, who will then retry.
When a kv server starts, it starts a thread that continuously listens on a Go channel (called apply channel) for commands that Raft successfully replicates. However, this leads to a problem: the thread that sent the command to Raft to replicate (the one that accepted the Clerk’s RPC and calls Start(), called RPC thread henceforth) is different from the thread that listens on the apply channel, called the applier thread. To guarantee linearizability the RPC thread needs to somehow wait for the applier thread so it can return successfully to the clerk.
How should we implement such a wait?
Solution 1: Use Go channels. The RPC thread waits on a channel that the applier thread sends to. However, this doesn’t work when multiple clients are calling the RPC (note that a separate thread is auto-created for each RPC execution). Since multiple RPC threads will be sending the clients’ values to Raft, they will all be waiting on the same channel. When a value is sent by the applier thread on the channel, which thread should wake up? Go channels do not let you specify which waiting thread will wake up and consume the value. From the program’s point of view the choice is arbitrary, and we want only the thread that sent a specific value to wake up and return to the client when that specific value surfaces on the applier channel.
Solution 2: Use a shared variable to communicate to waiting threads that a value was applied to the database and that the RPC thread that started that specific command can successfully return to the client. This seems promising, but if multiple values are committed at the same time, we would get multiple values on the applier thread rapidly, and we could overwrite our shared variable before the RPC thread that the update was meant for even gets a chance to read it. Then the thread will wait forever and never return to the client.
Solution 3: It’s clear that both the RPC thread and the applier thread need to wait for each other - the RPC thread needs to wait for a new value to be produced by the applier thread, and the applier thread needs to wait for the produced value to be consumed by the correct RPC thread before producing another value. Anyone who has taken an OS course will quickly identify that this is basically the producer/consumer problem. The applier thread is the producer, and the RPC threads that are waiting to return to the clients are the consumers. Thus, we have one producer and multiple consumers - but with a small caveat (discussed after the interim).
Interim - Thread communication
Go and Rust’s philosophy for thread communication is a little different from the traditional way of thread communication. Traditionally, you have a lock, and a shared variable between threads, and threads “pass” messages to each other using this variable. However, for these languages, you’ve likely heard at least someone say: “Don’t communicate by sharing memory; instead, share memory by communicating”.
Unfortunately, while the sentence advocates not using shared variables for interthread communication, I just could not come up with a way to use channels to accomplish what I wanted. Using a single channel was not enough, and creating multiple channels would mean that the applier thread somehow receives the sending end of a channel dynamically - whenever a new RPC was received. I considered sending the channel itself as part of the command. However, that’s pretty murky - all servers would receive the channel when the message comes up on the apply channel, regardless of whether they are the leader or not.
I concluded that the only way to achieve this was to communicate by sharing memory - that is, implementing the producer/consumer problem.
Hiccups with producer/consumer
Implementing producer/consumer should be pretty simple: When a value appears on the apply channel, the applier thread must wait for the RPC thread that started it to read it before producing a new value. The applier thread can modify a shared variable to indicate which index a command appeared on, broadcast to wake up all consumers, and when they wake up, they check which value was applied last.
However, when a value is produced, a random consumer can’t consume it. Only the consumer that started the specific command that was produced can consume it. This can be solved quite easily - when all the consumers wake up, they all check if the command that was applied has the same index that Start() promised them. Only the thread that correctly matches this consumes the value, signals the producer, and returns to the client. The rest all go back to sleep.
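One way to sketch this index-matching handoff is with a mutex and a condition variable from Go’s sync package. The applyNotifier type and its publish and waitFor methods are hypothetical names, and this is a sketch of the idea rather than the exact code I ended up with. Note that publish blocks until a matching handler shows up, so it assumes every published index has a waiting consumer.

```go
package main

import "sync"

// applyNotifier is the shared state between the single applier goroutine
// (the producer) and the RPC handlers (the consumers).
type applyNotifier struct {
	mu      sync.Mutex
	cond    *sync.Cond
	index   int    // log index of the value currently on offer
	value   string // result of applying the command at that index
	pending bool   // true while the value on offer has not been consumed
}

func newApplyNotifier() *applyNotifier {
	n := &applyNotifier{}
	n.cond = sync.NewCond(&n.mu)
	return n
}

// publish is called by the applier goroutine after applying a command.
// It wakes every waiting handler and then blocks until one of them
// consumes the value.
func (n *applyNotifier) publish(index int, value string) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.index, n.value, n.pending = index, value, true
	n.cond.Broadcast()
	for n.pending {
		n.cond.Wait()
	}
}

// waitFor is called by an RPC handler with the index that Start() returned.
// Handlers whose index does not match go back to sleep; the matching one
// consumes the value and signals the producer.
func (n *applyNotifier) waitFor(index int) string {
	n.mu.Lock()
	defer n.mu.Unlock()
	for !(n.pending && n.index == index) {
		n.cond.Wait()
	}
	v := n.value
	n.pending = false
	n.cond.Broadcast() // wakes the producer; other handlers re-check and sleep
	return v
}
```

A single condition variable with Broadcast is used for both directions to keep the sketch short. Because every waiter re-checks its condition under the lock, spurious wakeups are harmless.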
The next hiccup is with the fundamental assumption that the traditional producer/consumer problem makes: that there’s always a consumer to consume the value that the producer produces. And so, the producer blocks until the value is consumed. But what if there is no consumer at all, or the previous consumer left/died? Since the kv store + Raft is a dynamic system that evolves to adapt to failures and partitions, we can’t guarantee that there’s an RPC thread available to consume the value that the applier thread produces. The applier thread also runs on all the kv servers, not just the one that’s the leader. However, only the leader accepts RPCs and thus has RPC consumer threads. The follower kv servers can’t run a simple producer/consumer emulation and block on consumers to consume the values they produce, since there are no consumers at all! It’s also not enough to only block if the server is the leader and not block otherwise - a new leader could get elected in the middle of replicating a request.
Another problem that’s not necessarily a hiccup with the producer/consumer problem - but a challenge while implementing the kv store - is that with an unreliable network, clients can think an RPC failed when it actually succeeded. In this case, clients will retry that same request, and the kv store will treat it as a new request, appending that request to Raft’s log, and executing it on the database when the request is replicated successfully. Handling this should be pretty simple - just maintain an in-memory map from each client ID to the last request that client sent. Only the last request needs tracking: a particular client waits for a request to succeed before sending the next, so a new request from a client means it has already observed the result of its previous one.
Finally, a leader could also crash and reboot in the middle of processing (AKA replicating) a request. Since the leader will read its persisted (to disk) state on boot, it will retain its leadership status. However, any waiting RPC threads will have died out. Handling this case took me quite some thought. I was, in essence, trying to prevent duplicate requests from clients, since when a server fails, clients immediately start retrying. Since we can no longer rely on an in-memory map of which client sent which request (we can only persist Raft state, not the kv store state), we need to come up with another solution. Turns out, we can just send duplicate requests to Raft to replicate. Since replicated commands pop up one by one on the applier channel, we can simply build up a new set of which requests we have seen so far - and ignore the duplicate ones.
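Filtering duplicates at apply time can be sketched like this. The per-request identifier (ClientID plus Seq) is an assumption made for illustration, as are the names Op, requestID, and stateMachine; the point is only that the “seen” set is built from the commands themselves as they arrive on the applier channel.

```go
package main

// Op is a hypothetical replicated command that carries the identity of the
// client request it came from.
type Op struct {
	ClientID int64
	Seq      int64 // hypothetical per-client sequence number, reused on retry
	Kind     string
	Key      string
	Value    string
}

// requestID uniquely identifies a client request.
type requestID struct {
	ClientID, Seq int64
}

// stateMachine is the database plus the set of requests already executed.
type stateMachine struct {
	db   map[string]string
	seen map[requestID]bool
}

func newStateMachine() *stateMachine {
	return &stateMachine{
		db:   make(map[string]string),
		seen: make(map[requestID]bool),
	}
}

// apply executes op unless the same request was already executed, and
// reports whether it did anything.
func (s *stateMachine) apply(op Op) bool {
	id := requestID{op.ClientID, op.Seq}
	if s.seen[id] {
		return false // duplicate entry in the log: ignore it
	}
	s.seen[id] = true
	switch op.Kind {
	case "Put":
		s.db[op.Key] = op.Value
	case "Append":
		s.db[op.Key] += op.Value
	}
	return true
}
```

Because every server sees the same commands in the same order, each one builds the same set and skips the same duplicates.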
Mental blocks and impossible puzzles
During the development of the store, I faced a lot of challenges that simply seemed impossible to solve. For example, in the situation where the leader can crash and reboot, I was so fixated on my thought process of “you just can’t send a duplicate request to Raft to replicate” that I never stopped to think “well, what if you do?”. Handling such mental blocks and somehow widening your tunnel vision is what I truly love about software engineering. The fact that you are forced, so frequently, to question your fundamental assumptions really improves your overall problem-solving skills.
Final thoughts
- During development, when faced with a problem, I started thrash tuning quite often. Thrash tuning is a temptation that I know is bad, but it’s so hard to avoid - when you’re solving a larger bug and smaller ones start to show up, it’s almost an instinct to want them to go away asap so you can start focussing on the more “important” issue. However, needless to say, this leads to problems more often than not. Avoiding thrash tuning is easy to say, but very hard (not impossible though!) to truly implement in practice.
- I started out writing this post to ELI5 building a distributed key-value store. However, writing this was so hard because there are so many nuances you have to know beforehand - for example, it’s not practical to explain the inner workings of Raft or how it replicates commands in the face of crashes or network partitions. And so, I pivoted to writing this post intended for only those readers that are trying to implement this (and like me, got stuck). Implementing Raft was a lot easier than this because you have an entire paper that guides you and tells you exactly how to implement the system. However, there is honestly not a lot of advice out there on how to fix issues one will encounter when building a KV Store. Hopefully, this post helps someone!