Our company develops distributed software applications. To coordinate their components, these applications employ various methods of inter-process communication. Most of the complexity arises when implementing algorithms for distributed processing of interconnected data. Such problems call for a special-purpose distributed coordination system, and the most popular and widely used one is Apache Zookeeper.
Zookeeper is a complex product, and despite its considerable age, bugs are still occasionally discovered in it. This is, however, a natural consequence of its vast capabilities, which make the life of a distributed systems developer so much easier. This article overviews some features of Zookeeper that will help you better understand its capabilities. After that we will discuss the Apache Curator library (originally developed at Netflix), which lets developers of distributed software work more efficiently and offers them a toolbox of ready-made solutions for building distributed coordination objects.
Apache Zookeeper
As mentioned previously, Zookeeper is a crucial component of many distributed systems. The Zookeeper database can be represented as a tree, similar to a file system tree, where every node is identified by a path (/a/path/to/a/node) and can store arbitrary data. This allows Zookeeper to be used for creating hierarchical distributed data stores and other useful constructs. Zookeeper owes its usefulness and widespread adoption to the properties described next.
Distributed consensus
Zookeeper ensures consensus by means of the ZAB algorithm. The algorithm provides the C (consistency) and P (partition tolerance) properties of the CAP theorem, while sacrificing A (availability). In practice that means:
- All clients see the same state, regardless of the server they query.
- Changes to the state occur in a strict order; race conditions are impossible for set operations (get-then-set sequences, however, are not atomic).
- A Zookeeper cluster can fall apart and become unavailable, but then it is unavailable for everybody.
Consensus is the ability of a distributed system to agree on its own state. Zookeeper uses the ZAB algorithm, but other algorithms, such as Paxos and Raft, are also widely used.
Ephemeral nodes
A client that connects to Zookeeper creates a session. During the session it can create ephemeral nodes that are visible to other clients, but only while the session is active; once the session ends, the nodes are deleted. These nodes have a limitation: they can only be leaves and thus cannot have children. In other words, Zookeeper doesn't allow ephemeral subtrees. Ephemeral nodes are often used for implementing service discovery systems.
Consider a set of service instances that should be load balanced. When a new service instance is added, an ephemeral node containing its address is created. If the instance crashes, its node is removed and is therefore no longer considered for load balancing. More generally, ephemeral nodes are useful wherever the live presence of some party needs to be tracked.
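As an illustration, here is a minimal sketch of such a registration using the plain Zookeeper Java client from Scala; the connection string, the /services/search parent path (assumed to already exist) and the instance address are hypothetical:
import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

// Session-bound client; the node created below disappears as soon as this session ends
val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
  override def process(event: WatchedEvent): Unit = () // no-op default watcher
})

// EPHEMERAL_SEQUENTIAL also appends a monotonically increasing counter to the path
val registeredPath = zk.create(
  "/services/search/instance-",
  "10.0.0.5:8080".getBytes("UTF-8"),
  ZooDefs.Ids.OPEN_ACL_UNSAFE,
  CreateMode.EPHEMERAL_SEQUENTIAL
)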
Node events subscription
A client can subscribe to a node (set a watch on it) and be subsequently notified about events affecting its data. There are limitations, however: once an event fires, the subscription is cleared and must be renewed, and any events that occur before it is renewed will go unregistered. Because of this, the applicability of the feature is somewhat limited.
For example, in the service discovery problem it can be used for reacting to configuration changes. It is important, however, to re-read the data (and thereby re-register the watch) after every notification, in order to capture any state changes that may have happened in the meantime.
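A minimal sketch of such a self-renewing subscription with the plain Zookeeper client (the helper name is ours):
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.apache.zookeeper.data.Stat

// Reads the node data and re-registers the watch on every notification,
// since a fired watch is cleared by the server
def readAndWatch(zk: ZooKeeper, path: String): Array[Byte] = {
  val watcher = new Watcher {
    override def process(event: WatchedEvent): Unit = {
      readAndWatch(zk, path) // re-read to pick up changes missed while unsubscribed
    }
  }
  zk.getData(path, watcher, new Stat())
}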
Sequence nodes
Zookeeper allows creating nodes with a monotonically increasing counter appended to the end of the path, and these nodes can even be ephemeral. This feature is widely used both for applied problems (for example, registering all services of the same type as ephemeral sequential nodes) and for implementing Zookeeper recipes (such as the Fair Distributed Lock recipe).
Node versions
Node versions help detect whether a change occurred in the time frame between a read and a write. In other words, a set operation can be conditioned on a node version, and the operation will not be performed if the actual version differs from the expected one, since that would mean the state was changed by another client and should be re-read. Versioning makes a linearizable order of data state changes possible, which is useful, for example, when implementing the Distributed Counter recipe.
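A sketch of a version-checked update (compare-and-set) with the plain Zookeeper client; the helper is hypothetical and simply retries on a version conflict:
import org.apache.zookeeper.{KeeperException, ZooKeeper}
import org.apache.zookeeper.data.Stat

def compareAndSet(zk: ZooKeeper, path: String, update: Array[Byte] => Array[Byte]): Unit = {
  val stat = new Stat()
  val current = zk.getData(path, false, stat)
  try {
    // Fails with BadVersionException if another client changed the node in the meantime
    zk.setData(path, update(current), stat.getVersion)
  } catch {
    case _: KeeperException.BadVersionException =>
      compareAndSet(zk, path, update) // state was changed by another client: re-read and retry
  }
}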
ACL for nodes
Zookeeper allows restricting access to a node by means of ACLs, which is necessary for protecting data from untrusted applications. It is worth noting that ACLs only control access to the data itself and obviously do not protect against the load a malicious client can generate.
TTL for nodes
Zookeeper allows specifying a TTL for a node: if the node receives no updates within that time, it is deleted. This is a relatively new feature.
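A sketch of creating such a node with the plain Zookeeper client (this assumes Zookeeper 3.5.3+ with extended types enabled on the server; the path and TTL value are hypothetical):
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

// The server is assumed to be started with -Dzookeeper.extendedTypesEnabled=true
def createTtlNode(zk: ZooKeeper, path: String, data: Array[Byte]): Unit = {
  zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT_WITH_TTL, null, 60000L) // removed ~60 s after the last update, if it has no children
}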
Observer servers
A server can join the cluster as an observer and receive read-only access. This can be useful when the cluster's write load is already high and adding more read-write servers would only aggravate the problem. Why are read-only servers a solution? The answer lies in the consensus algorithm: the more servers participate in the vote on each write, the longer it takes to reach consensus, and thus the lower the write throughput of the system. Observer servers do not participate in consensus, so they do not affect write operations.
Time synchronization tolerance
Zookeeper doesn't rely on an external clock for node synchronization. This is a useful property: systems that rely on precise time are prone to errors arising from clock mismatches between different nodes.
Zookeeper disadvantages
Of course there is a fly in the ointment: some of Zookeeper's properties can limit its possible applications. There is an expression that rather ironically describes the difficulties of dealing with Zookeeper:
Single Cluster of Failure (c) Pinterest.
It emphasizes the fact that in a distributed system built on top of Zookeeper, it may eventually become that very single point of failure we were trying to avoid in the first place.
Database should fit into RAM
Zookeeper loads its database into memory and keeps it there. If the database exceeds the available RAM, it spills into swap, which leads to a substantial performance decline. A large database therefore requires a server with a considerable amount of RAM (which is hardly an issue, now that 1 TB of RAM on a single server is not even the limit).
Session timeout
Improper client timeout settings can lead to unexpected effects, which are further aggravated by load spikes and cluster node failures. Users tend to decrease the timeout value (30 seconds by default) in order to shorten convergence time (since ephemeral nodes are deleted sooner), but that usually makes the system less stable under load.
Performance decline due to additional nodes in the cluster
A Zookeeper cluster usually consists of the 3 nodes required for reaching consensus. Additional nodes significantly decrease the performance of write operations. The number of cluster nodes is kept odd (an even number adds nothing to fault tolerance), so growing the cluster to 5, 7, or 9 nodes lowers the overall write performance. In some cases the problem can be mitigated by using observer servers.
Maximum data size for a node
The size of the data stored in a single tree node is limited to 1MB. If a bigger per-node capacity is required, Zookeeper cannot be used.
Maximum number of nodes in a descendant list
Zookeeper doesn't impose any limit on the number of children of a single node; however, the maximum size of a data packet the server can send to a client is 4MB (jute.maxbuffer). If a node has so many children that their list cannot fit into a single packet, there will be no way of obtaining information about them. This situation can be avoided by arranging hierarchical pseudo-flat lists, much like file system caches do: object names or digests are split into parts and arranged in a hierarchical structure, as in the sketch below.
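A toy sketch of such a layout (the /cache prefix and the two-level split are arbitrary choices):
// Spread object digests over a small hierarchy so that no single node
// ends up with millions of direct children
def cachePath(digest: String): String = {
  val first  = digest.substring(0, 2)
  val second = digest.substring(2, 4)
  s"/cache/$first/$second/$digest"
}

// cachePath("9f86d081884c7d65") == "/cache/9f/86/9f86d081884c7d65"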
Despite all the limitations, the advantages provided by Zookeeper outweigh its shortcomings: Zookeeper is an important part of many distributed ecosystems, including Cloudera CDH5, DC/OS, Apache Kafka and others.
Zookeeper from a developer’s perspective
Since Zookeeper is implemented in Java, it integrates smoothly into any JVM environment. For example, running a server (or even a cluster of servers) directly from Java and using it for integration and smoke tests is fairly straightforward and doesn't require deploying an external server. However, the client API of Zookeeper is quite low-level, and although with enough proficiency it can be used as is, dealing with it feels like swimming upstream. Moreover, a deep understanding of Zookeeper's fundamentals is required to implement correct exception handling. In our own experience with the low-level interface, debugging and troubleshooting code for distributed coordination and service discovery caused plenty of trouble and was very time consuming.
The good news is that there is a solution, presented to the community by a Netflix developer, Jordan Zimmerman. Without further ado: Apache Curator.
Apache Curator
The home page of the project contains the following quote: "Guava is to Java what Curator is to ZooKeeper" (Patrick Hunt).
This statement perfectly sums up what Curator is. After we began using the Curator library, the code for working with Zookeeper became simple and transparent, while the number of bugs and the time needed to fix them decreased substantially. If using the low-level client API is an upstream swim, Curator lets the developer turn around and swim with the current. Moreover, Curator provides implementations of many recipes, which we will overview later.
Essential API
The API is designed as a fluent interface, which makes programming simple and concise. For example (all the following examples are in the Scala programming language):
client
.create()
.orSetData()
.forPath("/object/path", byteArray)
which can be interpreted as “create a node, or use an existing one, for path /object/path and write byteArray as its contents”. Another example:
client
.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath("/head/child", byteArray)
“create a sequential ephemeral node for path /head/child000000XXXX and write byteArray to it”. Other examples can be found on this page.
Asynchronous operations
Curator supports both synchronous and asynchronous execution modes. In asynchronous mode the client is an instance of AsyncCuratorFramework (as opposed to CuratorFramework in synchronous mode), and every method chain must end with a thenAccept call that accepts a callback to be executed after the operation completes. A more detailed discussion of the asynchronous API can be found on the corresponding page of the Curator documentation.
val async = AsyncCuratorFramework.wrap(client)
async.checkExists().forPath(somePath).thenAccept(stat => mySuccessOperation(stat))
When using Scala (one of the main programming languages at our company), the asynchronous interface may not be the most natural choice, since the same effect can be achieved in a more idiomatic way with Future objects. For Java and other JVM languages, however, the interface serves its purpose well.
Data Schema support
Zookeeper attaches no semantics to the data it stores, which means developers are entirely responsible for using appropriate formats and paths. This can become an issue in many situations, for instance when introducing new developers to the project. To avoid such problems, Curator supports data schemas, which allow specifying constraints on possible paths and their node types. A schema, created from an external configuration, can be represented in JSON format:
[
  {
    "name": "test",
    "path": "/a/b/c",
    "ephemeral": "must",
    "sequential": "cannot",
    "metadata": {
      "origin": "outside",
      "type": "large"
    }
  }
]
Migration support
Migrations in Curator are similar to Liquibase, but are applied to Zookeeper. They can be used to document database evolution between different versions of the product. A migration is specified as a sequence of operations, each of which is a transformation of the Zookeeper database. Curator can be configured to automatically track whether a migration has already been applied (once again by utilizing Zookeeper). This functionality is handy when deploying a new version of the application. Migrations are described in more detail on the corresponding page of the Curator documentation.
Testing server and testing cluster
To simplify testing, Curator provides an interface for embedding a server, or even a cluster of servers, right into the application. This can be achieved with the Zookeeper API as well, but Curator's interface is much more concise. For example, with Zookeeper alone:
import java.util.Properties
import org.apache.zookeeper.server.{ServerConfig, ZooKeeperServerMain}
import org.apache.zookeeper.server.quorum.QuorumPeerConfig

class ZookeeperTestServer(zookeeperPort: Int, tmp: String) {
  val properties = new Properties()
  properties.setProperty("tickTime", "2000")
  properties.setProperty("initLimit", "10")
  properties.setProperty("syncLimit", "5")
  properties.setProperty("dataDir", s"$tmp")
  properties.setProperty("clientPort", s"$zookeeperPort")

  val zooKeeperServer = new ZooKeeperServerMain
  val quorumConfiguration = new QuorumPeerConfig()
  quorumConfiguration.parseProperties(properties)

  val configuration = new ServerConfig()
  configuration.readFrom(quorumConfiguration)

  // runFromConfig blocks, so the server runs in a separate thread
  private val thread = new Thread() {
    override def run() = {
      zooKeeperServer.runFromConfig(configuration)
    }
  }

  def start = {
    thread.start()
  }

  def stop = {
    thread.interrupt()
  }
}
...
val s = new ZookeeperTestServer(port, tmp)
s.start
...
s.stop
With Curator:
val s = new TestingServer(port)
s.start()
...
s.stop()
Curator recipes
Recipes are the main incentive for using the Curator library when developing distributed mechanisms for inter-process communication. Below are the basic recipes supported by Curator and their possible applications.
Leader election
The following recipes implement a fault-tolerant process execution model with a single active leader and several hot-standby processes. As soon as the current leader stops performing its duty, another process becomes the leader. There are two suitable recipes (a sketch follows the list):
- Leader Latch is analogous to JDK's CountDownLatch: it provides methods that block until the process becomes the leader.
- Leader Election notifies through a method call: the moment the process becomes the leader, a specific method is invoked, and returning from it signifies that leadership is relinquished.
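A minimal Leader Latch sketch (the election path and the work callback are hypothetical):
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.LeaderLatch

def runAsLeader(client: CuratorFramework, work: () => Unit): Unit = {
  val latch = new LeaderLatch(client, "/election/worker")
  latch.start()
  try {
    latch.await() // blocks until this process becomes the leader
    work()
  } finally {
    latch.close() // relinquishes leadership and removes the underlying ephemeral node
  }
}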
Locks
Locks are one of the most important primitives for distributed inter-process synchronization. Curator provides a wide range of lock objects (a usage sketch follows the list):
- Shared Reentrant Lock — a distributed lock, which can be safely reentered by the client already holding this lock;
- Shared Lock — a distributed lock;
- Shared Reentrant Read Write Lock — an object that provides separate locks for reads and writes, so that read locks can be held by several clients concurrently while write locks are exclusive;
- Shared Semaphore — a counting semaphore that allows distribution of a single limited resource whose capacity can be represented as a 32-bit integer;
- Multi Shared Lock — a high level object that allows atomic operations over several distributed locks at the same time.
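A sketch of guarding a critical section with the Shared Reentrant Lock recipe (the helper wrapper is ours):
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.locks.InterProcessMutex

def withLock[T](client: CuratorFramework, lockPath: String)(body: => T): T = {
  val lock = new InterProcessMutex(client, lockPath)
  lock.acquire() // blocks until the distributed lock is obtained
  try body
  finally lock.release() // must be released by the same thread that acquired it
}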
Barriers
- Barrier — an object that allows a client to block a code section for other participants until certain conditions are met, and then unblock it so that all participants can continue their execution (see the sketch after this list);
- Double Barrier — an object that can synchronize entrance of several clients into a code section, as well as their exit from it.
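A sketch of the simple Barrier recipe: one client raises the barrier, the others block on it until it is removed (the paths are hypothetical):
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.barriers.DistributedBarrier

// The coordinating client raises the barrier and later calls removeBarrier()
def raiseBarrier(client: CuratorFramework, path: String): DistributedBarrier = {
  val barrier = new DistributedBarrier(client, path)
  barrier.setBarrier()
  barrier
}

// Participants block here until the barrier is removed on the coordinating side
def awaitBarrier(client: CuratorFramework, path: String): Unit =
  new DistributedBarrier(client, path).waitOnBarrier()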
Counters
- Shared counter — a simple integral counter (32 bit) protected from race conditions.
- Distributed Atomic Long — a counter of type Long (64 bit).
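A Distributed Atomic Long sketch (the counter path and retry policy are arbitrary):
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong
import org.apache.curator.retry.RetryNTimes

def nextId(client: CuratorFramework): Option[Long] = {
  val counter = new DistributedAtomicLong(client, "/counters/id", new RetryNTimes(10, 100))
  val result = counter.increment()
  // The operation is optimistic and may fail under contention, hence the succeeded() check
  if (result.succeeded()) Some(result.postValue().longValue()) else None
}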
Caches
- Path Cache — an object that observes a node and maintains an up-to-date local cache of the node’s children and (optionally) their data;
- Node Cache — an object that observes a node and maintains an up-to-date local cache of its state and data (see the sketch after this list);
- Tree Cache — an object that observes a whole subtree of a node’s descendants and maintains its up-to-date local cache.
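A Node Cache sketch that keeps a local copy of a configuration node and reacts to its changes (the path and the reaction are hypothetical):
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.{NodeCache, NodeCacheListener}

def watchConfig(client: CuratorFramework, path: String): NodeCache = {
  val cache = new NodeCache(client, path)
  cache.getListenable.addListener(new NodeCacheListener {
    override def nodeChanged(): Unit = {
      // getCurrentData is null while the node doesn't exist
      Option(cache.getCurrentData).foreach(d => println(new String(d.getData, "UTF-8")))
    }
  })
  cache.start()
  cache
}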
Nodes
- Persistent Node — this recipe creates a data node that will try to stay present even through connection and session interruptions (see the sketch after this list);
- Persistent TTL Node — a recipe for creating a node similar to a Persistent Node, but with its lifespan being limited by TTL;
- Group Member — a recipe for creating and maintaining a group of members.
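A Persistent Node sketch that keeps an ephemeral registration alive across reconnects (the path and payload are hypothetical):
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.nodes.PersistentNode
import org.apache.zookeeper.CreateMode

def keepRegistered(client: CuratorFramework, path: String, payload: Array[Byte]): PersistentNode = {
  // The recipe re-creates the node after connection loss and session expiration
  val node = new PersistentNode(client, CreateMode.EPHEMERAL, false, path, payload)
  node.start()
  node
}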
Queues
It should be noted that Zookeeper is not the best solution for creating data-intensive distributed queues. If high message throughput is required, we recommend using a dedicated solution such as Apache Kafka or RabbitMQ. Nevertheless, Curator provides queue recipes:
- Distributed Queue — a simple distributed queue, provides methods for adding and popping messages in a FIFO order;
- Distributed Id Queue — a distributed queue that stores an identifier for every message and allows popping by this identifier;
- Distributed Priority Queue — a distributed queue which associates a priority with every item and fetches them according to their priorities;
- Distributed Delay Queue — a queue that stores a Unixtime value for every message. This value specifies the moment in time when the message will become available for popping.
- Simple Distributed Queue — a replacement for the Distributed Queue that comes with the standard Zookeeper distribution.
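As a taste of the queue recipes, a Simple Distributed Queue sketch (the queue path and the message are hypothetical):
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.queue.SimpleDistributedQueue

def queueRoundTrip(client: CuratorFramework): Unit = {
  val queue = new SimpleDistributedQueue(client, "/queues/demo")
  queue.offer("hello".getBytes("UTF-8")) // enqueue a message
  val message = queue.take()             // blocks until a message is available
  println(new String(message, "UTF-8"))
}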
Conclusion
Apache Curator is definitely worth considering: the library is an outstanding example of software engineering and provides easy access to the capabilities of Apache Zookeeper. Its documentation is relatively scarce, which raises the entry threshold for developers; in our experience we often found ourselves consulting the library's source code to understand the logic behind a particular recipe. This had a positive side effect as well: a deeper understanding of the implementation details guards against many logical errors that could arise from incorrect assumptions.
It should be noted that the Curator developers strongly suggest reading the Zookeeper documentation before using their library. This is sound advice, since Zookeeper is one of those products that require a thorough understanding of their internal workings, not merely knowledge of their API. The investment will surely pay off, and Zookeeper will help an experienced software engineer create reliable, high-performance distributed systems.