Designing Data Intensive Applications: Ch.6 Partitioning (Part 1: Partitioning of Key-Value Data)

Designing Data Intensive Applications: Ch.6 Partitioning (Part 1: Partitioning of Key-Value Data)

In the previous series of articles, we discussed Chapter 5 of Data-Intensive Applications which tackled replication. And we learned that replication is essentially having multiple copies of the same dataset spread across multiple machines for a multitude of reasons, such as increasing the system's availability and fault tolerance and increasing throughput.

But for very large datasets or very high query throughput, that is not sufficient: we also need to break the data up into partitions, hence, some systems choose to partition their dataset onto multiple machines which can be in turn replicated.

There are many approaches to partitioning data, each with its use cases and each comes with its complications. In this article, we will discuss, perhaps, the simplest partitioning technique, which is partitioning by key-value and we will learn its variants, use cases and complications.

Partitioning and Replication

Normally, partitions are defined in such a way that each piece of data (each record, row or document) belongs to exactly one partition. The main reason for wanting to partition data is scalability. Different partitions can be placed on different nodes in a shared-nothing cluster (a shared-nothing cluster is one whose nodes are independent and can satisfy requests on their own in the sense that they don't share processor/memory/storage unit which helps in reducing contention and avoiding single points of failure). Thus, a large dataset can be distributed across many disks, and the query load can be distributed across many processors.

When a dataset is partitioned, small queries that operate on a single partition can be executed independently on that partition. While more complex queries that might span multiple partitions can be parallelized across many nodes which gets significantly more complex.

One other thing to note is that partitioning is often combined with replication so that copies of each partition are stored on multiple nodes. So even though each record only belongs to one partition, it may be replicated to many other nodes to improve availability and fault tolerance.

A node may store more than one partition. If a leader-follower replication model is used, the combination of partitioning and replication can look like the following:

As we can see in the previous diagram, we have a system consisting of 4 nodes and a dataset that's partitioned into 3 partitions. Each partition is stored on 3 different nodes in a leader-follower architecture in which one leader is replicating to 2 other followers. So for example, partition 1 leader resides on node 1 and it's replicating to its followers on nodes 3 and 4, and so on.

It's very important to note that the choice of the partitioning scheme is mostly independent of the choice of the replication scheme, so for all things considered in this chapter, we will ignore replication.

Now let's discuss the first and perhaps the simplest approach to partitioning data which is partitioning by key-value.

Partitioning of key-value data

Say you have a large amount of data, and you want to partition it. How do you decide which records to store on which node?

The main goal behind partitioning is scalability as we discussed in the previous section. So it makes sense that we want to partition data such that it's spread evenly across all nodes in the cluster, so in theory, 10 nodes can handle 10 times more read and write throughput of a single node.

But then if the partitioning is skewed, some partitions will have more data and this more queries than others which will make this partition a bottleneck in the scalability of this system. The worst case is if the partitioning scheme is so skewed that it routes all data to a single partition, in which case all other partitions are essentially idle. Usually, when a partition has a disproportional high load, it's called a hot spot.

Randomly Write Data to Partitions

The simplest approach to avoiding hot spots is to assign data to partitions randomly, so now we know that every row of data has an equal probability of ending up in any of the system's partitions which will avoid hot spots. But this approach has a major disadvantage: when you're trying to read any of the items you have randomly written, you will have no way of knowing where this item is located which leaves you no option but to query all the partitions (maybe in parallel) until you find the item you desire which is quite inefficient and will lead to high read latency and high load on all partitions which defeats the purpose of partitioning in the first place.

Partitioning by Key Range

In the previous approach, we tended to optimize writes more than reads which lead to an unbalanced system with high read latency. Now let's think of how we can also optimize for read latency when partitioning data.

Let's assume that our data is primary key identifiable, in the sense that each row of data has some property that we can use to identify this item (let's ignore uniqueness for now). Let's the following diagram for example:

Let's assume that the dataset we are working with consists of a bunch of books where each book has a title we can use to identify this book. In an old-fashioned paper encyclopedia, those books will be sorted by title like the diagram.

What this achieves is that now our dataset is partitioned by title into 12 partitions. Each partition contains a sorted range of data with a minimum and maximum (for example partition one would contain all books with titles starting with the letter A). So now if we are looking for a specific book by its title (primary key), we can immediately and quite easily determine to which partition it belongs and perform a direct read to that partition which improves read latency.

Moreover, each partition is also internally sorted, so that makes it easy to perform range reads, like for example if we want to fetch all books with titles that start with the letter C till all books that start with the letter H, we can read partitions 3, 4, 5 and 6 in order (maybe even while applying other conditions, like for example, those books that were written by a specific author, etc. although that might not be the most efficient method, we will learn more about partitioning via secondary indexes in the next article).

There is however a major flaw in this type of partitioning:

Those parts are not evenly spread. For example, volume 1 contains books whose titles start with A and B, but volume 12 contains books whose titles start with T, U, V, X, Y and Z. Also, it's a fact that a few words in the English language start with T, U, V, X, Y and Z which means that we will always have hot spots in those popular partitions, like the ones that contain A for example.

The same problem can also happen with some special workloads for example if we have a system that indexes logs partitioned by their timestamps. Since we will always index logs that are happening today, all writes will end up going to the same partition creating a write hot spot.

Some workarounds for this problem exist like choosing a prefix to be used in conjunction with the timestamp to help evenly distribute the load, for example, add the service name from which those logs are originating as a first part of the partitioning key and then partition by timestamp. This is of course assuming that all services have more or less the same load, otherwise, this will also create hotspots.

Partitioning by Hash of Key

Because of this risk of skew and hot spots, many distributed datastores use a hash function to determine the partition for a given key.

A good hash function takes skewed data and makes it uniformly distributed. Say you have a 32-bit hash function that takes a string. Whenever you give it a new string, it returns a seemingly random number between 0 and \(2^{32} - 1\). Even if the input strings are very similar, their hashes are evenly distributed across that range of numbers.

In the previous diagram, every log timestamp is hashed using the MD5 hash function (note that in this use case, we don't need a cryptographically strong hash*, we can use any simple hashing function*), which generates an integer, and then we partition them by ranges between those integers bounds, like for example \(p_0\) and \(p_1\) are storing all hashed timestamps between \(0\) and \(16,383\) and so on.

And when a specific log is read, we can hash the timestamp and use the lookup table to know which partition has this log and fetch it. But as you may have already noticed, now we lost the ability to perform range queries as once adjacent keys are now scattered across all the partitions, so their sort order is lost. The only option here is to send range queries to all partitions which is a solution some datastores opt to use, while others don't support range queries in hash partitioned datasets at all.

Some datastores like Cassandra achieve a compromise between the two approaches in which a table has a compound partitioning key consisting of several columns in which only the first part of the key is hashed to help evenly spread out data while other parts of the key are stored in sorted order. So range queries can't be done on the first column of the partitioning key, but once that first column has a fixed value, range queries can be efficiently performed using the rest of the primary key.

This allows an elegant data model for one-to-many relationships. For example, on a social media site, one user may post many updates. If the primary key for updates is chosen to be (user_id, update_timestamp) then you can efficiently retrieve all updates made by a particular user within some time interval, sorted by timestamp. Different users may be stored on different partitions, but within each user, the updates are stored ordered by timestamp on a single partition.

I recommend going back and reviewing SSTables and LSM trees as they relate very well to the concepts discussed here.

Skewed workloads and relieving hot spots

As discussed, hashing a key to determine its partition can help reduce hot spots. However, it can’t avoid them entirely: in the extreme case where all reads and writes are for the same key, you still end up with all requests being routed to the same partition.

For example, imagine if in our previous example where the partitioning is done by (user_id, update_timestamp) that there is some celebrity user_id causing a storm of activity whenever they do something. This can result in a large volume of writes to the same key (where the key is perhaps the user_id of the celebrity or the user_id of the action that people are commenting on). Hashing the key doesn’t help, as the hash of two identical IDs is still the same.

Most systems can't automatically compensate for such a highly skewed load, it's the responsibility of the application developers to account for that. And there exist some techniques that can help moderate this heavily skewed workload, like for example, if one key is known to be hot, it can be prefixed by any small random number, say a 2-digit random number, which will effectively spread the load to 100 different keys allowing those keys to be distributed to different partitions. However, this requires special design and some extra bookkeeping.

Perhaps in the future, data systems will be able to automatically detect and compensate for skewed workloads, but for now, you need to think through the trade-offs for your application.


In this article, we introduced the concept of partitioning data to increase read and write throughput which effectively increases the scalability of the system. We discussed one of the approaches that can be used to perform partitioning which is key-value partitioning and we discussed some of its variants, like simple random partitioning, key range partitioning, and hashed key partitioning. And we learned the differences between those variants in terms of creating read/write hot spots and their ability to support efficient simple and range reads.