Designing Data Intensive Applications: Ch5. Replication (Part 1: Leader-Based Replication)

In the previous few articles, we discussed the first part of the book, which covers the foundations of data systems: distributed systems concepts, data models and query languages, data structures used for storage and retrieval, and data encoding and evolvability.

In this article, we will start discussing the second part of the book, which covers topics like replication, partitioning, transactions, and consistency and consensus.

We will start with replication: first we will introduce the idea of leaders and followers in leader-based replicated systems, then move on to discuss replication techniques like single-leader, multi-leader and leaderless replication.

In this part of the article, we will focus on single-leader replication: how it works and how it handles failures of leaders and replicas. We will also discuss different methods for implementing replication, such as statement-based and row-based replication, WAL shipping, and trigger-based replication. Finally, we will reflect on how those replication techniques can be used beyond their conventional purposes and introduce how replication can be useful for performing zero-downtime maintenance and online schema migrations.

Replication

Replication means keeping a copy of the same data on multiple machines that are connected via a network. Replication can be useful for several reasons, such as:

  • To keep data geographically close to your users (and thus reduce read latency).
  • To allow the system to continue working even if some of its parts have failed (and thus increase availability).
  • To scale out the number of machines that can serve read queries (and thus increase read throughput).

Leaders and Followers

Let's start by considering a very simple replicated system consisting of 3 nodes. In such a system, one node is designated as the leader and the other two nodes are followers.

[Figure: a leader-based replication setup with one leader accepting writes and two followers receiving the replication log]

It works as follows (a minimal sketch in code follows the list):

  1. When clients want to write to the database, they must send their requests to the leader, which first writes the new data to its local storage.
  2. Whenever the leader writes new data to its local storage, it also sends the data change to all of its followers as part of a replication log or change stream. Each follower takes the log from the leader and updates its local copy of the database accordingly, by applying all writes in the same order as they were processed on the leader.
  3. When a client wants to read from the database, it can query either the leader or any of the followers. However, writes are only accepted on the leader.
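
To make this flow concrete, here is a minimal, illustrative sketch in Python. The class and method names are hypothetical, not any real database's API: the leader appends every write to a replication log and streams it to its followers, which apply entries strictly in log order.

```python
class Follower:
    def __init__(self):
        self.data = {}

    def apply(self, offset, key, value):
        # Followers apply changes in exactly the order the leader processed them.
        self.data[key] = value


class Leader:
    def __init__(self, followers):
        self.data = {}
        self.log = []  # the replication log / change stream
        self.followers = followers

    def write(self, key, value):
        self.data[key] = value            # 1. write to local storage first
        offset = len(self.log)
        self.log.append((key, value))     # 2. append to the replication log...
        for follower in self.followers:   # ...and stream it to every follower
            follower.apply(offset, key, value)


followers = [Follower(), Follower()]
leader = Leader(followers)
leader.write("user:1", "alice")       # 3. writes must go through the leader
print(followers[0].data["user:1"])    # ...but reads can hit any replica
```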

Synchronous Versus Asynchronous Replication

Replicating newly written data from the leader to the replicas can be done either synchronously or asynchronously (a small sketch contrasting the two follows the list):

  • Synchronous: In the synchronous approach, the leader receives a write from the client, writes it to its local disk, then replicates it to all replicas and waits for confirmation from all of them that the data was successfully replicated before responding with success to the client.
  • Asynchronous: In the asynchronous approach, the leader receives a write from the client, writes it to its local disk, enqueues the change for background replication, and then responds with success to the client. Note that the actual data replication happens outside the boundaries of the write transaction.
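
Here is a small sketch of the two acknowledgment strategies, using a toy in-process model (all names are made up): the synchronous path blocks until every follower confirms, while the asynchronous path hands the change to a background replicator and acknowledges immediately.

```python
import queue
import threading

replication_queue = queue.Queue()

def replicate_to(follower, change):
    follower.append(change)  # stand-in for a network round trip + apply

def sync_write(followers, change):
    for follower in followers:
        replicate_to(follower, change)  # blocks until every replica confirms
    return "ok"  # client is acked only after all replicas have the data

def async_write(change):
    replication_queue.put(change)  # hand off to a background replicator
    return "ok"  # client is acked before replication has actually happened

def background_replicator(followers):
    while True:
        change = replication_queue.get()
        for follower in followers:
            replicate_to(follower, change)

followers = [[], []]
threading.Thread(target=background_replicator, args=(followers,), daemon=True).start()
sync_write(followers, ("k", "v1"))   # safe but slow
async_write(("k", "v2"))             # fast, but lost if the leader crashes now
```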

As you may have already guessed, synchronous replication seems the safest: the system doesn't acknowledge a write until all replicas have the data, which also guarantees that reads of that data can immediately be served from any replica. But it clearly causes a large write lag, because every write has to wait until the data has been written to all replicas, and a single faulty replica can render the entire system unavailable for writes. For those reasons, fully synchronous replication is rarely used in commercial systems, although most databases and storage systems can be configured to operate this way if needed.

On the other hand, with asynchronous replication the safety of the data is not guaranteed: the client gets an acknowledgment as soon as the leader writes the data to its disk, so if the leader fails before any of the replicas receives this data, the data is lost. Systems usually work around this defect by defining durability guarantees such that a write is only considered durable once its replication log entry is written to disk (known as the redo log in some databases), so if that node fails, it can recover from the log. The upside of asynchronous replication, however, is that it results in very fast writes, since the actual replication happens in the background, and partial failures of some nodes don't affect the system's availability.

In real-life systems, a mix of synchronous and asynchronous replication is usually employed (often called semi-synchronous). Say a system has 3 nodes, one designated as the leader and 2 as replicas: the leader can replicate to one replica synchronously and to the other asynchronously. For example:

[Figure: the leader replicating to two followers, waiting for follower 1's acknowledgment while follower 2 lags behind asynchronously]

The previous diagram shows such a system. When the leader receives a write, it initiates replication to both followers, but it only waits for 1 acknowledgment, which means it replicates synchronously to only 1 replica and asynchronously to the other. The diagram shows that there is a substantial delay before follower 2 processes the message. Normally, replication is quite fast: most database systems apply changes to followers in less than a second. However, there is no guarantee of how long it might take. There are circumstances when followers might fall behind the leader by several minutes or more; for example, if a follower is recovering from a failure, if the system is operating near maximum capacity, or if there are network problems between the nodes.

This configuration kind of gives you the best of both worlds (a minimal sketch follows the list below), because:

  • It reduces write lag, because it effectively writes to the fastest replica in the system (note that it doesn't designate a specific replica as the synchronous one; it just sends out a replication request, and whichever node fulfills it first plays the synchronous role).
  • It offers better durability guarantees, because now we are sure that when the client gets a write acknowledged, the data is on at least 2 nodes, not just the leader as in fully asynchronous replication.
  • The overall number of replicas and the split between synchronous and asynchronous replicas can be configured independently, to achieve the latency/availability/consistency/durability trade-off that fits each system's unique needs.
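
Here is a minimal sketch of that wait-for-the-first-acknowledgment behavior, using Python threads to simulate followers with varying latency (all names are hypothetical):

```python
import random
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def replicate_to(follower_id, change):
    # Stand-in for a network round trip + apply on the follower.
    time.sleep(random.uniform(0.01, 0.2))
    return follower_id

def semi_sync_write(change, followers, executor):
    # Fan the change out to all followers in parallel.
    futures = [executor.submit(replicate_to, f, change) for f in followers]
    # Acknowledge the client as soon as the *first* follower confirms;
    # whichever replica replies first plays the synchronous role, and the
    # rest keep catching up asynchronously in the background.
    done, _pending = wait(futures, return_when=FIRST_COMPLETED)
    return f"ok, also durable on follower {done.pop().result()}"

with ThreadPoolExecutor(max_workers=4) as pool:
    print(semi_sync_write(("k", "v"), followers=[1, 2], executor=pool))
```

Note how the write latency now depends only on the fastest follower, which is exactly the trade-off described in the list above.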

Setting Up New Followers

From time to time, you need to set up new followers, perhaps to increase the number of replicas or to replace failed nodes. A new follower has to start out with the same data as all the other followers in the system. Naively copying data files would be very inefficient here and would probably require locking writes in the database to ensure a consistent state on the new replica.

Setting up a new follower can be done as follows (a condensed sketch follows the list):

  1. Take a consistent snapshot of the system. This can be done in many databases without locking writes and it's actually very important for taking backups as well.
  2. Copy the snapshot to the new follower node and restore it there, which is much more efficient than the naive file-copying approach described above.
  3. The new follower then requests all writes that happened since the snapshot was taken; this works because the snapshot is associated with an exact position in the leader's replication log.
  4. Once the new follower has processed the backlog of missing writes, its replication queue will be nearly empty and it will be all caught up with the rest of the system.
  5. The database system can now accept the follower as one of its nodes and let it start serving requests.
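
Here is a condensed sketch of that bootstrap sequence (hypothetical names throughout); the key idea is that the snapshot carries a replication-log offset, so the new follower knows exactly which writes it still has to request:

```python
class Leader:
    def __init__(self):
        self.data, self.log = {}, []

    def write(self, key, value):
        self.data[key] = value
        self.log.append((key, value))

    def take_snapshot(self):
        # A consistent copy of the data, plus the log offset it corresponds to.
        return dict(self.data), len(self.log)

def bootstrap_follower(leader):
    snapshot, offset = leader.take_snapshot()  # steps 1 + 2: snapshot + restore
    follower_data = snapshot
    for key, value in leader.log[offset:]:     # steps 3 + 4: request and apply
        follower_data[key] = value             # everything after the snapshot
    return follower_data                       # step 5: ready to serve reads

leader = Leader()
leader.write("a", 1)
new_follower = bootstrap_follower(leader)
print(new_follower)  # {'a': 1}
```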

It's worth mentioning that some other databases do this very differently, and it's quite impressive actually. For example, Aurora, AWS's MySQL-compatible database, is built on a very cool concept of splitting the compute tier from the storage tier. Reader replicas don't actually hold the data; the data lives in the storage tier, which is instantaneously accessible to any new node in the system, and the readers themselves only keep some materialized data pages to improve latency. This makes the process of adding a new replica quite easy. You can read all about this in the Aurora series [here](sayedalesawy.hashnode.dev/series/amazon-aur..).

Handling Node Outages

Any node in the system can go down, perhaps unexpectedly due to a fault, but just as likely due to planned maintenance (we can model taking a node out of service as a failure event). Being able to reboot individual nodes without downtime is a big advantage for operations and maintenance. So how do you achieve high availability with leader-based replication given node failures?

Follower failure: Catch-up recovery

On its local disk, each follower keeps a log of the data changes it has received from the leader. If a follower crashes and is restarted, or if the network between the leader and the follower is temporarily interrupted, the follower can recover quite easily: from its log, it knows the last transaction that was processed before the fault occurred. Thus, the follower can connect to the leader and request all the data changes that occurred during the time when the follower was disconnected. When it has applied these changes, it has caught up to the leader and can continue receiving a stream of data changes as before.
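
A tiny sketch of catch-up recovery under the same toy model as before (hypothetical names): the follower persists the offset of the last log entry it applied and, on reconnect, requests everything after it.

```python
def catch_up(leader_log, follower_data, last_applied_offset):
    # On reconnect, request only the changes missed while disconnected,
    # starting right after the last log entry the follower applied...
    for offset in range(last_applied_offset + 1, len(leader_log)):
        key, value = leader_log[offset]
        follower_data[key] = value     # ...and apply them in log order
        last_applied_offset = offset
    return last_applied_offset         # persisted for the next restart

leader_log = [("a", 1), ("b", 2), ("c", 3)]
follower = {"a": 1}                    # crashed after applying offset 0
print(catch_up(leader_log, follower, last_applied_offset=0))  # -> 2
```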

Leader failure: Failover

Handling a failure of the leader is trickier: one of the followers needs to be promoted to be the new leader, clients need to be reconfigured to send their writes to the new leader, and the other followers need to start consuming data changes from the new leader. This process is called failover.

A failover can be performed manually or automatically; either way, it involves the following steps:

  1. Determining that the leader has failed. There are many things that could potentially go wrong: crashes, power outages, network issues, and more. There is no foolproof way of detecting what has gone wrong, so most systems use healthchecks: a bunch of "I am alive" messages bouncing back and forth between the nodes in the system, and when a node stops sending those messages for a set timeout period, it's considered unresponsive (a minimal sketch of such a detector follows this list).
  2. Choosing a new leader. This could be done through an election process (where the leader is chosen by a majority of the remaining replicas), or a new leader could be appointed by a previously elected controller node. The best candidate for leadership is usually the replica with the most up-to-date data changes from the old leader (to minimize any data loss). Getting all the nodes to agree on a new leader is a classic consensus problem.
  3. Reconfiguring the system to use the new leader. Clients now need to send their write requests to the new leader. If the old leader comes back, it might still believe that it is the leader, not realizing that the other replicas have forced it to step down. The system needs to ensure that the old leader becomes a follower and recognizes the new leader.
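
Here is a minimal sketch of a timeout-based failure detector as described in step 1 (hypothetical names; real systems also have to contend with clock issues and network asymmetry):

```python
import time

class FailureDetector:
    def __init__(self, timeout_seconds=5.0):
        self.timeout = timeout_seconds
        self.last_heartbeat = time.monotonic()

    def on_heartbeat(self):
        # Called whenever an "I am alive" message arrives from the leader.
        self.last_heartbeat = time.monotonic()

    def leader_is_dead(self):
        # Too long without a heartbeat -> presumed dead. Note the trade-off:
        # a short timeout risks unnecessary failovers (the leader may just be
        # slow), while a long one delays recovery after a real failure.
        return time.monotonic() - self.last_heartbeat > self.timeout

detector = FailureDetector(timeout_seconds=5.0)
detector.on_heartbeat()
if detector.leader_is_dead():
    pass  # trigger failover: promote the most up-to-date replica
```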

There are many problems that can happen during failover, which is what makes it tricky. For example:

  • If asynchronous replication is used, the new leader may not have received all the writes from the old leader before it failed. If the former leader rejoins the cluster after a new leader has been chosen, what should happen to those writes? The new leader may have received conflicting writes in the meantime. The most common solution is for the old leader's unreplicated writes to simply be discarded, which may violate clients' durability expectations and can lead to very bad and hard-to-debug problems, for example reusing old auto-increment values for primary keys (this actually happened in an incident at GitHub).
  • In some fault scenarios, it could happen that two nodes both believe that they are the leader. This situation is called split brain, and it is dangerous: if both leaders accept writes, and there is no process for resolving conflicts, data is likely to be lost or corrupted.
  • What is the right timeout before the leader is declared dead? A longer timeout means a longer time to recovery in the case where the leader fails. However, if the timeout is too short, there could be unnecessary failovers. For example, a temporary load spike could cause a node’s response time to increase above the timeout, or a network glitch could cause delayed packets. If the system is already struggling with high load or network problems, an unnecessary failover is likely to make the situation worse.
  • A problem that I have personally seen is that readers are often much smaller machines than writers, so when a failover happens and a smaller machine becomes the leader, its CPU/RAM/connections tend to get saturated pretty quickly, which causes further system failures.

There are no easy solutions to these problems. For this reason, some operations teams prefer to perform failovers manually, even if the software supports automatic failover.

Implementation of Replication Logs

Leader-based replication is entirely based on the idea of streaming a replication log from the leader to its followers. This replication log can be implemented in multiple different ways; in the following sections, we will look at a few of those methods.

Statement-based replication

In the simplest case, the leader logs every write request (statement) that it executes and sends that statement log to its followers. For a relational database, this means that every INSERT, UPDATE, or DELETE statement is forwarded to followers, and each follower parses and executes that SQL statement as if it had been received from a client. This sounds reasonable, but it has a ton of limitations and edge cases that render it unusable in most cases (a toy demonstration of the first one follows the list). Some of those limitations are:

  • Any statement that calls a nondeterministic function, such as NOW() to get the current date and time or RAND() to get a random number, is likely to generate a different value on each replica.
  • If statements use an autoincrementing column for example, they must be executed in exactly the same order on each replica, or else they may have a different effect. This can be limiting when there are multiple concurrently executing transactions.
  • Statements that have side effects (e.g., triggers, stored procedures, user-defined functions) may result in different side effects occurring on each replica, unless the side effects are absolutely deterministic.
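
A toy demonstration of the first problem, modeling a "statement" as a Python callable (purely illustrative): re-executing a nondeterministic statement on each replica produces divergent values, while shipping the resulting value does not.

```python
import random

def execute(statement):
    return statement()  # a "statement" here is just a callable

# Think: INSERT INTO t VALUES (RAND()) -- a nondeterministic statement.
statement = lambda: random.random()

# Statement-based replication: each replica re-executes the statement.
leader_value = execute(statement)     # value stored on the leader
replica_value = execute(statement)    # value stored on the replica
print(leader_value == replica_value)  # False (almost surely): they diverged

# Row-based replication: the leader executes once and ships the result.
shipped_row = leader_value            # the replica stores exactly this value
print(shipped_row == leader_value)    # True: replicas stay identical
```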

Write-ahead log (WAL) shipping

As we discussed back in chapter 3, some databases keep an append-only log of all changes that happen in the database and persist this log on disk. This log can be used to rebuild a replica of the data, so in a replicated database, this log is also sent over the network to the replicas. When a follower processes this log, it builds a copy of the exact same data structures as found on the leader.

The main disadvantage is that the log describes the data on a very low level: a WAL contains details of which bytes were changed in which disk blocks. This makes replication closely coupled to the storage engine. If the database changes its storage format from one version to another, it is typically not possible to run different versions of the database software on the leader and the followers.

Although this might sound like a minor disadvantage, it can make operations very hard. For example, a version upgrade in a replicated database can usually be performed without downtime by first upgrading the replicas to the new version and then failing over the leader to one of the upgraded replicas. But if this version mismatch (at some point, the replicas will be running a different version than the leader) is not allowed, perhaps due to a difference in how the WAL is written, the upgrade is not possible without downtime.

Logical (row-based) log replication

An alternative is to use different log formats for replication and for the storage engine, which allows the replication log to be decoupled from the storage engine internals. This kind of replication log is called a logical log, to distinguish it from the storage engine’s (physical) data representation.

A logical log for a relational database is usually a sequence of records describing writes to database tables at the granularity of a row (a sketch of such records follows the list):

  • For an inserted row, the log contains the new values of all columns.
  • For a deleted row, the log contains enough information to uniquely identify the row that was deleted.
  • For an updated row, the log contains enough information to uniquely identify the updated row, and the new values of all columns (or at least of all the columns that changed).
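
As an illustration, logical log records might be modeled like this. The actual wire format is engine-specific (for example MySQL's row-based binlog), so these types are purely illustrative:

```python
from dataclasses import dataclass

@dataclass
class RowInsert:
    table: str
    new_values: dict        # new values of all columns

@dataclass
class RowDelete:
    table: str
    key: dict               # enough to uniquely identify the row (e.g. the PK)

@dataclass
class RowUpdate:
    table: str
    key: dict               # identifies the updated row
    changed_values: dict    # new values of the columns that changed

# A short stretch of a logical replication log:
log = [
    RowInsert("users", {"id": 1, "name": "alice"}),
    RowUpdate("users", {"id": 1}, {"name": "bob"}),
    RowDelete("users", {"id": 1}),
]
```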

Since a logical log is decoupled from the storage engine internals, it can more easily be kept backward compatible, allowing the leader and the followers to run different versions of the database software, or even different storage engines. Logical logs are also much easier for external applications to parse and manipulate if needed.

Trigger-based replication

The replication approaches described so far are implemented by the database system, without involving any application code. In many cases, that's what you want, but there are some circumstances where more flexibility is needed. For example, you may want to replicate only a subset of the data, replicate from one kind of database to another, apply conflict resolution logic, or run some transformation before replicating (for example, anonymizing or reformatting data). In those cases, you might need to implement replication yourself.

A trigger lets you register custom application code that is automatically executed when a data change (write transaction) occurs in a database system. The trigger has the opportunity to log this change into a separate table, for example, which can later be read by an external process to apply whatever application logic is needed.
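
Here is a runnable sketch of that pattern using SQLite (the table and trigger names are made up): a trigger copies every insert into a changes table, which an external process can then poll and forward wherever needed.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE changes (seq INTEGER PRIMARY KEY AUTOINCREMENT,
                          op TEXT, user_id INTEGER, name TEXT);

    -- Log every insert into the changes table for later replication.
    CREATE TRIGGER users_insert_audit AFTER INSERT ON users
    BEGIN
        INSERT INTO changes (op, user_id, name)
        VALUES ('insert', NEW.id, NEW.name);
    END;
""")

db.execute("INSERT INTO users (name) VALUES ('alice')")

# The "external process": read the captured changes and apply custom logic
# (filtering, transformation, forwarding to another database, ...).
for seq, op, user_id, name in db.execute("SELECT * FROM changes"):
    print(seq, op, user_id, name)
```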

Trigger-based replication typically has greater overheads than other replication methods, and is more prone to bugs and limitations than the database’s built-in replication. However, it can nevertheless be useful due to its flexibility.

Other Uses Of Replication

This section is not really described in the book, but from my personal experience dealing with large data systems, replication is not only used to improve durability, increase availability and scale out reads; it can also be used to perform complicated schema changes.

Performing schema changes, for example adding a new column, can be rather challenging in some databases like MySQL, especially for massive tables. So there are a lot of tools that take a different approach to this, for example:

  • lhm: an online schema migration tool (the Large Hadron Migrator, originally built at SoundCloud; Shopify maintains its own version) which uses trigger-based replication to perform a migration. It basically creates a new table with the new schema, runs a process that copies the old data from the old table into the new table, and sets up triggers to replicate new writes to the new table. This has a lot of problems, like contention on locks, which in many cases leads to deadlocks.
  • gh-ost: an online schema migration tool developed by GitHub. It follows a very similar concept to lhm, with the main difference being that it tails the binlog coming out of the database to stream updates to the new table (called the ghost table), and it has a lot of other very useful features that make operations easier (a condensed sketch of the shared pattern follows this list).
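
Here is a condensed sketch of the shadow-table pattern both tools automate. The SQL is simplified MySQL carried as Python strings; real tools add chunked backfill, throttling, and careful cut-over locking on top of this:

```python
migration_steps = [
    # 1. Create a shadow table that already has the new schema.
    "CREATE TABLE users_new LIKE users",
    "ALTER TABLE users_new ADD COLUMN email VARCHAR(255)",
    # 2. Backfill: copy existing rows over in small chunks.
    "INSERT INTO users_new (id, name) SELECT id, name FROM users "
    "WHERE id BETWEEN 1 AND 1000",  # ...repeated for each chunk
    # 3. Keep the shadow table in sync with ongoing writes, either via
    #    triggers (lhm) or by tailing the binlog (gh-ost).
    # 4. Atomically swap the tables once the copy has caught up.
    "RENAME TABLE users TO users_old, users_new TO users",
]
for statement in migration_steps:
    print(statement)  # in practice: execute against the database
```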

At Instabug (where I work), we deal with massive amounts of data, and performing schema changes is just a day-to-day job, so we deal with that a lot. This blog post discusses how we use gh-ost to migrate large tables in our databases.

Summary

In this chapter, we introduced the idea of data replication and how it can be used to increase availability, improve durability and enable scaling out of our systems. We discussed the concept of leaders and followers and the different methods of leader-based replication. Finally, we discussed how those replication techniques can be used beyond their conventional purposes, such as performing zero-downtime maintenance and online schema migrations.
