How Kubernetes Has Been ‘Transformational’ to Productivity and Culture at uSwitch

By | Blog

When it was founded in 2000, uSwitch helped consumers in the U.K. compare prices for their utilities. The company eventually expanded to include comparison and switching tools for a wide range of verticals, including broadband service, credit cards, and insurance.

Internally, those verticals translated into a decentralized technology infrastructure. Teams—which were organized around the markets their applications price-compared—ran all their own AWS infrastructure and were responsible for configuring load-balancers, EC2 instances, ECS cluster upgrades, and more. 

Though many teams had converged on the idea of using containerization for web applications and deploying to ECS, “everyone was running different kinds of clusters with their own way of building and running them, with a whole bunch of different tools,” says Head of Engineering Paul Ingles. With the increasing cloud and organizational complexity, they had difficulty scaling teams. “It was just inefficient,” says Infrastructure Lead Tom Booth. “We wanted to bring some consistency to the infrastructure that we had.”

Booth and Ingles had both experimented with Kubernetes before, when the company first adopted containerization. Separately, the infrastructure team had done an evaluation of several orchestration systems, in which Kubernetes came out on top. In late 2016, the two began working on building a Kubernetes platform for uSwitch. 

Today, all 36 teams at uSwitch are running at least some of their applications on the new platform, and as a result, the rate of deployments has increased almost 3x. The release rate per person per week has almost doubled, with releases now happening as often as 100 times a day. Moreover, those improvements have been sustained, even as more people were added to the platform. 

Read more about uSwitch’s cloud native journey in the full case study.

Kubernetes Audit: Making Log Auditing a Viable Practice Again


Originally published on Alcide Blog by Nitzan Niv

In the security world, one of the most established methods to identify that a system was compromised, abused or misconfigured is to collect logs of all the activity performed by the system’s users and automated services, and to analyze these logs.

Audit Logs as a Security Best Practice

In general, audit logs are used in two ways:

  1. Proactively identifying non-compliant behavior. A configured set of rules, which should faithfully capture any violation of the organization’s policies, is applied to the audit log, and an investigator finds the entries that prove a non-compliant activity has taken place. With automated filters, a collection of such alerts is periodically reported to compliance investigators.
  2. Reactively investigating a specific operational or security problem. A post-mortem investigation traces the known problem back to the responsible party, root causes or contributing factors by following deduced associations from each state to the action that caused it and to the previous state.

Kubernetes Audit Logs

Let us examine how audit logs are configured and used in the Kubernetes world, what valuable information they contain, and how they can be utilized to enhance the security of the Kubernetes-based data center.

The Kubernetes audit log is intended to enable the cluster administrator to forensically recover the state of the server and the series of client interactions that resulted in the current state of the data in the Kubernetes API.

In technical terms, Kubernetes audit logs are detailed descriptions of each call made to the Kubernetes API-Server. This Kubernetes component exposes the Kubernetes API to the world. It is the central touch point that is accessed by all users, automation, and components in the Kubernetes cluster. The API server implements a RESTful API over HTTP and performs all the API operations.

Upon receiving a request, the API server processes it through several steps:

  1. Authentication: establishing the identity associated with the request (a.k.a. principal). There are several mechanisms for authentication.
  2. RBAC/Authorization: The API server determines whether the identity associated with the request can access the combination of the verb and the HTTP path in the request. If the identity of the request has the appropriate role, it is allowed to proceed.
  3. Admission Control: determines whether the request is well formed and potentially applies modifications to the request before it is processed.
  4. Validation: ensures that a specific resource included in a request is valid.
  5. Perform the requested operation. The types of supported operations include:
    • Create resource (e.g. pod, namespace, user-role)
    • Delete a resource or a collection of resources
    • List resources of a specific type (e.g. pods, namespaces), or get a detailed description of a specific resource
    • Open a long-running connection to the API server, and through it to a specific resource. Such a connection is then used to stream data between the user and the resource. For example, this enables the user to open a remote shell within a running pod, or to continuously view the logs of an application that runs inside a pod.
    • Watch a cluster resource for changes.
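The processing order above can be pictured as a chain of checks, each of which may reject the request. The following is a toy model only, not the API server's code: the Request type, the stage functions and the lookup tables are all hypothetical names invented for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Request is a hypothetical, simplified view of an API call.
type Request struct {
	Token    string // credential presented by the caller
	User     string // filled in by authentication
	Verb     string // e.g. "get", "create", "delete"
	Resource string // e.g. "pods", "secrets"
	Name     string // object name; may be defaulted by admission
}

// stage is one processing step; returning an error rejects the request.
type stage func(*Request) error

// Toy lookup tables standing in for real authenticators and RBAC rules.
var tokens = map[string]string{"t-alice": "alice", "t-bob": "bob"}
var allowed = map[string]bool{
	"alice/get/pods":    true,
	"alice/create/pods": true,
	"bob/get/pods":      true,
}

func authenticate(r *Request) error {
	user, ok := tokens[r.Token]
	if !ok {
		return errors.New("authentication failed")
	}
	r.User = user
	return nil
}

func authorize(r *Request) error {
	if !allowed[r.User+"/"+r.Verb+"/"+r.Resource] {
		return errors.New("forbidden")
	}
	return nil
}

// admit may modify the request; here it only fills in a default name.
func admit(r *Request) error {
	if r.Verb == "create" && r.Name == "" {
		r.Name = "generated-name"
	}
	return nil
}

func validate(r *Request) error {
	if r.Resource == "" {
		return errors.New("invalid: resource is required")
	}
	return nil
}

// process runs the stages in order and stops at the first rejection.
func process(r *Request) error {
	for _, s := range []stage{authenticate, authorize, admit, validate} {
		if err := s(r); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ok := &Request{Token: "t-alice", Verb: "create", Resource: "pods"}
	fmt.Println(process(ok), ok.Name)

	denied := &Request{Token: "t-bob", Verb: "delete", Resource: "pods"}
	fmt.Println(process(denied))
}
```

Because each stage sees the request only after the previous one has accepted it, an audit record of the request can also note how far the request got before it was rejected.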

A request and its processing steps may be stored in the Kubernetes audit log. The API server may be configured, through an audit policy, to store all or some of these requests with varying levels of detail. The audit configuration also determines where the audit events are sent: to a log file or to an external webhook. Analysis tools may receive the events through the webhook.
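To make this concrete, here is a minimal sketch of how an analysis tool might read audit events written as JSON, one event per line, and pick out accesses to Secrets. The struct covers only a small subset of the audit.k8s.io/v1 Event fields, and the sample log lines are invented for the example.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// AuditEvent mirrors a small subset of the audit.k8s.io/v1 Event fields.
type AuditEvent struct {
	Level      string   `json:"level"`
	Stage      string   `json:"stage"`
	Verb       string   `json:"verb"`
	RequestURI string   `json:"requestURI"`
	SourceIPs  []string `json:"sourceIPs"`
	User       struct {
		Username string `json:"username"`
	} `json:"user"`
	ObjectRef *struct {
		Resource  string `json:"resource"`
		Namespace string `json:"namespace"`
		Name      string `json:"name"`
	} `json:"objectRef"`
}

// secretAccesses returns the completed events that touch Secret objects.
func secretAccesses(log string) ([]AuditEvent, error) {
	var hits []AuditEvent
	sc := bufio.NewScanner(strings.NewReader(log))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" {
			continue
		}
		var ev AuditEvent
		if err := json.Unmarshal([]byte(line), &ev); err != nil {
			return nil, err
		}
		if ev.Stage == "ResponseComplete" && ev.ObjectRef != nil && ev.ObjectRef.Resource == "secrets" {
			hits = append(hits, ev)
		}
	}
	return hits, sc.Err()
}

// sampleLog is made up for this example; real events carry more fields.
const sampleLog = `{"level":"Metadata","stage":"ResponseComplete","verb":"get","requestURI":"/api/v1/namespaces/prod/secrets/db-password","sourceIPs":["10.0.0.7"],"user":{"username":"alice"},"objectRef":{"resource":"secrets","namespace":"prod","name":"db-password"}}
{"level":"Metadata","stage":"ResponseComplete","verb":"list","requestURI":"/api/v1/namespaces/prod/pods","sourceIPs":["10.0.0.7"],"user":{"username":"alice"},"objectRef":{"resource":"pods","namespace":"prod"}}`

func main() {
	hits, err := secretAccesses(sampleLog)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, ev := range hits {
		fmt.Println(ev.User.Username, ev.Verb, ev.ObjectRef.Namespace+"/"+ev.ObjectRef.Name)
	}
}
```

A filter like this is exactly the kind of simple, rule-based tooling discussed later in the article: useful for known patterns, but blind to anything that spans several entries.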

Challenges of Auditing a Kubernetes Cluster

While the principles of audit logs collection and analysis naturally apply to the cloud, and specifically to data centers built on Kubernetes, in practice the scale, the dynamic nature and the implied context of such environments make analyzing audit logs difficult, time consuming and expensive.

The scale of the activities in the cluster means any analysis that relies solely on manual inspection of many thousands of daily log entries is impractical. A “quiet” cluster with no human-initiated actions and no major shifts in application activity still processes thousands of API calls per hour, generated by the internal Kubernetes mechanisms that ensure that the cluster is alive, its resources are utilized according to the specified deployments, and failures are automatically identified and recovered from. Even with log filtering tools, the auditor needs a lot of experience, intuition and time to be able to zoom in on a few interesting entries.

The dynamic nature of a system like a Kubernetes cluster means that workloads are being added, removed or modified at a fast pace. It’s not a matter of an auditor focusing on access to a few specific workloads containing a database – it’s a matter of identifying which workloads contain a sensitive database at each particular instant in the audited time period, which users and roles had legitimate reason to access each of these database-workloads at what times, and so on.

Furthermore, while finding some interesting results is just a matter of finding the specific entries in the log that are known in advance to correlate to undesirable activity, finding suspicious but previously unknown activity in the logs requires a different set of tools and skills, especially if this suspicious behavior can only be understood from a wider context over a prolonged period, and not just one or two related log entries. For example, it is rather simple to detect a user’s failure to authenticate to the system, as each login attempt shows up as a single log entry. However, a potential theft of user credentials may only be detected if the auditor connects seemingly unrelated entries into a whole pattern, for example access to the system using a specific user’s credentials from a previously unknown Internet address outside the organization, while the same user’s credentials are used concurrently to access the system from within the organization’s network.
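The credential-theft example can be sketched as a correlation across entries rather than a per-entry filter. The code below is an illustration under simplifying assumptions: the Access record, the internal network range, the sample data and the time window are all hypothetical, and a real analyzer would need far more context than source addresses.

```go
package main

import (
	"fmt"
	"net"
	"sort"
	"time"
)

// Access is a hypothetical record extracted from one audit log entry.
type Access struct {
	User string
	IP   string
	At   time.Time
}

// sampleWindow is an arbitrary choice of what counts as "concurrent".
const sampleWindow = 10 * time.Minute

// mustCIDR parses a network range and panics on malformed input.
func mustCIDR(s string) *net.IPNet {
	_, n, err := net.ParseCIDR(s)
	if err != nil {
		panic(err)
	}
	return n
}

// suspiciousUsers reports users whose credentials were used from inside
// and from outside internalNet within the given time window.
func suspiciousUsers(events []Access, internalNet *net.IPNet, window time.Duration) []string {
	inside := map[string][]time.Time{}
	outside := map[string][]time.Time{}
	for _, e := range events {
		ip := net.ParseIP(e.IP)
		if ip == nil {
			continue // skip entries with an unparsable address
		}
		if internalNet.Contains(ip) {
			inside[e.User] = append(inside[e.User], e.At)
		} else {
			outside[e.User] = append(outside[e.User], e.At)
		}
	}
	var flagged []string
	for user, ins := range inside {
		if closeInTime(ins, outside[user], window) {
			flagged = append(flagged, user)
		}
	}
	sort.Strings(flagged)
	return flagged
}

// closeInTime reports whether any pair of timestamps is within window.
func closeInTime(a, b []time.Time, window time.Duration) bool {
	for _, x := range a {
		for _, y := range b {
			d := x.Sub(y)
			if d < 0 {
				d = -d
			}
			if d <= window {
				return true
			}
		}
	}
	return false
}

// sampleAccesses returns invented data for the example.
func sampleAccesses() []Access {
	t0 := time.Date(2019, 1, 1, 9, 0, 0, 0, time.UTC)
	return []Access{
		{"alice", "10.1.2.3", t0},
		{"alice", "203.0.113.9", t0.Add(2 * time.Minute)},
		{"bob", "10.1.2.4", t0},
		{"bob", "198.51.100.7", t0.Add(6 * time.Hour)},
		{"carol", "10.1.2.5", t0},
	}
}

func main() {
	fmt.Println(suspiciousUsers(sampleAccesses(), mustCIDR("10.0.0.0/8"), sampleWindow))
}
```

Even this small rule needs state across entries (per-user histories and a notion of time), which is why it cannot be expressed as a simple line-by-line log filter.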

Making Log Auditing a Viable Practice Again

In order to make the audit of a large, complex Kubernetes cluster a viable practice, we need to adapt the auditor’s tools to this environment. Such tools would automatically and proactively identify anomalous and problematic behavior, specifically in the context of Kubernetes control security, in order to:

  1. Detect security-related abuse of a Kubernetes cluster, especially behavior that can only be detected from observing extended context over multiple activities.
  2. Focus compliance investigations on Kubernetes misuses that are beyond detection by simple log filtering rules.

Of course, in order to achieve these goals, such a tool must be able to:

  1. Automatically analyze Kubernetes Audit logs, detecting anomalous behavior of users and automated service accounts and anomalous access to sensitive resources.
  2. Summarize the detected anomalies as well as important trends and statistics of the audit information for user-friendly understanding. At the end of the day, the auditor should have enough information that she can understand, qualify or ignore the results of the automatic analysis.

Let us describe some of the more complex threat scenarios that we would like the envisioned audit log analyzer to detect automatically:

  • Adversaries may steal the credentials of a specific user or service account (outside the cluster) or capture credentials earlier in their reconnaissance process through social engineering as a means of gaining initial access to cluster resources.
  • Adversaries may use token theft (inside the cluster, from compromised or legitimately accessible resources) or token impersonation to operate under a different user or service account (i.e., security context) to perform actions for lateral movement, privilege escalation, data access and data manipulation, while evading detection.
  • Adversaries may use insufficient or misconfigured RBAC to gain access under their own security context to privileged and sensitive resources (cluster’s secrets, configuration maps, authorization policies etc.) for lateral movement, privilege escalation, data access and data manipulation.
  • Adversaries may exploit vulnerabilities in Kubernetes API Server (authentication, authorization, admission control or validation requests processing phases) to gain access to privileged and sensitive resources (cluster’s secrets, configuration maps, authorization policies etc.) for initial access, lateral movement, privilege escalation, data access and data manipulation.

Obviously, detection of such scenarios is way beyond a simple filtering of the audit log using predetermined rules. Sophisticated machine learning algorithms must be used to achieve this automatically, at scale and within a reasonable time after actual threatening activity.

The analyzer tool should collect features of the operational and security activity of the cluster as they are presented in the log and feed them to the machine learning algorithm for measurement, weighting and linking together. At the end of this process, possible patterns of suspicious behavior can be presented to the auditor for validation and further investigation.


Auditing of system logs is a well-established practice for identifying security threats to the system, whether before these threats have a detrimental effect or after a threat is realized. In fact, some forms of periodic audit are mandated by laws and regulations.

Nevertheless, identifying suspicious patterns in audit logs of complex, large-scale, dynamic systems like modern-day Kubernetes clusters is a formidable task. While using some sort of automation is mandatory for such an analysis, most existing audit tools are just mindless filters, hardly assisting the auditor in the deeper challenges of her task.

In this article we presented a vision for an automated Kubernetes audit log analyzer that goes far beyond that. Using machine learning, such a tool can autonomously detect potentially threatening patterns in the log that the auditor can focus on, even in real time. Additionally, summarizing the information in the audit log in a way that is user-digestible lets the auditor quickly validate the identified patterns and helps her investigate additional hidden suspicious activities.


Case study: TiKV in JD Cloud


Guest post by Can Cui, Infrastructure Specialist at JD Cloud

JD Cloud is a full-service cloud computing platform and integrated cloud service provider. Like Microsoft Azure, we deliver comprehensive cloud computing services ranging from infrastructure building and strategic consulting to business platform development and operations. By November 2018, we had achieved technical innovation in more than 120 cloud computing products and services and had more than 330,000 registered users. In 2018, Forrester, the world-renowned research and advisory firm, evaluated our product capacity, strategic layout, marketing performance, and other factors, and recognized JD Cloud as a “Strong Performer.”

Our Object Storage Service (OSS) provides cloud storage services of high availability, low cost, and strong security for enterprises and individual developers. Previously, we used MySQL to store OSS metadata. But as the metadata grew rapidly, standalone MySQL couldn’t meet our storage requirements. In the future, we expect to hit 100 billion or even 1 trillion rows. We faced severe challenges in storing unprecedented amounts of data that kept soaring.

In this post, I’ll deep dive into how TiKV, an open-source distributed transactional key-value database, empowered us to manage huge amounts of OSS metadata with a simple and horizontally scalable architecture. I’ll introduce why we chose TiKV, how we’re using it, and some thoughts about future plans.

Our pain points

JD Cloud’s OSS provides a full range of products including file upload, storage, download, distribution, and online processing. We aim to offer a safe, stable, massive, and convenient object storage service for users. 

In this section, I’ll elaborate on challenges we encountered in storing OSS metadata and our exploration to redesign the metadata storage system.

Challenges caused by rapid data growth

As shown in the figure below, we previously used MySQL to store OSS metadata (such as image size). Metadata was grouped in buckets, which are similar to namespaces. 

Original OSS metadata storage system based on MySQL 

Many similar systems use such a design. But facing business growth with metadata booming, we were plagued by the following challenges:

The number of objects in a single bucket was expected to become huge.

We anticipated that the number of objects in a single bucket would grow large, and thought hash-based sharding and scan merge could resolve this problem.

The number of objects in a single bucket unexpectedly increased rapidly.

In the early days, we had a small amount of metadata, and four buckets might have been enough to store it. But as our business developed, the metadata surged at an unanticipated rate, so we needed to redivide the metadata into 400 buckets. 

In this case, we had to rehash and rebalance data. However, the data rehashing process was very complicated and troublesome.
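To illustrate why rehashing is so painful, consider a simple modulo-based placement scheme. This is a sketch under assumptions: the post does not describe the actual hash function or key format, so the FNV hash, the bucketFor function and the sample keys are hypothetical. With this scheme, a key keeps its bucket when going from 4 to 400 buckets only if its hash modulo 400 is below 4, which for a uniform hash is roughly 1% of keys; nearly everything else has to move.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bucketFor maps an object key to one of n buckets by hashing it
// (a hypothetical placement scheme used only for illustration).
func bucketFor(key string, n uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() % n
}

// movedFraction returns the share of keys whose bucket changes when
// the bucket count goes from oldN to newN.
func movedFraction(keys []string, oldN, newN uint32) float64 {
	if len(keys) == 0 {
		return 0
	}
	moved := 0
	for _, k := range keys {
		if bucketFor(k, oldN) != bucketFor(k, newN) {
			moved++
		}
	}
	return float64(moved) / float64(len(keys))
}

// sampleKeys generates n made-up object keys.
func sampleKeys(n int) []string {
	keys := make([]string, n)
	for i := range keys {
		keys[i] = fmt.Sprintf("object-%d", i)
	}
	return keys
}

func main() {
	keys := sampleKeys(10000)
	fmt.Printf("share of keys that move from 4 to 400 buckets: %.2f\n", movedFraction(keys, 4, 400))
}
```

A store that keeps keys globally ordered and splits key ranges as they grow avoids this wholesale reshuffle, which is the property the rest of this post is after.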

The storage capacity of our previous metadata storage system was limited, and this system couldn’t scale horizontally.

We hoped our metadata storage database could scale infinitely; for example, to hold 100 billion or even 1 trillion rows of data. But this wasn’t the case.

Our exploration

To conquer the difficulties mentioned above, we redesigned our metadata storage system as shown in the following figure:

Redesigning the OSS metadata storage system

The core technique of this solution was making most of the data static, because static data is easy to store, migrate, and split. Every day, we made the data written on the previous day static and merged it into the historical data. 

However, as the following diagram reveals, this solution had two problems:

  • Data distribution was complex, so it was hard to manage.
  • Data scheduling was inflexible, which made system maintenance more difficult.

Complexity of the metadata storage system

Therefore, we began to look for a new solution: a globally ordered key-value store with great storage capacity and elastic scalability. Finally, we found TiKV, and it turns out it’s a perfect match for storing enormous amounts of data.

What is TiKV

TiKV is a distributed transactional key-value (KV) database originally created by PingCAP to complement TiDB, an open-source MySQL-compatible NewSQL Hybrid Transactional/Analytical Processing (HTAP) database.

As an incubating project of Cloud Native Computing Foundation (CNCF), TiKV is intended to fill the role of a unifying distributed storage layer. TiKV excels at working with large amounts of data by supporting petabyte-scale deployments spanning trillions of rows.

TiKV complements other CNCF project technologies like etcd, which is useful for low-volume metadata storage, and you can extend it by using stateless query layers that speak other protocols.

The TiKV architecture

The overall architecture of TiKV is illustrated in the figure below:

TiKV architecture

TiKV connects to clients. To understand how TiKV works, you need to understand some basic terms:

  • Store: A store refers to a tikv-server instance, which serves as a storage node in the cluster. There are one or more stores within each disk.
  • Region: A Region is the basic unit of data movement and is replicated by the Raft protocol. Within each store, there are many Regions. Each Region is replicated to several nodes.
  • Raft group: A Raft group consists of the replicas of the same Region on different nodes.
  • Placement Driver (PD): PD schedules the load balancing of the data among different TiKV nodes. 

TiKV’s features

TiKV has the following features:

Why we chose TiKV

After we investigated many database products, we chose TiKV because it has the following advantages:

  • TiKV supports a globally-ordered key-value store, and it is easy to horizontally scale. This fulfills the requirements for metadata storage of our OSS.
  • Through rigorous tests, TiKV demonstrates excellent performance that meets the demands of our application. 
  • TiKV boasts an active community, with complete documentation and ecosystem tools.
  • TiKV is a CNCF incubation-level hosted project. It iterates rapidly with features under fast development. 
  • Compared with TiDB server’s code, TiKV’s code is simpler. We can further develop TiKV based on our needs.

Besides the advantages above, TiKV also passed our tests, including:

  • The feature test. We tested whether TiKV’s main features satisfied the requirements of our application.
  • The performance test. We tested whether queries per second (QPS), and the average, 90th percentile, and 99th percentile query latencies were ideal.
  • The fault injection test. Engineers specializing in data storage tend to focus more on system behavior in abnormal conditions rather than on performance status. In addition, distributed storage is more complex than standalone storage. Thus, we simulated various machine, disk, and network faults. We even randomly combined these faults and triggered abnormalities in the system to test the system behavior. 
  • The staging environment test. Before we deployed TiKV to production on a large scale, we ran TiKV on some production applications that were less important. Then, we collected some issues and problems that could be optimized.
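As an aside on the performance test, the latency figures mentioned above (average, 90th percentile and 99th percentile) can be computed from raw samples as sketched below. This is only an illustration using the nearest-rank definition of a percentile; the sample data is made up and is not from our tests.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"time"
)

// percentile returns the nearest-rank p-th percentile (0 < p <= 100).
func percentile(samples []time.Duration, p float64) time.Duration {
	if len(samples) == 0 {
		return 0
	}
	// Sort a copy so the caller's slice is left untouched.
	sorted := append([]time.Duration(nil), samples...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	rank := int(math.Ceil(p * float64(len(sorted)) / 100))
	if rank < 1 {
		rank = 1
	}
	if rank > len(sorted) {
		rank = len(sorted)
	}
	return sorted[rank-1]
}

// mean returns the arithmetic mean of the samples.
func mean(samples []time.Duration) time.Duration {
	if len(samples) == 0 {
		return 0
	}
	var sum time.Duration
	for _, s := range samples {
		sum += s
	}
	return sum / time.Duration(len(samples))
}

// sampleLatencies returns invented latencies of 1ms, 2ms, ..., 100ms.
func sampleLatencies() []time.Duration {
	out := make([]time.Duration, 100)
	for i := range out {
		out[i] = time.Duration(i+1) * time.Millisecond
	}
	return out
}

func main() {
	l := sampleLatencies()
	fmt.Println("avg:", mean(l), "p90:", percentile(l, 90), "p99:", percentile(l, 99))
}
```

Tail percentiles matter here because an average can look healthy while a small share of requests is very slow.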

The test results showed that TiKV met our requirements for system performance and security. Then, we applied TiKV in our OSS metadata storage system, as shown in the following figure:

OSS metadata storage system based on TiKV 

Migrating data from MySQL to TiKV

Many TiDB users have migrated data from MySQL to TiDB, but far fewer have migrated data to TiKV. We gained firsthand experience in data migration from MySQL to TiKV. 

This section covers how we migrated data from MySQL to TiKV, including our migration policy, the traffic switch process, and how we verified data.

The data migration policy

The following figure shows our data migration policy:

Data migration policy

The key points of this policy are as follows:

  • To guarantee data security, we performed doublewrites for all the production data, writing it to both MySQL and TiKV.
  • We set the existing data read-only and migrated this data to TiKV. During the migrating process, the incremental data was directly written to TiKV. 
  • Every day, we set the incremental data generated on the previous day to be static, and compared this data in TiKV to that in MySQL for data verification. If a doublewrite failed, we would use the data in MySQL. 

Traffic switching

The whole traffic switch process consisted of three steps:

  1. Switched the existing traffic to TiKV, and verified reads.

To switch existing traffic, we made the existing data static to simplify data migration, data comparison, and rollback processes.

  2. Switched the incremental traffic to TiKV, and verified reads and writes.

To switch incremental traffic, we performed doublewrites of incremental data to TiKV and MySQL. When an abnormality occurred, we rolled back our application to MySQL. This would not affect the online application.

  3. Took MySQL offline after verifying data in TiKV.

TiKV exhibited outstanding results in the test environment. Therefore, we used TiKV as the primary database during the doublewrite process.

Data verification

During data verification, the most challenging problem was verifying incremental data, as incremental data was changing daily. Thus, we decided to doublewrite data to TiKV and MySQL, made incremental data static each day, and verified data in TiKV against that in MySQL to check whether data in TiKV was reliable (no data loss or inconsistency).

However, in this case, we might run across a problem: writing data to TiKV succeeded, but writing to MySQL failed. The two writes were executed in different transactions, so they were not necessarily both successful or unsuccessful, especially when reads and writes were frequent. Under this circumstance, we would check the operational records of the application layer to determine whether the issue was caused by TiKV or the application layer.
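The doublewrite and daily verification described above can be sketched as follows. This is a simplified illustration, not our production code: the Store interface and the in-memory memStore are hypothetical stand-ins for the TiKV and MySQL clients, and a failure is simulated for one key.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Store is a hypothetical minimal key-value interface standing in for
// the TiKV and MySQL clients.
type Store interface {
	Put(key, value string) error
	Get(key string) (string, bool)
}

// memStore is an in-memory Store that can simulate failed writes.
type memStore struct {
	data   map[string]string
	failOn map[string]bool // keys whose writes fail
}

func newMemStore(failKeys ...string) *memStore {
	m := &memStore{data: map[string]string{}, failOn: map[string]bool{}}
	for _, k := range failKeys {
		m.failOn[k] = true
	}
	return m
}

func (m *memStore) Put(key, value string) error {
	if m.failOn[key] {
		return errors.New("simulated write failure")
	}
	m.data[key] = value
	return nil
}

func (m *memStore) Get(key string) (string, bool) {
	v, ok := m.data[key]
	return v, ok
}

// doubleWrite writes to the primary and then to the secondary. The two
// writes are independent, so one can succeed while the other fails.
func doubleWrite(primary, secondary Store, key, value string) error {
	if err := primary.Put(key, value); err != nil {
		return fmt.Errorf("primary: %w", err)
	}
	if err := secondary.Put(key, value); err != nil {
		return fmt.Errorf("secondary: %w", err)
	}
	return nil
}

// verify returns, in sorted order, the keys whose values differ between
// the two stores or that exist in only one of them.
func verify(keys []string, a, b Store) []string {
	var bad []string
	for _, k := range keys {
		va, okA := a.Get(k)
		vb, okB := b.Get(k)
		if okA != okB || va != vb {
			bad = append(bad, k)
		}
	}
	sort.Strings(bad)
	return bad
}

func main() {
	tikv, mysql := newMemStore(), newMemStore("k2")
	for _, k := range []string{"k1", "k2"} {
		if err := doubleWrite(tikv, mysql, k, "value-of-"+k); err != nil {
			fmt.Println("doublewrite failed for", k, ":", err)
		}
	}
	fmt.Println("keys to investigate:", verify([]string{"k1", "k2"}, tikv, mysql))
}
```

The mismatch list is only a starting point: as noted above, deciding which side is right still requires the application-layer operation records.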

The application status

Currently, TiKV serves as the primary database for our OSS metadata storage application. We plan to take MySQL offline at the end of 2019.

The current application status is as follows:

  • We’ve deployed more than 10 TiKV clusters.
  • In the production environment, a single cluster’s QPS reaches 40,000 (half reads and half writes) at peak time.
  • The data volume of the largest cluster hits 20+ billion rows, and this cluster contains 500,000+ Regions.
  • The latency is about 10 milliseconds.

We are now testing TiKV 3.0, and expect to deploy it to the production environment in the fourth quarter of 2019.

What’s next

Thanks to the horizontal scalability of TiKV, we can deal with an enormous amount of OSS metadata in a storage architecture that is simpler than before. 

In the future, we’ll optimize TiKV in the following ways:

  • Implement disaster recovery at the TiKV layer. Currently, we perform disaster recovery at the application layer. Later, we’ll implement disaster recovery at the TiKV layer. We also expect subsequent versions of TiKV will support this feature. 
  • Optimize the cluster hardware resources. Because object metadata storage is a storage-intensive application, we want to reduce hardware costs. For example, we hope we can use larger disks (8 TB or 10 TB) or cheaper disks. We’ll work together with PingCAP engineers to optimize hardware resources. 
  • Optimize the Region scheduling mechanism. Currently, TiKV’s scheduling mechanism is complicated. This is troublesome for storage-intensive applications that have a huge amount of data. We’ll optimize the Region scheduling process so that when the data volume surges we don’t need to migrate data to another machine.

Chinese version: TiKV 在京东云对象存储元数据管理的实践

How Slack Leverages Vitess to Keep Up with Its Ever-Growing Storage Needs


If you live on Slack, multiply your usage by millions of active users worldwide and you’ll quickly understand why the company ran into data storage challenges.

In the fall of 2016, Slack was dealing with hundreds of thousands of MySQL queries per second and thousands of sharded MySQL hosts in production. The company’s busiest database hosts had to handle all the queries from larger and larger customers, so they were running hotter and hotter while there were thousands of other hosts that were mostly idle. And the burden of manually managing the fleet of hosts in the legacy configuration was taxing the operations teams. 

The Slack engineering team knew it needed a new approach to scaling and managing databases for the future. The challenges were technological, architectural, as well as process-related, and they needed to be solved while the system was running at full scale. 

“We have thousands of lines of code containing SQL queries, some of which expect MySQL-specific semantics,” says Principal Engineer Michael Demmer. “That plus our years of operational experience and tooling meant that we really did not want to move away from MySQL at the heart of our database systems.” The company also wanted to continue to host its own instances running in AWS, on a platform that incorporated new technology now, but could still be extensible later. “Vitess was really the only approach that fit all our needs,” says Demmer. 

The team had confidence given Vitess’s success at YouTube, and after getting familiar with the code, decided to build upon the technology in order to make it work at Slack. (Everything has been upstreamed in the open source project.) It took about seven months to go from whiteboard to production. Currently, around 45% of Slack’s overall query load—about 20 billion total queries per day, with some 500,000 queries per second at peak times—is running on Vitess. And all new features are written to use Vitess only.

“Vitess has been a clear success for Slack,” says Demmer. “The project has both been more complicated and harder to do than anybody could have forecast, but at the same time Vitess has performed in its promised role a lot better than people had hoped for.” Critical application features like @-mentions, stars, reactions, user sessions, and others are now fully backed by Vitess, “powered in ways that are much more sustainable and capable of growth,” he says. Performance-wise, the connection latency with Vitess is an extra millisecond on average, not discernable to users. “Our goal is that all MySQL at Slack is run behind Vitess,” says Demmer. “There’s no other bet we’re making in terms of storage in the foreseeable future.”

Read more about Slack’s use of Vitess in the full case study.

Announcing the CNCF Job Board


We are excited to announce the availability of the CNCF job board, the official job board of CNCF. According to the 2018 Linux Foundation Open Source Jobs Report, containers are rapidly growing in popularity and importance, with 57% of hiring managers seeking that expertise.

The CNCF job board leverages an engaged audience of over 500 members and tens of thousands of monthly visitors to help employers find the perfect candidate. It’s free to both post and apply; CNCF member job openings get a featured listing.

The CNCF job board is the ideal way to connect with the world’s top developers and hire great people. We invite you to post your job, search for candidates or find your next opportunity through the site.

kubectl flags in your plugin


Originally posted by Gianluca Arbezzano on Gianarb.it

This is not at all a new topic, and there is no hacking involved, but it is something everybody needs to know when designing a kubectl plugin.

I was recently working on one, and I had to make the user experience as friendly as possible and consistent with kubectl, because that’s what a good developer does: make other developers’ lives comfortable. Suppose you are used to running:

$ kubectl get pod -n your-namespace -l app=http

to get pods from a particular namespace, your-namespace, filtered by the label app=http. If your plugin does something similar, or would benefit from an interaction that resembles the classic get, you should reuse those flags.

Example: design a kubectl-plugin capable of running a pprof profile against a set of containers.

My expectation will be to do something like:

$ kubectl pprof -n your-namespace pod-name-go-app

The Kubernetes community writes a lot of its code in Go, which means there are a lot of libraries that you can reuse.

kubernetes/cli-runtime is a library that provides utilities to create kubectl plugins. One of its packages is called genericclioptions and, as the name suggests, it provides a generic set of CLI options.

package main

import (
    "os"

    "github.com/spf13/cobra"
    "github.com/spf13/pflag"
    "k8s.io/cli-runtime/pkg/genericclioptions"
)

func main() {
    // Create the set of flags for your kubectl plugin
    flags := pflag.NewFlagSet("kubectl-plugin", pflag.ExitOnError)
    pflag.CommandLine = flags

    // Configure the genericclioptions
    streams := genericclioptions.IOStreams{
        In:     os.Stdin,
        Out:    os.Stdout,
        ErrOut: os.Stderr,
    }

    // This set of flags is the one used for the kubectl configuration, such as:
    // cache-dir, cluster-name, namespace, kube-config, insecure, timeout, impersonate,
    // ca-file and so on
    kubeConfigFlags := genericclioptions.NewConfigFlags(false)

    // This is a set of flags related to a specific resource, such as: label selector
    // (-l), --all-namespaces, --schema and so on.
    kubeResourceBuilderFlags := genericclioptions.NewResourceBuilderFlags()

    rootCmd := &cobra.Command{
        Use:   "kubectl-plugin",
        Short: "My root command",
        Run: func(cmd *cobra.Command, args []string) {
            // The body is cut off in this listing; as a placeholder, send
            // the command's messages to the configured error stream.
            cmd.SetErr(streams.ErrOut)
        },
    }

    // You can join all these flags to your root command. The calls are
    // missing from this listing; AddFlags is how both flag sets register
    // themselves on a pflag.FlagSet.
    kubeConfigFlags.AddFlags(flags)
    kubeResourceBuilderFlags.AddFlags(flags)

    if err := rootCmd.Execute(); err != nil {
        os.Exit(1)
    }
}

This is the output:

$ kubectl-plugin --help
My root command

  kubectl-plugin [flags]

      --as string                      Username to impersonate for the operation
      --as-group stringArray           Group to impersonate for the operation, this flag can be repeated to specify multiple groups.
      --cache-dir string               Default HTTP cache directory (default "/home/gianarb/.kube/http-cache")
      --certificate-authority string   Path to a cert file for the certificate authority
      --client-certificate string      Path to a client certificate file for TLS
      --client-key string              Path to a client key file for TLS
      --cluster string                 The name of the kubeconfig cluster to use
      --context string                 The name of the kubeconfig context to use
  -f, --filename strings               identifying the resource.
  -h, --help                           help for kubectl-pprof
      --insecure-skip-tls-verify       If true, the server's certificate will not be checked for validity. This will make your HTTPS connections insecure
      --kubeconfig string              Path to the kubeconfig file to use for CLI requests.
  -n, --namespace string               If present, the namespace scope for this CLI request
  -R, --recursive                      Process the directory used in -f, --filename recursively. Useful when you want to manage related manifests organized within the same directory. (default true)
      --request-timeout string         The length of time to wait before giving up on a single server request. Non-zero values should contain a corresponding time unit (e.g. 1s, 2m, 3h). A value of zero means don't timeout requests. (default "0")
  -l, --selector string                Selector (label query) to filter on, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)
  -s, --server string                  The address and port of the Kubernetes API server
      --token string                   Bearer token for authentication to the API server
      --user string                    The name of the kubeconfig user to use

Kubernetes Scheduler 101

Originally published on Magalix Blog by Mohamed Ahmed

What is Kubernetes Scheduling?

If you’ve read any Kubernetes documentation, books, or articles, you’ve undoubtedly seen the word “schedule” in phrases like “the Pod gets scheduled to the next available node.” Kubernetes scheduling involves much more than just placing a pod on a node. In this article, we discuss the different mechanisms that Kubernetes follows when it needs to deal with a new pod, and the components involved in the process.

What happens when you create a Pod on a Kubernetes Cluster?

In a matter of seconds, the Pod is up and running on one of the cluster nodes. However, a lot has happened within those seconds. Let’s see:

  1. While scanning the API server (which it is continuously doing), the Kubernetes Scheduler detects that there is a new Pod without a nodeName parameter. The nodeName shows which node should own this Pod.
  2. The Scheduler selects a suitable node for this Pod and updates the Pod definition with the node name (through the nodeName parameter).
  3. The kubelet on the chosen node is notified that there is a pod that is pending execution.
  4. The kubelet executes the Pod, and the latter starts running on the node.
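The four steps above can be pictured as a small loop. The following is only an illustrative Python sketch, not the real scheduler (which talks to the API server rather than to a list of dictionaries); the `pick_node` callback and the dictionary layout are assumptions made for this example.

```python
def pending_pods(pods):
    """Step 1: find Pods that do not have a nodeName yet."""
    return [pod for pod in pods if not pod.get("nodeName")]


def schedule_once(pods, pick_node):
    """Step 2: choose a node for each pending Pod and record it in nodeName.

    pick_node is a hypothetical callback standing in for the node-selection
    logic discussed in the following sections.
    """
    bound = []
    for pod in pending_pods(pods):
        pod["nodeName"] = pick_node(pod)
        bound.append(pod["name"])
    return bound


pods = [
    {"name": "web-1", "nodeName": "node-a"},  # already placed, ignored
    {"name": "web-2", "nodeName": None},      # new Pod, needs a node
]
bound = schedule_once(pods, pick_node=lambda pod: "node-b")
print(bound, pods[1]["nodeName"])
```

Once nodeName is set, steps 3 and 4 belong to the kubelet on that node, which is outside the scope of this sketch.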

How Does Kubernetes Select the Right Node?

Perhaps the hardest part of the above steps is when the Scheduler decides which node it should select for running the pod. Indeed, this part takes the most work, as there are several algorithms that the Scheduler must use to make this decision. Some of those algorithms depend on user-supplied options, while Kubernetes itself calculates others. They can be thought of as a set of questions that the Scheduler asks each node in order to decide.

Do you have what it takes to run this pod (predicate)?

A node may be overloaded, with busy pods consuming most of its CPU and memory. So, when the scheduler has a Pod to deploy, it determines whether or not the node has the necessary resources. If a Pod were deployed to a node that does not have the memory (for example) that the Pod is requesting, the hosted application might behave unexpectedly or even crash.

Sometimes, the user needs to make this decision on behalf of Kubernetes. Let’s say that you’ve recently purchased a couple of machines equipped with SSD disks, and you want to use them specifically for the MongoDB part of the application. To do this, you select the nodes through the node labels in the pod definition. When a node does not match the provided label, it is not chosen for deploying the Pod.


As demonstrated in the above graph, the predicate decision resolves to either True (yes, deploy the pod on that node) or False (no, don’t deploy on that one).
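To make the true/false nature of predicates concrete, here is a minimal Python sketch with two of them: a resource check and a label check. The class names, field names, and numbers are invented for illustration and do not mirror the scheduler's real data structures.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    name: str
    free_cpu_millicores: int
    free_memory_mib: int
    labels: dict = field(default_factory=dict)


@dataclass
class PodSpec:
    name: str
    cpu_request_millicores: int
    memory_request_mib: int
    node_selector: dict = field(default_factory=dict)


def fits_resources(pod, node):
    """Predicate: the node has enough spare CPU and memory for the Pod."""
    return (pod.cpu_request_millicores <= node.free_cpu_millicores
            and pod.memory_request_mib <= node.free_memory_mib)


def matches_node_selector(pod, node):
    """Predicate: the node carries every label the Pod asks for."""
    return all(node.labels.get(k) == v for k, v in pod.node_selector.items())


PREDICATES = [fits_resources, matches_node_selector]


def feasible_nodes(pod, nodes):
    """Keep only the nodes for which every predicate answers True."""
    return [n for n in nodes if all(check(pod, n) for check in PREDICATES)]


nodes = [
    Node("node-a", 500, 1024, {"disk": "ssd"}),   # too little CPU and memory
    Node("node-b", 4000, 8192, {"disk": "hdd"}),  # wrong disk label
    Node("node-c", 2000, 4096, {"disk": "ssd"}),  # passes both predicates
]
mongo = PodSpec("mongo", 1000, 2048, {"disk": "ssd"})
print([n.name for n in feasible_nodes(mongo, nodes)])  # ['node-c']
```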

Are you a better candidate for having this pod (priorities)?

In addition to true/false decisions, a.k.a. predicates, the scheduler executes some calculations (or functions) to determine which node is better suited to host the pod in question.

For example, a node where the pod image is already present (like it’s been pulled before in a previous deployment) has a better chance of having the pod scheduled to it because no time will be wasted downloading the image again.

Another example is when the scheduler favors a node that does not include other pods of the same Service. This algorithm helps spread the Service pods on multiple nodes as much as possible so that one node failure does not cause the entire Service to go down. Such a decision-making method is called the spreading function.

Several decisions, like the above examples, are grouped, and a weighted score is calculated for each node from the results. The node with the highest score wins the pod deployment.
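A rough Python sketch of how such priority functions could be combined is shown below. The two functions follow the examples in the text (image locality and spreading), but the weights, the 0-to-1 score range, and the data layout are assumptions made for this illustration, not the scheduler's actual values.

```python
def image_locality(node, image):
    """Score 1.0 if the image is already on the node, else 0.0."""
    return 1.0 if image in node["images"] else 0.0


def service_spreading(node, service):
    """Score higher the fewer Pods of the same Service the node already runs."""
    return 1.0 / (1 + node["pods_by_service"].get(service, 0))


# Hypothetical weights: spreading matters twice as much as image locality.
WEIGHTS = {"image_locality": 1.0, "service_spreading": 2.0}


def total_score(node, image, service):
    return (WEIGHTS["image_locality"] * image_locality(node, image)
            + WEIGHTS["service_spreading"] * service_spreading(node, service))


nodes = [
    {"name": "node-a", "images": {"mongo"}, "pods_by_service": {"db": 2}},
    {"name": "node-b", "images": set(), "pods_by_service": {}},
]
best = max(nodes, key=lambda n: total_score(n, "mongo", "db"))
print(best["name"])  # node-b
```

Here node-a already has the image but also runs two Pods of the same Service, so with these particular weights the empty node-b comes out ahead.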

The final decision

You may be asking, if there are so many factors that the Kubernetes Scheduler must consider before selecting a node for deploying the pod, how does it get to choose the right one?

The decision process works as follows:

  1. The scheduler determines all the nodes that it knows exist and are healthy.
  2. The scheduler runs the predicate tests to filter out nodes that are not suitable. The rest of the nodes form a group of possible nodes.
  3. The scheduler runs priority tests against the possible nodes. Candidates are ordered by their score, with the highest ones on top. At this point, the highest-scoring possible node gets chosen. But sometimes there may be more than one node with the same score.
  4. If nodes have the same score, they are moved to the final list. The Kubernetes Scheduler selects the winning node in a round-robin fashion to ensure that it spreads the load equally among the machines.
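The whole pipeline, including the round-robin tie-break, can be sketched in a few lines of Python. This is a simplified illustration under assumed names (`RoundRobinTieBreaker`, `schedule`, and the `free_gib` field are all hypothetical); the real scheduler is considerably more involved.

```python
class RoundRobinTieBreaker:
    """Rotates through equally scored nodes on successive scheduling rounds."""

    def __init__(self):
        self._next = 0

    def pick(self, scored):
        best = max(score for _, score in scored)
        tied = sorted(name for name, score in scored if score == best)
        choice = tied[self._next % len(tied)]
        self._next += 1
        return choice


def schedule(nodes, is_feasible, score, tie_breaker):
    healthy = [n for n in nodes if n["healthy"]]        # known, healthy nodes
    feasible = [n for n in healthy if is_feasible(n)]   # predicate filter
    if not feasible:
        return None                                     # the Pod stays pending
    scored = [(n["name"], score(n)) for n in feasible]  # priority scores
    return tie_breaker.pick(scored)                     # highest score, ties rotated


nodes = [
    {"name": "node-a", "healthy": True, "free_gib": 4},
    {"name": "node-b", "healthy": True, "free_gib": 4},
    {"name": "node-c", "healthy": False, "free_gib": 8},
    {"name": "node-d", "healthy": True, "free_gib": 1},
]
tie_breaker = RoundRobinTieBreaker()
picks = [
    schedule(nodes, lambda n: n["free_gib"] >= 2, lambda n: n["free_gib"], tie_breaker)
    for _ in range(3)
]
print(picks)  # ['node-a', 'node-b', 'node-a']
```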

What if that was not the best decision?

In busy Kubernetes clusters, the time between the Scheduler choosing the right node and the kubelet on that node executing the pod may be sufficient for changes to occur on the nodes. Even if that time is no more than a few milliseconds, a pod may get terminated on one of the nodes that were filtered out due to insufficient memory. That node could have scored higher on the priority tests if only it had not been overloaded at the time. But now, perhaps a less-suitable node was selected for the pod.

Some projects aim at addressing this situation, like the Kubernetes Descheduler project. With this tool, the pod is automatically evicted from the node if another node proves to be a better choice for pod scheduling. The pod is returned to the scheduler, which deploys it again to the right node.

Perhaps a more difficult situation occurs when the opposite scenario happens. Let’s say that a node was tested against whether or not it could provide 2 GB of memory. At the time the Scheduler was doing the predicate check, the node did have some spare RAM. However, while the kubelet is executing the pod on the node, a DaemonSet is deployed to the same node. This DaemonSet entails some resource-heavy operation that consumes the remaining 2 GB. Now, when the pod tries to run, it fails because it is missing the amount of memory it requires to function correctly.

If this pod was deployed using just a pod definition, then the application that it runs would fail to start, and Kubernetes could do nothing about it. However, if this pod was part of a pod controller like a Deployment or a ReplicaSet, then once it fails, the controller will detect that there are fewer replicas than it should be handling. Accordingly, the controller will request another pod to be scheduled. The Scheduler will run all the checks again and schedule the pod to a different node. This is one of the reasons why it is always advised to use a higher-level object like a Deployment when creating pods.
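The controller behavior described above boils down to comparing a desired replica count with what is actually alive. Here is a deliberately tiny Python sketch of that idea; the function name, the phase strings used as a liveness test, and the Pod dictionaries are assumptions for illustration, not the ReplicaSet controller's actual code.

```python
def reconcile(desired_replicas, pods):
    """Return the replacement Pods needed to get back to the desired count."""
    alive = [pod for pod in pods if pod["phase"] in ("Pending", "Running")]
    missing = max(0, desired_replicas - len(alive))
    # New Pods have no nodeName, so the Scheduler will pick them up again.
    return [
        {"name": f"replacement-{i}", "phase": "Pending", "nodeName": None}
        for i in range(missing)
    ]


pods = [
    {"name": "mongo-1", "phase": "Running", "nodeName": "node-a"},
    {"name": "mongo-2", "phase": "Failed", "nodeName": "node-b"},  # ran out of memory
]
replacements = reconcile(2, pods)
print(len(replacements))  # 1
```

A bare Pod has no such loop watching over it, which is why the failed Pod in the first scenario is never replaced.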

User-defined decision-making

Earlier in this article, we mentioned that a user could simply choose to run a pod on a specific node using the .spec.nodeSelector parameter in the Pod definition or template. The nodeSelector selects nodes that have one or more specific labels. However, sometimes, user requirements get more complicated. A nodeSelector, for example, selects nodes that have all the labels defined in the parameter. What if you want to make a more flexible selection?

Node Affinity

Let’s consider our earlier example, when we wanted to schedule our pod to run on the machines with the SSD disks. Let’s say that we also want it to be able to use the eight-core hosts. Node affinity allows for flexible decisions like this. The following pod template chooses nodes that have labels of feature=ssd or feature=eight-cores:

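apiVersion: v1
kind: Pod
metadata:
 name: mongo
spec:
 affinity:
   nodeAffinity:
     requiredDuringSchedulingIgnoredDuringExecution:
       nodeSelectorTerms:
       - matchExpressions:
         - key: feature
           operator: In
           values:
           - ssd
           - eight-cores
 containers:
 - name: mongodb
   image: mongo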

The requiredDuringSchedulingIgnoredDuringExecution option

There’s a new option here: requiredDuringSchedulingIgnoredDuringExecution. It’s easier than it looks. It means that we need to run those pods only on nodes labeled feature=ssd or feature=eight-cores. We don’t want the scheduler to make decisions outside of this set of nodes. This is the same behavior as the nodeSelector but with a more expressive syntax.

The preferredDuringSchedulingIgnoredDuringExecution option

Let’s say that we’re interested in running the pod on our selected nodes. But, since launching the pod is an absolute priority, we want it to run even if the selected nodes are not available. In this case, we can use the preferredDuringSchedulingIgnoredDuringExecution option. This option will try to run the pod on the nodes specified by the selector. But if those nodes are not available (they failed the tests), the Scheduler will try to run the pod on the next best node.
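The difference between the two options can be sketched in Python as "filter" versus "rank". This is a simplification under assumed names (`candidates`, the `feature` field); in particular, real preferred rules contribute a weight to the priority score rather than a strict ordering.

```python
def candidates(nodes, wanted_features, required):
    """Return node names in the order the Pod would consider them."""
    matching = [n for n in nodes if n["feature"] in wanted_features]
    if required:
        # required...: only matching nodes qualify; an empty list means "pending".
        return [n["name"] for n in matching]
    # preferred...: matching nodes come first, but the others stay eligible.
    others = [n for n in nodes if n["feature"] not in wanted_features]
    return [n["name"] for n in matching + others]


wanted = {"ssd", "eight-cores"}
mixed = [{"name": "node-a", "feature": "hdd"}, {"name": "node-b", "feature": "ssd"}]
only_hdd = [{"name": "node-a", "feature": "hdd"}]

print(candidates(mixed, wanted, required=True))      # ['node-b']
print(candidates(mixed, wanted, required=False))     # ['node-b', 'node-a']
print(candidates(only_hdd, wanted, required=True))   # []
print(candidates(only_hdd, wanted, required=False))  # ['node-a']
```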

Node Anti-Affinity

Some scenarios require that you don’t use one or more nodes except for particular pods. Think of the nodes that host your monitoring application. Those nodes shouldn’t have many resources due to the nature of their role. Thus, if pods other than those of the monitoring app are scheduled to those nodes, they hurt monitoring and also degrade the applications they host. In such a case, you need to use node anti-affinity to keep pods away from a set of nodes. The following is the previous pod definition with anti-affinity added:

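apiVersion: v1
kind: Pod
metadata:
 name: mongo
spec:
 affinity:
   nodeAffinity:
     requiredDuringSchedulingIgnoredDuringExecution:
       nodeSelectorTerms:
       - matchExpressions:
         - key: feature
           operator: In
           values:
           - ssd
           - eight-cores
         - key: role
           operator: NotIn
           values:
           - monitoring
 containers:
 - name: mongodb
   image: mongo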

Adding another key to the matchExpressions with the operator NotIn will avoid scheduling the mongo pods on any node labelled role=monitoring.
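To see how the In and NotIn operators combine, here is a small Python evaluator for a list of match expressions. It is a sketch that handles only these two operators and treats all expressions in one term as AND-ed together; the function names are made up for this example.

```python
def matches_expression(node_labels, expression):
    value = node_labels.get(expression["key"])
    if expression["operator"] == "In":
        return value in expression["values"]
    if expression["operator"] == "NotIn":
        # A node without the label at all also satisfies NotIn.
        return value not in expression["values"]
    raise ValueError(f"unsupported operator: {expression['operator']}")


def matches_term(node_labels, match_expressions):
    """All expressions in one term must hold for the node to qualify."""
    return all(matches_expression(node_labels, e) for e in match_expressions)


mongo_term = [
    {"key": "feature", "operator": "In", "values": ["ssd", "eight-cores"]},
    {"key": "role", "operator": "NotIn", "values": ["monitoring"]},
]

print(matches_term({"feature": "ssd"}, mongo_term))                        # True
print(matches_term({"feature": "ssd", "role": "monitoring"}, mongo_term))  # False
print(matches_term({"feature": "hdd"}, mongo_term))                        # False
```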

Node taints and tolerations

While node anti-affinity patterns allow you to prevent pods from running on specific nodes, they suffer from a drawback: the pod definition must explicitly declare that it shouldn’t run on those nodes. So, what if a new member joins the development team, writes a Deployment for her application, but forgets to exclude the monitoring nodes from the target nodes? Kubernetes administrators need a way to repel pods from nodes without having to modify every pod definition. That’s the role of taints and tolerations.

When you taint a node, it is automatically excluded from pod scheduling. When the scheduler runs the predicate tests on a tainted node, they’ll fail unless the pod has a toleration for that taint. For example, let’s taint the monitoring node, mon01:

kubectl taint nodes mon01 role=monitoring:NoSchedule

Now, for a pod to run on this node, it must have a toleration for it. For example, the following .spec.tolerations entry:

- key: "role"
  operator: "Equal"
  value: "monitoring"
  effect: "NoSchedule"

matches the key, value, and effect of the taint on mon01. This means that mon01 will pass the predicate test when the Scheduler decides whether or not it can use it for deploying this pod.

An important thing to notice, though, is that a toleration may enable a tainted node to accept a pod, but it does not guarantee that this pod runs on that specific node. In other words, the tainted node mon01 will be considered as one of the candidates for running our pod. However, if another node has a higher priority score, it will be chosen instead. For situations like this, you need to combine the toleration with nodeSelector or node affinity parameters.
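The matching rule between a taint and a toleration can be written down as a short Python sketch. It covers only the NoSchedule effect and the Equal and Exists operators, and the function names are hypothetical.

```python
def tolerates(toleration, taint):
    """True if this toleration matches the taint's key, value, and effect."""
    if toleration["key"] != taint["key"]:
        return False
    if toleration.get("effect") not in (None, taint["effect"]):
        return False
    if toleration.get("operator", "Equal") == "Exists":
        return True  # Exists ignores the value
    return toleration.get("value") == taint["value"]


def passes_taint_predicate(pod_tolerations, node_taints):
    """Every NoSchedule taint on the node must be tolerated by the Pod."""
    blocking = [t for t in node_taints if t["effect"] == "NoSchedule"]
    return all(any(tolerates(tol, t) for tol in pod_tolerations) for t in blocking)


mon01_taints = [{"key": "role", "value": "monitoring", "effect": "NoSchedule"}]
monitoring_tolerations = [
    {"key": "role", "operator": "Equal", "value": "monitoring", "effect": "NoSchedule"}
]

print(passes_taint_predicate(monitoring_tolerations, mon01_taints))  # True
print(passes_taint_predicate([], mon01_taints))                      # False
print(passes_taint_predicate([], []))                                # True
```

Note that passing this predicate only makes mon01 a candidate; the priority scores still decide where the pod finally lands.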


  • The Kubernetes Scheduler is the component in charge of determining which node is most suitable for running pods.
  • It does that using two main decision-making processes:
    • Predicates: a set of tests, each of which evaluates to true or false. A node that fails a predicate is excluded from the process.
    • Priorities: where each node is tested against some functions that give it a score. The node with the highest score is chosen for pod deployment.
  • The Kubernetes Scheduler also honors user-defined factors that affect its decision:
    • Node Selector: the .spec.nodeSelector parameter in the pod definition narrows down node selection to those having the labels defined in the nodeSelector.
    • Node affinity and anti-affinity: those are used for greater flexibility in node selection as they allow for more expressive selection criteria. Node affinity can be used to guarantee that only the matching nodes are used or only to set a preference.
  • Taints and tolerations work in the same manner as node affinity. However, their default action is to repel pods from the tainted nodes unless the pods have the necessary tolerations (which are just a key, a value, and an effect). Tolerations are often combined with node affinity or node selector parameters to guarantee that only the matched nodes are used for pod scheduling.

Announcing the CNCF Prometheus Project Journey Report

Today we are very excited to release our Project Journey Report for Prometheus. This is the third report we have issued for CNCF graduated projects following reports for Kubernetes and Envoy.

Prometheus is a widely-adopted open source metrics-based monitoring and alerting system. When Prometheus joined CNCF in May 2016, the foundation started promotional efforts to help sustain, nurture, and expand the Prometheus community. These efforts have included blog posts, email newsletter mentions, social media support, CNCF-hosted webinars, and end-user case studies. 

Today, Prometheus is kicking off its fourth PromCon in Germany, which is sold out with more than 200 attendees. CNCF has helped administer the events in 2017, 2018, and 2019, and is working with the Prometheus maintainers to plan how the event should expand in the future, including adding an event in North America.

This report attempts to objectively assess the state of the Prometheus project and how CNCF has impacted the progress and growth of Prometheus. For the report, we pulled data from multiple sources, particularly CNCF’s own DevStats tool, which provides detailed development statistics for all CNCF projects. 

Some of the highlights of the report include:

  • Code Contributions – The total number of companies contributing code has increased by 258% since Prometheus joined CNCF, from 202 to 723. 

Cumulative growth of contributions by company since Prometheus project launch (Q1 2014 – Q3 2019)

  • Development Velocity – Prometheus has enjoyed an 825% expansion of individual contributors over the three years since the project joined CNCF.

Cumulative growth in commits by quarter (Q1 2014 – Q3 2019)

  • Documentation – The number of documentation commits has increased by 241% since Prometheus joined CNCF.

Growth in participation in Prometheus project documentation (Q1 2014 – Q3 2019)

Since joining CNCF in 2016, Prometheus has recorded:

  • >6.3 K contributors
  • >13.5 K code commits
  • >7.2 K pull requests 
  • >113 K contributions
  • 723 contributing companies

Prometheus is growing quickly – in fact, it is among the top three CNCF projects in terms of velocity. As we continue to collaborate with the project maintainers, we only expect this rapid growth to continue.

This report is part of CNCF’s commitment to fostering and sustaining an ecosystem of open source, vendor-neutral projects. Please read and enjoy the report, share your feedback with us – and stay tuned for more project journey reports for other projects.

Cloud Native Chaos Engineering – Enhancing Kubernetes Application Resiliency

Guest post by Uma Mukkara, COO, MayaData

Extending Cloud Native Principles to Chaos Engineering

Faults are bound to happen no matter how hard you test to find them before putting your software into production. Clouds and availability zones will have issues, networks will drop, and, yes, bugs will make their presence felt. Resilience is how well a system withstands such faults. A highly resilient system, for example one built with loosely coupled microservices that can themselves be restarted and scaled easily, overcomes such faults without impacting users. Chaos Engineering is the practice of injecting faults into a system before they naturally occur. It is now accepted as an essential approach for ensuring that today’s frequently changing and highly complex systems achieve the resilience required. Through chaos engineering, unanticipated failure scenarios can be discovered and corrected before they cause user issues.

Broad adoption has made Kubernetes one of the most important platforms for software development and operations. The term “Cloud Native” is overloaded and has been co-opted by many traditional vendors to mean almost anything; even CNCF has allowed the use of the term cloud native to describe technologies that predate the cloud native pattern by, in some cases, decades. For the purposes of this blog, I’d like to use a more technical definition of cloud native: an architecture where the components are microservices that are loosely coupled and, more specifically, are deployed in containers that are orchestrated by Kubernetes and related projects.

In this blog, I would like to introduce a relatively new or less frequently used term called “Cloud Native Chaos Engineering”, defined as engineering practices focused on (and built on) Kubernetes environments, applications, microservices, and infrastructure. 

CNCF is, first and foremost, an open-source community (while some projects may not be strictly cloud native, they are all open-source). If Kubernetes had not been open-source, it would not have become the de facto platform for software development and operations. With that in mind, I’d like to stake the claim that Cloud Native Chaos Engineering is necessarily based on open source technologies.

4 Principles of a Cloud Native Chaos Engineering Framework

  1. Open source – The framework has to be completely open-source under the Apache 2.0 License to encourage broader community participation and inspection. The number of applications moving to the Kubernetes platform is limitless. At such a large scale, only the Open Chaos model will thrive and get the required adoption.
  2. CRDs for Chaos Management – The framework should be Kubernetes native, defined here as using Kubernetes CRDs as APIs for both developers and SREs to build and orchestrate chaos testing. The CRDs act as standard APIs to provision and manage the chaos.
  3. Extensible and pluggable – One reason cloud native approaches are winning is that their components can be swapped out relatively easily and new ones introduced as needed. Any standard chaos library or functionality developed by other open-source developers should be able to be integrated into, and orchestrated for testing via, this pluggable framework.
  4. Broad Community adoption – Once we have the APIs, Operator, and plugin framework, we have all the ingredients needed for a common way of injecting chaos. The chaos will be run against a well-known infrastructure like Kubernetes or applications like databases or other infrastructure components like storage or networking. These chaos experiments can be reused, and a broad-based community is useful for identifying and contributing to other high-value scenarios. Hence a Chaos Engineering framework should provide a central hub or forge where open-source chaos experiments are shared, and collaboration via code is enabled.  

Introduction to Litmus

Litmus is a cloud native Chaos Engineering framework for Kubernetes. It is unique in fulfilling all four of the above principles. Litmus originally started as a chaos toolset to run E2E pipelines for the CNCF Sandbox project OpenEBS – powering, for example, OpenEBS.ci – and has evolved into a completely open-source framework for building and operating chaos tests on Kubernetes-based systems. It consists of four main components:

  • Chaos CRDs or API
  • Chaos Operator
  • Chaos libraries and plugin framework
  • Chaos Hub

Chaos API

Currently, Litmus provides three APIs:

  • ChaosEngine
  • ChaosExperiment
  • ChaosResult

ChaosEngine: ChaosEngine CR is created for a given application and is tagged with appLabel. This CR ties one or more ChaosExperiments to an application.

ChaosExperiment: ChaosExperiment CR is created to hold and operate the details of actual chaos on an application. It defines the type of experiment and key parameters of the experiment. 

ChaosResult: ChaosResult CR is created by the operator after an experiment is run. One ChaosResult CR is maintained per ChaosEngine. The ChaosResult CR is useful in making sense of a given ChaosExperiment. This CR is used for generating chaos analytics, which can be extremely useful, for example, when certain components are upgraded between chaos experiments and the results need to be easily compared.

Chaos Operator

The Litmus Operator is implemented using the Operator-SDK. This operator manages the lifecycle of the chaos CRs. The lifecycle of Litmus itself can be managed using this operator, as it follows the lifecycle management API requirements. The chaos operator is also available at operatorhub.io.

Chaos Libraries and external Plugins

The actual injection of chaos is done by chaos libraries or chaos executors. For example, the Litmus project has already built a chaos library called “LitmusLib”. LitmusLib is aware of how to kill a pod, how to introduce a CPU hog, how to hog memory or how to kill a node, and several other faults and degradations. Like LitmusLib, there are other open-source chaos projects like Pumba or PowerfulSeal. The CNCF landscape has more details of various chaos engineering projects. As shown below, the Litmus plugin framework allows other chaos projects to make use of Litmus for chaos orchestration. For example, one can create a chaos chart for the pod-kill experiment using Pumba or PowerfulSeal and execute it via the Litmus framework. 

* PowerfulSeal and Pumba are shown as examples. 

Chaos Hub

Chaos charts are located at hub.litmuschaos.io. ChaosHub brings all the reusable chaos experiments together. Application developers and SREs share their chaos experiences for others to reuse. The goal of the hub is to have developers share the failure tests that they use to validate their applications in CI pipelines with their users, who are typically SREs.

Currently, the chaos hub contains charts for Kubernetes chaos and OpenEBS chaos. We expect to receive more contributions from the community going forward.

Example use cases of Litmus:

The simplest use case of Litmus is application developers using Litmus in the development phase itself. Chaos Engineering has traditionally been limited to production environments, and lately we are seeing the practice being adopted in CI pipelines. But with Litmus, chaos testing is possible during development as well. Like Unit Testing, Integration Testing, and Behavior-Driven Testing, Chaos Testing is a test philosophy that lets developers carry out negative test scenarios to check the resiliency of the code before it is merged to the repository. Chaos testing can be appended very easily to the application, as shown below:

Other use cases of Litmus are for inducing chaos in CI pipelines and production environments. 


With the introduction of chaos operator, chaos CRDs, and chaos hub, Litmus has all the key ingredients of cloud native Chaos Engineering. 

Important links:

GitHub: github.com/litmuschaos

Twitter: @litmuschaos

Chaos Charts: hub.litmuschaos.io

Community Slack: #litmus channel on K8S Slack


Building a Large-scale Distributed Storage System Based on Raft

Guest post by Edward Huang, Co-founder & CTO of PingCAP 

In recent years, building a large-scale distributed storage system has become a hot topic. Distributed consensus algorithms like Paxos and Raft are the focus of many technical articles. But those articles tend to be introductory, describing the basics of the algorithm and log replication. They seldom cover how to build a large-scale distributed storage system based on the distributed consensus algorithm.

Since April 2015, we at PingCAP have been building TiKV, a large-scale open source distributed database based on Raft. It’s the core storage component of TiDB, an open source distributed NewSQL database that supports Hybrid Transactional and Analytical Processing (HTAP) workloads. Earlier in 2019, we conducted an official Jepsen test on TiDB, and the Jepsen test report was published in June 2019. In July of the same year, we announced that TiDB 3.0 had reached general availability, delivering stability at scale and a performance boost.

In this article, I’d like to share some of our firsthand experience in designing a large-scale distributed storage system based on the Raft consensus algorithm.

Scaling a distributed storage system

The first thing I want to talk about is scaling. The core of a distributed storage system is nothing more than two points: one is the sharding strategy, and the other is metadata storage. Keeping applications transparent and consistent in the sharding process is crucial to a storage system with elastic scalability. 

If a storage system only has a static data sharding strategy, it is hard to elastically scale with application transparency. Such systems include MySQL static routing middleware like Cobar, Redis middleware like Twemproxy, and so on. All these systems are difficult to scale seamlessly.

Before moving on to elastic scalability, I’d like to talk about several sharding strategies.

Sharding strategies for distributed databases

Sharding is a database partitioning strategy that splits your datasets into smaller parts and stores them in different physical nodes. The unit for data movement and balance is a sharding unit. Each physical node in the cluster stores several sharding units. 

Two commonly-used sharding strategies are range-based sharding and hash-based sharding. The choice of the sharding strategy changes according to different types of systems. A typical example is the data distribution of a Hadoop Distributed File System (HDFS) DataNode, shown in Figure 1 (source: Distributed Systems: GFS/HDFS/Spanner).

Figure 1. Data distribution of HDFS DataNode

Range-based sharding

Range-based sharding assumes that all keys in the database system can be put in order, and it takes a continuous section of keys as a sharding unit. 

It’s very common to sort keys in order. HBase keys are sorted in byte order, while MySQL keys are sorted in auto-increment ID order. For some storage engines, the order is natural. In the case of both log-structured merge-tree (LSM-Tree) and B-Tree, keys are naturally in order. 

Figure 2. Range-based sharding for data partitioning

In Figure 2 (source: MongoDB uses range-based sharding to partition data), the key space is divided into (minKey, maxKey). Each sharding unit (chunk) is a section of continuous keys. The advantage of range-based sharding is that the adjacent data has a high probability of being together (such as the data with a common prefix), which can well support operations like `range scan`. For example, HBase Region is a typical range-based sharding strategy.

However, range-based sharding is not friendly to sequential writes with heavy workloads. For example, in a time-series type of write load, the write hotspot is always in the last Region. This occurs because the log key is generally related to the timestamp, and the time is monotonically increasing. But relational databases often need to execute `table scan` (or `index scan`), and the common choice is range-based sharding.
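The trade-off described above can be seen in a minimal Python sketch of range-based routing. The split keys, the integer key space, and the Region names are invented for illustration; they are not taken from HBase or TiKV.

```python
import bisect

# Hypothetical layout: four Regions covering
# (-inf, 100), [100, 200), [200, 300), [300, +inf).
SPLIT_KEYS = [100, 200, 300]
REGIONS = ["region-1", "region-2", "region-3", "region-4"]


def region_for(key):
    """Route a single key to the Region whose range contains it."""
    return REGIONS[bisect.bisect_right(SPLIT_KEYS, key)]


def regions_for_scan(start, end):
    """Regions touched by a scan over the half-open key range [start, end)."""
    first = bisect.bisect_right(SPLIT_KEYS, start)
    last = bisect.bisect_left(SPLIT_KEYS, end)
    return REGIONS[first:last + 1]


print(region_for(150))             # region-2
print(regions_for_scan(120, 260))  # ['region-2', 'region-3']

# Monotonically increasing keys (for example, timestamps) all hit the last Region.
print({region_for(ts) for ts in range(1000, 1005)})  # {'region-4'}
```

A range scan touches only a few adjacent Regions, while a stream of ever-growing keys concentrates on one Region, which is exactly the write hotspot mentioned above.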

Hash-based sharding

Hash-based sharding processes keys using a hash function and then uses the results to get the sharding ID, as shown in Figure 3 (source: MongoDB uses hash-based sharding to partition data).

Contrary to range-based sharding, where all keys can be put in order, hash-based sharding has the advantage that keys are distributed almost randomly, so the distribution is even. As a result, it is more friendly to systems with heavy write workloads and read workloads that are almost all random, because the write pressure can be evenly distributed in the cluster. The drawback is that adjacent keys end up on different shards, which makes operations like `range scan` very difficult.

Figure 3. Hash-based sharding for data partitioning

Some typical examples of hash-based sharding are Cassandra consistent hashing, presharding of Redis Cluster and Codis, and Twemproxy consistent hashing.
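As a rough illustration of hash-based routing, the Python sketch below hashes each key and takes the result modulo the number of shards. The choice of SHA-256, the key format, and the shard count are assumptions for the example, not what any of the systems above use.

```python
import hashlib
from collections import Counter


def shard_for(key, num_shards):
    """Map a key to a shard ID using a stable hash of the key."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


keys = [f"user:{i}" for i in range(10000)]
load = Counter(shard_for(k, 4) for k in keys)
print(sorted(load.items()))  # four shards with roughly equal counts

# Adjacent keys are scattered, so a scan over user:100..user:199
# has to visit several shards instead of one.
scan_shards = {shard_for(f"user:{i}", 4) for i in range(100, 200)}
print(len(scan_shards))
```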

Hash-Range combination sharding

Note that hash-based and range-based sharding strategies are not isolated. Instead, you can flexibly combine them. For example, you can establish a multi-level sharding strategy, which uses hash in the uppermost layer, while in each hash-based sharding unit, data is stored in order.
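One way to picture such a multi-level strategy is a store that hashes a partition key to pick a bucket and keeps the entries inside each bucket sorted. The Python sketch below is a toy model with hypothetical names (`HashRangeStore`, partition key, sort key); it is not how any particular database implements it.

```python
import bisect
import hashlib


class HashRangeStore:
    """Hash on the partition key at the top level, ordered keys inside a bucket."""

    def __init__(self, num_buckets):
        self._buckets = [[] for _ in range(num_buckets)]

    def _bucket(self, partition_key):
        digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
        return self._buckets[int.from_bytes(digest[:8], "big") % len(self._buckets)]

    def put(self, partition_key, sort_key, value):
        # insort keeps each bucket ordered by (partition_key, sort_key).
        bisect.insort(self._bucket(partition_key), (partition_key, sort_key, value))

    def scan(self, partition_key, start, end):
        """Values for one partition key with sort keys in [start, end)."""
        bucket = self._bucket(partition_key)
        lo = bisect.bisect_left(bucket, (partition_key, start))
        hi = bisect.bisect_left(bucket, (partition_key, end))
        return [value for _, _, value in bucket[lo:hi]]


store = HashRangeStore(num_buckets=4)
for t in [5, 1, 9, 3, 7]:
    store.put("sensor-1", t, f"s1@{t}")
for t in [4, 2]:
    store.put("sensor-2", t, f"s2@{t}")

print(store.scan("sensor-1", 3, 8))  # ['s1@3', 's1@5', 's1@7']
```

Writes for different partition keys spread across buckets, while a range scan for a single partition key stays inside one bucket.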

Implementing elastic scalability for a storage system

When it comes to elastic scalability, it’s easy to implement for a system using range-based sharding: simply split the Region. Assuming that you have a Range Region [1, 100), you only need to choose a split point, such as 50. Then this Region is split into [1, 50) and [50, 100). After that, move the two Regions into two different machines, and the load is balanced.
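The split described above is simple enough to show directly. In this Python sketch, the `Region` class and the store names are hypothetical; only the [1, 100) range and the split point 50 come from the text.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Region:
    start: int  # inclusive
    end: int    # exclusive

    def contains(self, key):
        return self.start <= key < self.end

    def split(self, split_key):
        """Split [start, end) into [start, split_key) and [split_key, end)."""
        if not (self.start < split_key < self.end):
            raise ValueError("split key must fall strictly inside the Region")
        return Region(self.start, split_key), Region(split_key, self.end)


left, right = Region(1, 100).split(50)
print(left, right)  # Region(start=1, end=50) Region(start=50, end=100)

# After the split, the two halves can live on different machines.
placement = {left: "store-1", right: "store-2"}
owner_of_75 = next(store for region, store in placement.items() if region.contains(75))
print(owner_of_75)  # store-2
```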

Range-based sharding may bring read and write hotspots, but these hotspots can be eliminated by splitting and moving. Because splitting and moving react to a hotspot only after it appears, range-based sharding lags behind hash-based sharding in spreading load. But overall, for relational databases, range-based sharding is a good choice.

In contrast, implementing elastic scalability for a system using hash-based sharding is quite costly. The reason is obvious. Assume that the current system has three nodes, and you add a new physical node. In the hash model, “n” changes from 3 to 4, which can cause a large system jitter. Although you can use a consistent hashing algorithm like Ketama to reduce the system jitter as much as possible, it’s hard to totally avoid it. This is because after a hash function is applied, data is randomly distributed, and adjusting the hash algorithm will certainly change the distribution rule for most data.
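To get a feel for the size of this jitter, the Python sketch below compares plain `hash(key) % n` placement with a simple consistent-hash ring when a fourth node is added. The ring is a generic textbook construction with virtual nodes, not Ketama itself, and the node names, key format, and virtual-node count are assumptions.

```python
import bisect
import hashlib


def h(value):
    return int.from_bytes(hashlib.sha256(value.encode("utf-8")).digest()[:8], "big")


def modulo_owner(key, nodes):
    """Plain hash model: owner index is hash(key) % n."""
    return nodes[h(key) % len(nodes)]


class HashRing:
    """Consistent hashing: each node owns the arcs that end at its ring points."""

    def __init__(self, nodes, vnodes=100):
        self._ring = sorted((h(f"{node}#{i}"), node) for node in nodes for i in range(vnodes))
        self._points = [point for point, _ in self._ring]

    def owner(self, key):
        idx = bisect.bisect_right(self._points, h(key)) % len(self._ring)
        return self._ring[idx][1]


def moved_fraction(owner_before, owner_after, keys):
    return sum(owner_before(k) != owner_after(k) for k in keys) / len(keys)


keys = [f"key-{i}" for i in range(10000)]
three = ["node-1", "node-2", "node-3"]
four = three + ["node-4"]

mod_moved = moved_fraction(lambda k: modulo_owner(k, three),
                           lambda k: modulo_owner(k, four), keys)
ring3, ring4 = HashRing(three), HashRing(four)
ring_moved = moved_fraction(ring3.owner, ring4.owner, keys)
print(f"modulo: {mod_moved:.0%} of keys moved, ring: {ring_moved:.0%} of keys moved")
```

With a uniform hash, going from `% 3` to `% 4` relocates about three quarters of the keys in expectation, whereas the ring relocates only the keys that the new node takes over. That is still a noticeable amount of data movement, which is why the jitter can be reduced but not avoided.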

Implementing data consistency and high availability

We chose range-based sharding for TiKV. After choosing an appropriate sharding strategy, we need to combine it with a high-availability replication solution. Different replication solutions can achieve different levels of availability and consistency.

Many middleware solutions simply implement a sharding strategy without specifying the data replication solution on each shard. Examples include the Redis middleware Twemproxy and Codis, and the MySQL middleware Cobar. These middleware solutions only implement routing in the middle layer, without considering the replication solution on each storage node in the bottom layer.

However, this replication solution matters a lot for a large-scale storage system. Generally, the number of shards in a system that supports elastic scalability changes, and so does the distribution of these shards. The system automatically balances the load, scaling out or in. If there is a large amount of data and a large number of shards, it’s almost impossible to manually maintain the master-slave relationship, recover from failures, and so on. So it’s very important to choose a highly-automated, high-availability solution.

In TiKV, each range shard is called a Region. Because we need to support scanning and the stored data generally has a relational table schema, we want the data of the same table to be as close as possible. Each Region in TiKV uses the Raft algorithm to ensure data security and high availability on multiple physical nodes.

Figure 4. Raft group in distributed database TiKV

Several open source Raft implementations, including etcd, LogCabin, raft-rs and Consul, are just implementations of a single Raft group, which cannot be used to store a large amount of data. So the major use case for these implementations is configuration management. After all, the more participating nodes in a single Raft group, the worse the performance. If physical nodes cannot be added horizontally, the system has no way to scale.

If you use multiple Raft groups, which can be combined with the sharding strategy mentioned above, it seems that the implementation of horizontal scalability is very simple. Taking the replicas of each shard as a Raft group is the basis for TiKV to store massive data. However, it is much more complex to manage multiple, dynamically-split Raft groups than a single Raft group. As far as I know, TiKV is currently one of only a few open source projects that implement multiple Raft groups.

TiKV divides data into Regions according to the key range. When a Region becomes too large (the current limit is 96 MB), it splits into two new ones. This splitting happens on all physical nodes where the Region is located. The newly-generated replicas of the Region constitute a new Raft group.
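The following sketch shows the idea of a size-triggered split on a single replica. It is a simplified model, not TiKV's implementation: the `Region` class, the `split_if_needed` helper, and the choice of the median key as the split point are all assumptions made for illustration.

```python
from dataclasses import dataclass, field

MAX_REGION_SIZE = 96 * 1024 * 1024  # 96 MB, the limit quoted above

@dataclass
class Region:
    region_id: int
    start_key: str   # inclusive
    end_key: str     # exclusive
    data: dict = field(default_factory=dict)  # key -> value size in bytes

    def size(self) -> int:
        return sum(self.data.values())

def split_if_needed(region, new_id, max_size=MAX_REGION_SIZE):
    """Return [region] if it is small enough, else two Regions covering the same range."""
    if region.size() <= max_size:
        return [region]
    keys = sorted(region.data)
    split_key = keys[len(keys) // 2]  # assumed policy: split at the median key
    left = Region(region.region_id, region.start_key, split_key,
                  {k: v for k, v in region.data.items() if k < split_key})
    right = Region(new_id, split_key, region.end_key,
                   {k: v for k, v in region.data.items() if k >= split_key})
    return [left, right]

# A tiny threshold keeps the example easy to follow.
r1 = Region(1, "a", "d", {"a1": 40, "b1": 40, "c1": 40})
parts = split_if_needed(r1, new_id=2, max_size=100)
for p in parts:
    print(p.region_id, f"[{p.start_key}, {p.end_key})", sorted(p.data))
```

The two resulting Regions share a boundary key, so together they still cover exactly the original key range.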

This raises two questions:

  • How do we ensure that the split operation is securely executed on each replica of this Region? 
  • How do we guarantee application transparency?

Question #1: How do we ensure the secure execution of the split operation on each Region replica?

The solution is relatively easy. You can use the following approach, which is exactly what the Raft algorithm does:

  1. Treat the Region split operation as a Raft log entry.
  2. Let this log entry go through the Raft state machine. When the entry is successfully applied, the operation is safely replicated.
  3. Verify that the splitting log operation is accepted.
  4. Let the new Region go through the Raft election process. As an alternative, you can use the original leader and let the other nodes where this new Region is located send heartbeats directly.
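The steps above can be sketched as replicas that change state only by applying committed log entries in order. This is a toy model under stated assumptions: there is no real consensus or networking here, and the `Replica` class and the entry fields (`type`, `region_id`, `split_key`, `new_region_id`) are hypothetical names, not TiKV's.

```python
class Replica:
    """One node's view of Region ranges; state changes only via applied log entries."""

    def __init__(self, name):
        self.name = name
        self.regions = {1: ("a", "d")}  # region_id -> (start, end), end exclusive
        self.applied_index = 0

    def apply(self, index, entry):
        # Entries must be applied strictly in log order, exactly once.
        if index != self.applied_index + 1:
            raise ValueError("log entries must be applied in order")
        if entry["type"] == "split":
            start, end = self.regions[entry["region_id"]]
            if not (start < entry["split_key"] < end):
                raise ValueError("split key outside the Region's range")
            self.regions[entry["region_id"]] = (start, entry["split_key"])
            self.regions[entry["new_region_id"]] = (entry["split_key"], end)
        self.applied_index = index

# A committed log, as the consensus layer would deliver it to every replica.
log = [{"type": "split", "region_id": 1, "split_key": "b", "new_region_id": 2}]

replicas = [Replica(n) for n in ("A", "B", "C")]
for replica in replicas:
    for index, entry in enumerate(log, start=1):
        replica.apply(index, entry)

# Every replica ends up with the same two Regions.
print(replicas[0].regions)
```

Because the split is just another deterministic log entry, every replica that applies the log reaches the same result, whenever it catches up.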

The split process is coupled with network isolation, which can lead to very complicated situations. For example, assume that there are two nodes named A and B, and the Region leader is on node A:

  1. The leader initiates a Region split request: Region 1 [a, d) → the new Region 1 [a, b) + Region 2 [b, d). Node A first sends the heartbeat of Region 2 to node B. Node A also sends a snapshot of Region 2 to node B because there hasn’t been any Region 2 information on node B.
  2. At this time, Region 2 is split into the new Region 2 [b, c) and Region 3 [c, d). So the snapshot that node A sends to node B is the latest snapshot of Region 2 [b, c).
  3. Now the split log of Region 1 has arrived at node B and the old Region 1 on node B has also split into Region 1 [a, b) and Region 2 [b, d).
  4. Then the latest snapshot of Region 2 [b, c) arrives at node B. After the new Region 2 is applied, it must be guaranteed that the [c, d) data no longer exists on Region 2 at node B.

Question #2: How do we guarantee application transparency?

Raft does a better job of transparency than Paxos. Specifically, Raft provides a clear configuration change process to make sure nodes can be securely and dynamically added or removed in a Raft group. With this algorithm, the rebalance process can be summarized as follows:

  1. Add a replica to a Region.
  2. Transfer the leadership.
  3. Remove the local replica.

These steps are the standard Raft configuration change process. In TiKV, the implementation is a little bit different:

  1. The `conf change` operation is only executed after the `conf change` log is applied.
  2. To avoid a disjoint majority, a Region group can only handle one `conf change` operation at a time. This is discussed in Diego Ongaro’s PhD dissertation “Consensus: Bridging Theory and Practice” (2014).

The process in TiKV can guarantee correctness and is also relatively simple to implement.
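As a rough sketch of the rebalance steps and the two rules above, the model below moves a replica from one node to another using single-step membership changes. It only tracks membership and leadership: there is no real Raft log, and `RegionGroup`, `move_replica`, and the rule for picking the next leader are assumptions made for illustration.

```python
class RegionGroup:
    """Membership of one Region's Raft group (toy model, no real consensus)."""

    def __init__(self, peers, leader):
        self.peers = set(peers)
        self.leader = leader
        self.pending_conf_change = None  # at most one change in flight

    def propose_conf_change(self, op, node):
        if self.pending_conf_change is not None:
            raise RuntimeError("another conf change is still in flight")
        if op == "remove" and node == self.leader:
            raise RuntimeError("transfer leadership before removing the leader")
        self.pending_conf_change = (op, node)

    def apply_conf_change(self):
        # Membership changes only here, that is, when the conf change log is applied.
        op, node = self.pending_conf_change
        if op == "add":
            self.peers.add(node)
        else:
            self.peers.discard(node)
        self.pending_conf_change = None

    def transfer_leader(self, node):
        if node not in self.peers:
            raise RuntimeError("the new leader must be a member of the group")
        self.leader = node

def move_replica(group, src, dst):
    """Add a replica on dst, move leadership off src if needed, then remove src."""
    group.propose_conf_change("add", dst)
    group.apply_conf_change()
    if group.leader == src:
        # Assumed policy: hand leadership to the first remaining peer by name.
        group.transfer_leader(sorted(group.peers - {src})[0])
    group.propose_conf_change("remove", src)
    group.apply_conf_change()

group = RegionGroup(peers={"A", "B", "C"}, leader="A")
move_replica(group, src="A", dst="D")
print(sorted(group.peers), group.leader)
```

Adding before removing keeps the replica count from dropping below its starting value during the move.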

In addition, implementing transparency at the application layer requires collaboration between the client and the metadata management module. After all, when a Region leader is transferred away, the client’s read and write requests to this Region must be sent to the new leader node.

Note: In this context, “the client” refers to the TiKV software development kit (SDK) client.

When a client reads or writes data, it uses the following process:

  1. The client caches a routing table locally. The routing table looks like this:

    {startKey1, endKey1}   -> {Region1, NodeA}
    {startKey2, endKey2}   -> {Region2, NodeB}
    {startKey3, endKey3}   -> {Region3, NodeC}

  2. According to the key accessed by the user, the client looks up the following information:
    • The interval the key belongs to
    • The Region the interval belongs to
    • The physical node the leader is on
  3. The client sends the request to that node directly. As I mentioned above, the leader might have been transferred to another node. In that case, the client might receive an error saying “Region not leader”.
  4. The error response carries the new leader.
  5. The client updates its routing table cache.
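The process above can be sketched as a small client with a cached routing table and a single retry on a “Region not leader” error. This is not the TiKV SDK: the `Client` class, the `NotLeader` exception, the `send` function standing in for an RPC, and the sample key ranges are all hypothetical.

```python
import bisect

class NotLeader(Exception):
    """Error returned by a node that no longer leads the Region."""

    def __init__(self, region_id, new_leader):
        super().__init__(f"Region {region_id} not leader")
        self.region_id = region_id
        self.new_leader = new_leader

# Assumed cluster state: Region 2's leader has moved from NodeB to NodeC.
LEADERS = {1: "NodeA", 2: "NodeC", 3: "NodeC"}

def send(node, region_id, key):
    """Stand-in for an RPC: succeeds only on the Region's current leader."""
    if LEADERS[region_id] != node:
        raise NotLeader(region_id, LEADERS[region_id])
    return f"value-of-{key}@{node}"

class Client:
    def __init__(self, routes):
        # routes: (start_key, end_key, region_id, leader_node), end_key exclusive
        self.routes = sorted(routes)
        self.starts = [r[0] for r in self.routes]

    def locate(self, key):
        """Find the index of the cached route whose interval contains key."""
        i = bisect.bisect_right(self.starts, key) - 1
        if i < 0 or key >= self.routes[i][1]:
            raise KeyError(key)
        return i

    def get(self, key):
        i = self.locate(key)
        start, end, region_id, node = self.routes[i]
        try:
            return send(node, region_id, key)
        except NotLeader as err:
            # Update the cached route, then retry once against the new leader.
            self.routes[i] = (start, end, region_id, err.new_leader)
            return send(err.new_leader, region_id, key)

client = Client([("a", "h", 1, "NodeA"), ("h", "p", 2, "NodeB"), ("p", "z", 3, "NodeC")])
result = client.get("k")  # the cache is stale for Region 2
print(result, client.routes[1])
```

After the first failed attempt, the cache points at the new leader, so later requests for the same Region go there directly.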

Scheduling in a distributed storage system

In this section, I’ll discuss how scheduling is implemented in a large-scale distributed storage system.

Scheduling using Placement Driver

The routing table is a very important module that stores all the Region distribution information. The routing table must guarantee accuracy and high availability.

In addition, to rebalance the data as described above, we need a scheduler with a global perspective. To dynamically adjust the distribution of Regions in each node, the scheduler needs to know which node has insufficient capacity, which node is more stressed, and which node has more Region leaders on it. This is because all nodes are almost stateless, and they cannot migrate the data autonomously. Instead, they must rely on the scheduler to initiate data migration (`raft conf change`).

You might have noticed that you can integrate the scheduler and the routing table into one module. Google’s Spanner database uses this single-module approach and calls it the placement driver. We also use this name in TiKV, and call it PD for short. PD is mainly responsible for the two jobs mentioned above: the routing table and the scheduler.

PD being completely stateless guarantees routing data consistency

Google’s Spanner paper does not describe the placement driver design in detail. However, it’s certain that one core idea in designing a large-scale distributed storage system is to assume that any module can crash. It’s very dangerous if the states of modules rely on each other. This is because once an instance crashes, the standby instance must start immediately, but the state of this newly-started instance might not be consistent with the instance that has crashed. The design must make sure that this inconsistency cannot cause problems.

Take a simple case as an example. The PD routing table is stored in etcd. However, the node itself determines the split of a Region. This way, the node can quickly know whether the size of one of its Regions exceeds the threshold. When this split event is actively pushed from the node to PD, if PD receives this event but crashes before persisting the state to etcd, the newly-started PD doesn’t know about the split. At this point, the information in the routing table might be wrong.

What we do is design PD to be completely stateless. Only through making it completely stateless can we avoid various problems caused by failing to persist the state.

Periodically, each node sends information about the Regions on it to PD using heartbeats. Then, PD takes the information it receives and creates a global routing table. In this way, even if PD crashes, after the new PD starts, it only needs to wait for a few heartbeats and then it can get the global routing information again. In addition, PD can use etcd as a cache to accelerate this process. That is, after the new PD starts, it pulls the routing information from etcd, waits for a few heartbeats, and then provides services.
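Here is a minimal sketch of that recovery path. It assumes, for simplicity, that each node reports only the Regions it currently leads, and it leaves out the etcd cache. The `PlacementDriver` class, its `on_heartbeat` method, and the sample ranges are hypothetical names for this illustration, not PD's API.

```python
class PlacementDriver:
    """Toy PD: the routing table lives only in memory and is rebuilt from heartbeats."""

    def __init__(self):
        self.routing = {}  # region_id -> {"range": (start, end), "leader": node}

    def on_heartbeat(self, node, regions):
        # regions: what `node` reports about the Regions it leads (an assumption)
        for r in regions:
            self.routing[r["id"]] = {"range": r["range"], "leader": node}

# What each node would report on every heartbeat round.
heartbeats = {
    "NodeA": [{"id": 1, "range": ("a", "h")}],
    "NodeB": [{"id": 2, "range": ("h", "p")}],
    "NodeC": [{"id": 3, "range": ("p", "z")}],
}

pd_server = PlacementDriver()
for node, regions in heartbeats.items():
    pd_server.on_heartbeat(node, regions)
before_crash = dict(pd_server.routing)

# Simulate a crash: the replacement instance starts with no state at all.
pd_server = PlacementDriver()
for node, regions in heartbeats.items():
    pd_server.on_heartbeat(node, regions)

print(pd_server.routing == before_crash)
```

Nothing is persisted between the two instances, yet one round of heartbeats is enough to reach the same routing table in this model.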

The epoch mechanism guarantees the latest information in PD

However, you might have noticed that there is still a problem. If part of the cluster is cut off by a network partition, the information that some nodes report might be wrong.

For example, some Regions re-initiate elections and splits after they are split, but another isolated batch of nodes still sends the obsolete information to PD through heartbeats. So for one Region, either of two nodes might say that it’s the leader, and PD doesn’t know whom to trust.

In TiKV, we use an epoch mechanism. With this mechanism, changes are marked with two logical clocks: one is the Raft’s configuration change version, and the other is the Region version. For each configuration change, the configuration change version automatically increases. Similarly, for each Region change such as splitting or merging, the Region version automatically increases, too.

PD’s epoch strategy is to compare the logical clock values reported by two nodes and trust the larger one. PD first compares the Region versions. If they differ, the node with the larger Region version has the newer information. If they are the same, PD compares the configuration change versions, and the node with the larger configuration change version has the newer information.
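The comparison can be sketched as an ordering on a pair of counters, with the Region version compared first. This is a simplified model: the `Epoch` and `RegionInfoTable` classes and their field names are illustrative choices, and the sketch ignores everything PD does besides rejecting stale reports.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Epoch:
    version: int   # Region version: increases on split or merge
    conf_ver: int  # configuration change version: increases on membership change

    def as_tuple(self):
        # Tuples compare left to right, so the Region version is compared first.
        return (self.version, self.conf_ver)

def is_newer(a: Epoch, b: Epoch) -> bool:
    return a.as_tuple() > b.as_tuple()

class RegionInfoTable:
    """Keeps, per Region, the report with the newest epoch seen so far."""

    def __init__(self):
        self.regions = {}  # region_id -> (epoch, leader)

    def on_heartbeat(self, region_id, epoch, leader):
        current = self.regions.get(region_id)
        if current is not None and is_newer(current[0], epoch):
            return False  # stale report, for example from an isolated node
        self.regions[region_id] = (epoch, leader)
        return True

table = RegionInfoTable()
accepted_first = table.on_heartbeat(1, Epoch(version=2, conf_ver=5), "NodeA")
accepted_stale = table.on_heartbeat(1, Epoch(version=1, conf_ver=9), "NodeB")
accepted_newer = table.on_heartbeat(1, Epoch(version=2, conf_ver=6), "NodeC")
print(accepted_first, accepted_stale, accepted_newer, table.regions[1])
```

Note that the second report loses even though its configuration change version is higher, because its Region version is older.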


It’s a highly complex project to build a robust distributed system. I’ve shared some of the key design ideas of building a large-scale distributed storage system based on the Raft consensus algorithm. If you’re interested in how we implement TiKV, you’re welcome to dive deep by reading our TiKV source code and TiKV documentation.
