Summaries of Papers in Distributed Systems - Storage

A long post with summaries from reading technical papers in distributed systems, focusing on storage.

Status: Forever In Progress


Google File System

This paper presents the Google File System (GFS), a proprietary distributed file system designed for the application workloads at Google (web crawling and indexing). With these workloads in mind, the system makes some key design decisions: it is built for large files (in GBs) and large chunk sizes (64 MB), it is better suited to workloads where files are mostly appended to and then read many times, and the client is co-designed with the file system. Further, GFS is built to run on commodity hardware and expects components such as disks to fail often.

Given these design criteria, GFS has a single master that stores the file system metadata and hundreds of chunkservers that store the file chunks on top of standard Linux file systems.

For a write request, the master maps the file namespace to chunk locations and designates one replica as the primary chunkserver; the client then interacts directly with the chunkservers to write the data. The data is pushed to every replica, and the primary chunkserver decides the order in which mutations are applied and forwards that order to the other replicas; however, the replicas are not guaranteed to be byte-for-byte identical. To avoid becoming a bottleneck, the master keeps the metadata in memory for quick access and leaves the heavy lifting to the chunkservers.
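
With a fixed chunk size, a byte offset within a file determines which chunk it falls in, which is what lets a client ask the master about a specific chunk. The following is a minimal sketch of that arithmetic; the function names are hypothetical and not part of any GFS client library.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB, the chunk size mentioned above


def chunk_index(byte_offset: int) -> int:
    """Index of the chunk that holds the given byte offset within a file."""
    return byte_offset // CHUNK_SIZE


def chunks_for_range(offset: int, length: int) -> range:
    """Chunk indexes touched by accessing `length` bytes starting at `offset`."""
    if length <= 0:
        return range(0)
    first = chunk_index(offset)
    last = chunk_index(offset + length - 1)
    return range(first, last + 1)
```

For example, a 2-byte access that starts on the last byte of the first chunk touches chunks 0 and 1, so the client would need locations for both.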

For a read request, the master keeps an up-to-date mapping of which chunkservers (primary and replicas) hold each chunk, based on heartbeat messages in which the chunkservers report the chunks they store. The client reads the chunks directly from the chunkservers, and the chunks are verified using checksums. Corrupt chunks are reported to the master, which can then mark them for deletion and re-replicate them.

For an update request, a lease for the mutation is first obtained on the chunkserver. The append is guaranteed, but the write order is not when multiple clients concurrently update the same file. Namespace locking with read and write locks allows clients to read a file even while it is being written to, and makes sure that a file is not deleted while it is being updated.

File delete requests are handled as soft deletes, which are later garbage collected during idle cycles or when disk space runs low.

The real world workloads at Google show this system being used for 180 TB of data storage with a key highlight being the accumulation of dead files.


Hadoop Distributed File System

This paper presents the Hadoop Distributed File System (HDFS), an open-source distributed file system built by the team at Yahoo. The system is built for storing large files distributed across thousands of servers.

The NameNode manages the namespace and the actual file blocks are stored on the DataNodes. The NameNode holds the file mapping in memory, called the image, and keeps a log of all the operations not yet checkpointed, called the journal.

For a write request from the client, the NameNode picks 3 DataNodes to which the data should be replicated, and the client reaches out to these DataNodes to write the data. The DataNodes send the NameNode an hourly block report listing all the blocks they hold, including the generation stamp and block size (since a block can occupy less than the full block size), and a heartbeat every 3 seconds with storage capacity, availability and transfers in progress. The NameNode uses its replies to the heartbeats to pass commands to DataNodes, e.g. replication or deletion. Once a file is written, it can only be updated by appending. Further, since HDFS is built for single-writer, multi-reader scenarios, only one client can write to a file at a time, by acquiring a lease from the NameNode and renewing it via heartbeats.

For a read request, the client gets a list of DataNodes from the NameNode and then reaches out to them based on their distance.
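
One way to picture "distance" here is as hops in a tree-shaped network topology, where the distance between two nodes is the sum of their distances to the closest common ancestor. The sketch below assumes locations are written as slash-separated paths such as `/dc1/rack1/n1`; the path format and function names are illustrative assumptions, not the HDFS API.

```python
from typing import List


def network_distance(a: str, b: str) -> int:
    """Hops from a to b via their closest common ancestor in the topology tree."""
    pa = a.strip("/").split("/")
    pb = b.strip("/").split("/")
    common = 0
    for x, y in zip(pa, pb):
        if x != y:
            break
        common += 1
    return (len(pa) - common) + (len(pb) - common)


def order_replicas(client: str, replicas: List[str]) -> List[str]:
    """Sort replica locations so the closest one to the client is tried first."""
    return sorted(replicas, key=lambda r: network_distance(client, r))
```

Under these assumptions a replica on the same machine has distance 0, one on the same rack has distance 2, and one on another rack in the same data center has distance 4.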

HDFS has other helper tools such as the Balancer, Block Scanner, CheckpointNode and BackupNode to ensure reliability.

Big Table

BigTable is a distributed, persistent, sparse, multi-dimensional sorted map built to store structured data and designed to scale to petabytes of data across thousands of commodity servers. Data is saved in tables as rows and column families, with multiple timestamped versions per cell; the map is indexed by a (row key, column key, timestamp) tuple, and each value is an uninterpreted string of bytes. BigTable maintains the lexicographical order of row keys; tables are divided into tablets (row ranges) that are distributed across tablet servers; a BigTable cluster can have thousands of tablet servers, each serving hundreds of tablets. The BigTable API provides functions to perform single-row transactions and batch writes across row keys, as well as execution of client-supplied scripts (Sawzall) and MapReduce jobs.

Under the hood, BigTable uses GFS (distributed file system) for storage, SSTable (ordered immutable map) to store the table data, and Chubby (distributed locking service) for tablet server management, tablet location and schema information.

BigTable uses a single-master model for the tablet servers; the master handles the addition/removal of tablet servers and the assignment of tablets to tablet servers, but clients talk directly to tablet servers to access the data. BigTable uses a three-level hierarchy for storing tablet location information: Chubby knows the location of the root tablet, the root tablet holds the first part of the metadata table and points to the other metadata tablets on other servers, and those point to user tablets; clients cache these tablet locations. BigTable uses the master to control tablet assignment and Chubby to track tablet servers (using file locks on unique files in the servers directory on Chubby); tablet servers also periodically report themselves to the master. Persistent data is stored in GFS as SSTables, together with a write log for recent writes; recent writes are also held in a sorted in-memory buffer called the memtable. To keep memory in check and storage less fragmented, BigTable uses minor compactions (which convert the memtable into an SSTable), merging compactions (which merge a few SSTables and the memtable into a single SSTable) and major compactions (merging compactions that rewrite all SSTables into exactly one SSTable).
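
The interplay between the memtable, SSTables and compactions can be sketched with a toy in-memory model. This is a simplified illustration under stated assumptions: the `Tablet` class and its method names are hypothetical, SSTables are plain sorted tuples rather than files on GFS, and the write log and deletion markers are left out.

```python
class Tablet:
    def __init__(self, memtable_limit=2):
        self.memtable = {}    # recent writes, mutable, held in memory
        self.sstables = []    # immutable sorted tuples of (key, value), oldest first
        self.memtable_limit = memtable_limit

    def write(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.minor_compaction()

    def minor_compaction(self):
        """Freeze the memtable into a new immutable, sorted SSTable."""
        if self.memtable:
            self.sstables.append(tuple(sorted(self.memtable.items())))
            self.memtable = {}

    def major_compaction(self):
        """Rewrite everything into exactly one SSTable; newer values win."""
        self.minor_compaction()
        merged = {}
        for table in self.sstables:  # oldest first, so later tables overwrite
            merged.update(table)
        self.sstables = [tuple(sorted(merged.items()))] if merged else []

    def read(self, key):
        """Check the memtable first, then SSTables from newest to oldest."""
        if key in self.memtable:
            return self.memtable[key]
        for table in reversed(self.sstables):
            for k, v in table:
                if k == key:
                    return v
        return None
```

Reads have to consult every SSTable that might hold the key, which is why fewer SSTables after compaction means cheaper reads.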

BigTable uses certain refinements for optimum use: locality groups to keep column families that are frequently accessed together in their own SSTable, two-pass compression, two-level caching for read performance, Bloom filters to check whether a row/column might exist in an SSTable, and a shared log for all tablets on a tablet server.
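
To illustrate why a Bloom filter helps, here is a minimal sketch of one. A negative answer is definite, so the SSTable can be skipped without a disk read; a positive answer only means "maybe". The sizes, the use of SHA-256 and the class name are arbitrary assumptions for the example, not BigTable's implementation.

```python
import hashlib


class BloomFilter:
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key: str):
        # Derive num_hashes bit positions by salting the key with an index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str):
        for p in self._positions(key):
            self.bits |= 1 << p

    def might_contain(self, key: str) -> bool:
        """False means definitely absent; True means possibly present."""
        return all((self.bits >> p) & 1 for p in self._positions(key))
```

A reader would call `might_contain` with a row/column key before touching an SSTable and only go to disk when it returns True.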

Applications using BigTable include Google Analytics (to maintain a raw click table of sorted user sessions and a summary table for each website), Google Maps (to store adjacent geo segments close to each other with a large number of column families with sparse data) and Personalized Search (to record userids and user actions to generate user profiles).

Lessons learned from practical application include RPC checksumming, delay feature addition until verification of needs, having proper system level monitoring and the value in simple designs.

Amazon Dynamo

Amazon Dynamo is a highly available & highly scalable distributed key-value store that sacrifices consistency for availability in cases of failures. It is a purpose-built data store for applications that do not require a relational database and only need primary-key access to a data store.

Dynamo’s system architecture employs novel techniques to solve the design problems:

  1. Data Partitioning

    • Why: Incremental Scalability, Durability
    • Solution: Consistent Hashing
    • How: Nodes (or virtual nodes, to account for physical node heterogeneity) are laid out on a ring with position values. Every data item's key is hashed to find its position value. The item is stored on the first node encountered walking clockwise whose position value is greater than the item's position value. This node is called the coordinator node and is hence responsible for the data between itself and its predecessor. The coordinator replicates the data to its N-1 successor nodes, making each node responsible for the data between itself and its Nth predecessor. This list of nodes is called the preference list, and the system design allows any node to determine the preference list for a given data item.
  2. Data Versioning

    • Why: Always Writeable
    • Solution: Object versioning, Vector Clocks, Read reconciliation
    • How: Dynamo treats each modification as a new & immutable version and tags it with a vector clock. On read, the versions are compared using their vector clocks. Syntactic reconciliation is the common case, where a newer version subsumes older ones and is written back to the replicas; when versions conflict (neither subsumes the other), all of them are sent to the client for semantic reconciliation. Reads and writes use a quorum-like protocol configured so that R + W > N, but the quorum is sloppy rather than strict so that failures do not affect availability and durability.
  3. Handling Temporary Failures (replica node is down temporarily)

    • Why: Increase write durability
    • Solution: Hinted Handoffs
    • How: If a node is down during a write operation, the write is sent to another node (not in the preference list) with a hint in its metadata naming the node it was intended for. Hinted writes are saved in a separate local database and sent back to the intended node when it recovers.
  4. Recovering from Permanent Failures (e.g. replica sync when hinted handoffs are lost)

    • Why: Detect replica inconsistencies faster
    • Solution: Merkle Trees
    • How: Each node maintains a separate Merkle tree for each key range it hosts. Nodes exchange the Merkle trees of the key ranges they host in common. Using tree traversal, nodes detect any inconsistencies and sync.
  5. Membership & Failure Detection

    • How: Changes are communicated via a gossip-based protocol from node to node. To prevent logical partitions, some nodes are set up as seed nodes and are externally discoverable by all nodes. A local notion of failure is found to be sufficient, and a node will periodically retry unresponsive nodes.
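
The data partitioning scheme in item 1 can be sketched as a small consistent-hashing ring with virtual nodes. This is a minimal illustration, not Dynamo's code: the `Ring` class, the `node#i` token naming, the number of virtual nodes and the use of SHA-256 are all assumptions made for the example.

```python
import bisect
import hashlib


def position(key: str) -> int:
    """Hash a key (or a virtual node name) to a position on the ring."""
    return int.from_bytes(hashlib.sha256(key.encode()).digest(), "big")


class Ring:
    def __init__(self, nodes, vnodes=8, n_replicas=3):
        self.n_replicas = n_replicas
        # Each physical node owns several virtual positions on the ring.
        self.tokens = sorted(
            (position(f"{node}#{i}"), node) for node in nodes for i in range(vnodes)
        )
        self.positions = [p for p, _ in self.tokens]

    def preference_list(self, key: str):
        """Walk clockwise from the key's position, collecting distinct physical nodes."""
        start = bisect.bisect_right(self.positions, position(key))
        result = []
        for offset in range(len(self.tokens)):
            node = self.tokens[(start + offset) % len(self.tokens)][1]
            if node not in result:
                result.append(node)
                if len(result) == self.n_replicas:
                    break
        return result
```

The first entry of the returned list plays the role of the coordinator, and because the computation depends only on the ring layout and the key, any node holding the same membership view arrives at the same list.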

Dynamo’s software architecture has 3 main components:

  1. Local persistence engine

    • It allows for multiple pluggable engines like BDB (most common, for persisting object sizes in tens of kilobytes), MySQL (for persisting large object sizes), in-memory buffer.
  2. Request coordination

    • Built as an event-driven messaging system with message processing pipeline split into multiple stages
    • The coordinator node executes the reads and writes issued by clients; each request creates a state machine on the node that handles it, and read repair is performed when stale versions are detected during a read
  3. Membership and failure detection
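
The version comparison behind read reconciliation and read repair can be sketched with vector clocks represented as plain dictionaries from node name to counter. The function names are hypothetical; the logic only shows how subsumed versions are dropped and how concurrent versions are left for the client to merge.

```python
def descends(a: dict, b: dict) -> bool:
    """True if clock a has seen everything clock b has (a >= b per node)."""
    return all(a.get(node, 0) >= count for node, count in b.items())


def reconcile(versions):
    """Drop versions subsumed by another one (syntactic reconciliation).

    `versions` is a list of (clock, value) pairs. If more than one pair
    survives, the versions are concurrent and need semantic reconciliation.
    """
    survivors = []
    for i, (clock, value) in enumerate(versions):
        dominated = any(
            j != i and other != clock and descends(other, clock)
            for j, (other, _) in enumerate(versions)
        )
        if not dominated and (clock, value) not in survivors:
            survivors.append((clock, value))
    return survivors
```

For instance, a version with clock `{"Sx": 2}` subsumes one with `{"Sx": 1}`, while `{"Sx": 2, "Sy": 1}` and `{"Sx": 2, "Sz": 1}` are concurrent and both survive.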

Memcache at Facebook

Memcache is an in-memory distributed key-value store that provides low-latency access to shared storage. Facebook uses Memcache as a query cache and a generic cache to scale for its read-heavy workload. It operates Memcache in a cluster environment to reduce latency and load on the database servers, replicates clusters over multiple global regions, and provides best-effort eventual consistency with an emphasis on performance and availability.

At Facebook, users consume more than they create, and the data to build a page is generated from heterogeneous sources spread over many servers, leading to a read-heavy, all-to-all communication pattern.

Facebook uses memcache servers in a cluster setup to reduce latency and load. A single cluster has hundreds of web servers and memcache servers that follow an all-to-all communication pattern. Items are distributed across memcache servers through consistent hashing; applications construct a DAG representing dependencies between data to batch requests to memcache servers. Memcache clients are stateless, and client logic is embedded in a library or in mcrouter (a standalone proxy); clients use UDP for get requests sent directly to the memcache servers and TCP for set and delete operations through mcrouter. The high degree of parallelism needed for high throughput is managed by connection coalescing via mcrouter. Clients use a sliding window to prevent incast congestion, which could overwhelm a client if all the responses to its batched requests arrived at the same time.

Facebook uses leases to handle stale sets (caused by concurrent updates that get reordered) and thundering herds (caused by heavy write activity on a key that keeps invalidating it, leading to repeated cache misses under equally heavy read activity). On a cache miss, the memcache server gives the client a 64-bit token bound to the key, which the client presents when it writes the value back after reading it from the database. The server issues a token only once every 10 seconds per key; other clients asking for the value in the meantime are told to retry later.

Facebook also partitions servers into memcache pools based on application needs or key activity, e.g. a low-churn pool for items that are infrequently accessed but expensive to recompute, a high-churn pool for items that are frequently accessed but inexpensive to recompute, and a wildcard pool. In some pools replication is considered more effective than further partitioning of the key space, since replication reduces the traffic to each server for batched key requests.

To handle failures of memcache servers, Facebook applies two strategies based on the scale of the failure. For a failure of a small set of hosts, it uses a small set of machines as a gutter pool that is queried if the main memcache server times out; on a gutter miss the data is fetched from the database and cached in the gutter pool for a short amount of time, limiting the impact on backend services. For failures affecting a large number of hosts, traffic is diverted to an entirely different cluster.
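
The lease mechanism described above can be sketched as a tiny in-process cache. This is a simplified model under stated assumptions: the `LeasingCache` class, its return values and the explicit `now` parameter are hypothetical and are not the memcached protocol; only the 64-bit token and the 10-second rate limit come from the summary.

```python
import secrets


class LeasingCache:
    LEASE_INTERVAL = 10.0  # seconds between lease tokens for the same key

    def __init__(self):
        self.values = {}
        self.leases = {}       # key -> outstanding lease token
        self.last_issued = {}  # key -> time the last token was issued

    def get(self, key, now):
        """Return ('hit', value), ('lease', token) or ('retry', None)."""
        if key in self.values:
            return ("hit", self.values[key])
        last = self.last_issued.get(key)
        if last is not None and now - last < self.LEASE_INTERVAL:
            return ("retry", None)  # someone else is already refilling this key
        token = secrets.randbits(64)
        self.leases[key] = token
        self.last_issued[key] = now
        return ("lease", token)

    def set(self, key, value, token):
        """Accept the write only if the token is still the valid lease for the key."""
        if key not in self.leases or self.leases[key] != token:
            return False
        del self.leases[key]
        self.values[key] = value
        return True

    def delete(self, key):
        """Invalidate the key and any outstanding lease on it."""
        self.values.pop(key, None)
        self.leases.pop(key, None)
```

A delete that arrives between the lease and the set voids the token, so the possibly stale value read from the database is rejected; the retry response is what keeps a herd of readers from all hitting the database for the same key.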

Facebook defines a region as a set of frontend clusters (of web and memcache servers) and a storage cluster. Regional invalidations are handled by mcsqueal daemons (installed on database servers): SQL statements are amended to include the memcache keys that need to be invalidated once the transaction completes, and mcsqueal extracts these keys and broadcasts the invalidations in batches to the frontend clusters in the region. Regional pools are created so that multiple frontend clusters can share memcache servers. A Cold Cluster Warmup process is used to warm up a new cold cluster by allowing it to get values from a warm cluster.

Facebook sets up multiple geographical regions, with one designated as master and the rest as slaves, and MySQL replication keeps the slave databases in sync with the master. However, keeping the caches and databases consistent is a major challenge because replica databases can lag behind the master database. Writes from the master region can be invalidated using mcsqueal within the region itself, but invalidating them in the slave regions may be premature due to database replication lag. After a write from a non-master region, a cache refill can read stale data from the local replica if there is a large database replication lag. To reduce the probability of reading stale data, Facebook uses remote markers (set on updates and deletes) that indicate that the data in the local replica database could be stale and that the query should be directed to the master region.

Facebook also improves the performance of a single memcache server by 1) allowing automatic expansion of hashtables, 2) supporting multi-threading using fine-grained locking over multiple data structures, 3) using an individual UDP port for each thread, 4) adding an adaptive slab allocator that re-balances slabs based on the LRU eviction patterns of the current workload, 5) creating a transient item cache for short-lived items, and 6) avoiding the loss of cached data across restarts for software upgrades.

Facebook TAO

TAO is a read-optimized, distributed, eventually consistent graph data store that serves the Facebook social graph. TAO provides a data model made of objects and associations and an API over a MySQL database, replacing the use of Memcache. It is deployed as a single system within Facebook and serves a billion reads and millions of writes each second.

A Facebook page aggregates and filters hundreds of items in the social graph and each user gets a personalized view; this extreme personalization makes it infeasible to do the aggregation when content is created, so it is instead done on the fly when a page is requested.

The benefits of using TAO over the Memcache architecture include 1) being purpose-built to load lists of edges efficiently, 2) not having to depend on leases for control logic on clients and instead having it taken care of on the cache server itself, and 3) not having to use remote markers to avoid inter-region stale reads and instead relying on the data model restrictions.

Facebook focuses on people, relationships and actions that are modeled as nodes and edges in the graph. TAO objects are typed nodes and TAO associations are typed directed edges between objects. It is common for an association to have an inverse. Users, Posts and Landmarks are represented as objects; friendships, authorship and other relationships are represented as associations. Actions can be modeled as objects or associations: repeatable actions (like Comments) are modeled as objects, but one-time actions or state transitions (like Likes or accepting an event invite) are modeled as associations.

The TAO object and association APIs provide operations to create, read, update and delete objects and associations. TAO's association query API is organized around association lists, where an association list is the list of associations for a given object id and association type, arranged in descending order by time (a characteristic of a social graph is that most of the data is old but many of the queries are for recent data; this creation-time locality arises because Facebook focuses on recent items). The association query API provides operations to get an association list as a full list, a ranged position list, a ranged time list and a count value.
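
The shape of the association list queries can be sketched with a toy in-memory store. The method names loosely follow the operations listed above, but the `AssocStore` class, its argument order and its storage layout are assumptions for illustration and not TAO's actual interface.

```python
class AssocStore:
    def __init__(self):
        # (id1, atype) -> list of (time, id2), kept newest first
        self.lists = {}

    def assoc_add(self, id1, atype, id2, time):
        lst = self.lists.setdefault((id1, atype), [])
        lst.append((time, id2))
        lst.sort(reverse=True)  # descending by creation time

    def assoc_count(self, id1, atype):
        return len(self.lists.get((id1, atype), []))

    def assoc_range(self, id1, atype, pos, limit):
        """Ranged position list: `limit` entries starting at index `pos`."""
        lst = self.lists.get((id1, atype), [])
        return [id2 for _, id2 in lst[pos:pos + limit]]

    def assoc_time_range(self, id1, atype, high, low, limit):
        """Ranged time list: entries with low <= time <= high, newest first."""
        lst = self.lists.get((id1, atype), [])
        matches = [id2 for t, id2 in lst if low <= t <= high]
        return matches[:limit]
```

Because the list is kept newest first, a query like "the 50 most recent comments on this post" is a prefix read, which matches the creation-time locality noted above.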

TAO is separated into 2 caching layers and 1 storage layer. In the storage layer, the data is divided into logical shards, and each database server is responsible for many shards. All objects are stored in 1 table and all associations in another. Each object id contains a shard id, and the object is bound to that shard for its lifetime. Each association is stored on the shard of its id1 (the source object), so every association query can be served from a single server.

The caching layer is implemented as a write-through cache of many cache servers that together form a tier. Each request maps to a single cache server (resolved using the shard id), and a cache tier is capable of serving any TAO request. The in-memory cache contains objects, association lists and association counts. The caching servers understand the semantics of their contents and use that to answer previously unasked queries or to issue queries to other shards for association lists.

The caching layer is split into 2 layers: leaders and followers. Clients communicate only with the followers, which on a cache miss or write pass the request to the leader cache server, which in turn queries the storage shard. The follower that requested the update is updated synchronously from the leader's response, and the leader enqueues async invalidation messages to the other follower caches. For associations, the leader enqueues a refill request instead of an invalidation request. For scaling geographically in a multi-region setup, the master region's cache leader sends read misses, writes, and embedded consistency messages to the master database. Consistency messages are delivered to the slave cache leader as the replication stream updates the slave database. The slave cache leader sends writes to the master cache leader and read misses to the replica DB. The choice of master and slave is made separately for each shard. Facebook chooses data center locations that are clustered into only a few regions, and each region can serve a copy of the entire social graph.

TAO also has optimizations for performance and storage efficiency. For caching servers these include better memory management using an adaptive slab allocator, better isolation by partitioning RAM into arenas, and storing association counts in a direct-mapped 8-way associative map. For database servers, it has 3 tables: one for objects, where all the columns are serialized and saved into a single data column, allowing objects of different types to be stored in the same table; one for associations, which has an extra index on id1, association type and time; and one for association counts. Database shards are mapped onto cache servers using consistent hashing; TAO re-balances load among followers using shard cloning, in which reads to a shard are served by multiple follower cache servers; and in case an item is very hot, it is saved along with a version in a TAO client cache. TAO handles association list queries for high-degree objects effectively by using application domain knowledge, such as the association count to choose the query direction and the object and association creation times to limit the results.

TAO provides read-after-write consistency within a single tier and maintains consistency in slave regions using changesets that are passed by the master leader to the slave follower for id1 (and id2 when applicable). TAO handles failures at all levels, including database failures, leader failures, follower failures and refill/invalidation failures.


C-Store

C-Store is a read-optimized relational database that stores data by column. Row stores are write-optimized and suitable for OLTP applications; column stores are suitable for data warehouses, CRM, electronic library card catalogs, and other ad-hoc inquiry systems. C-Store has a hybrid architecture with a writeable store (WS) component optimized for frequent writes and a read-optimized store (RS) component optimized for query performance; redundant storage of a table in overlapping projections; heavily compressed columns using various encoding schemes; and snapshot isolation to avoid 2PC and locking for queries.

C-Store supports the standard relational logical data model; tables can have primary and foreign key references; and it uses SQL for queries. C-Store stores a collection of columns, each sorted by some attributes. The columns are stored in individual files in the underlying OS. Groups of columns that are sorted on the same attribute are called projections. Each projection is horizontally partitioned into 1 or more segments on the sort key of the projection. Each segment associates every data value of every column with a storage key. To construct all of the records of a table from its various projections, C-Store uses join indexes that map projections sorted on the same sort key by mapping position in one projection to storage key in the other projection. Values from different columns in the same segment with matching storage keys belong to the same logical row (or tuple or record).

RS is a read-optimized column store and WS is a write-optimized column store. Both are horizontally partitioned (with a 1:1 mapping between segments) and have the same projections and join indexes, but their storage representations are different. In RS, compression uses a scheme chosen based on whether a column is ordered by its own values (self order) or by the values of some other column (foreign order); in WS, since the amount of data is small, no compression is done. In RS, the storage key is not stored but calculated, whereas in WS the storage key is explicitly stored in every record. Every projection is represented as a collection of pairs of segments, one in WS and another in RS.
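
As an illustration of compressing a self-ordered column with few distinct values, the sketch below run-length encodes a sorted column into (value, first position, count) triples and looks a value up by position, which is how a storage key can be calculated rather than stored. The function names are hypothetical, positions are 1-based by assumption, and real RS segments use several other encodings as well.

```python
def rle_encode(sorted_column):
    """Encode a sorted column as (value, first_position, count) triples."""
    triples = []
    for pos, v in enumerate(sorted_column, start=1):
        if triples and triples[-1][0] == v:
            value, first, count = triples[-1]
            triples[-1] = (value, first, count + 1)
        else:
            triples.append((v, pos, 1))
    return triples


def value_at(triples, storage_key):
    """Return the column value at a 1-based ordinal position (the storage key)."""
    for value, first, count in triples:
        if first <= storage_key < first + count:
            return value
    raise IndexError(f"no record with storage key {storage_key}")
```

A column of six values with three distinct entries shrinks to three triples, and the position of a record never has to be written down next to its value.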

The tuple mover (TM) is a background task that moves blocks of tuples from a WS segment to the corresponding RS segment using a merge-out process (MOP), updating any join indexes in the process.

Read-only transactions are isolated from updates and deletes using Snapshot Isolation: a read-only transaction accesses the database as of a time in the recent past, before which we can guarantee that there are no uncommitted transactions. The high water mark (HWM) is the most recent time in the past at which snapshot isolation can run; the low water mark (LWM) is the earliest effective time at which a read-only transaction can run. To provide Snapshot Isolation, updates cannot be performed in place; instead each update is converted into an insert and a delete, and time is tracked in coarse-granularity epochs. An insertion vector (IV) and a deleted record vector (DRV) are saved in WS, recording the epoch in which each record was inserted/deleted; the tuple mover makes sure that there are no records in RS that were inserted after the LWM; the runtime engine consults the IV and DRV to calculate the visibility of each record for a query.
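
The record-by-record visibility check can be sketched as follows. This is a minimal model under stated assumptions: each record carries its insertion epoch and an optional deletion epoch (`None` meaning not deleted), the boundary conditions (`<=` and `>`) are one plausible reading rather than the paper's exact rule, and the function names are hypothetical.

```python
def visible(insert_epoch, delete_epoch, query_epoch):
    """A record is visible if inserted at or before the query epoch and not yet deleted then."""
    inserted = insert_epoch <= query_epoch
    not_deleted = delete_epoch is None or delete_epoch > query_epoch
    return inserted and not_deleted


def snapshot_read(records, query_epoch, lwm, hwm):
    """Return ids of records visible at query_epoch, which must lie in [lwm, hwm]."""
    if not lwm <= query_epoch <= hwm:
        raise ValueError("query epoch must be between LWM and HWM")
    return [rid for rid, ins, dele in records if visible(ins, dele, query_epoch)]
```

An update shows up in this model as two entries: the old record gains a deletion epoch and a new record is inserted in the same epoch, so a reader at an earlier epoch still sees the old value.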

C-Store is a multi-site distributed system in which the calculation of the HWM and the concurrency control are each managed by one of the many sites. For HWM determination, one of the sites is picked as the Time Authority and does the epoch management. For concurrency control, each transaction has a master that uses distributed locking (but no PREPARE statements are sent) to commit it, and an undo log to roll it back. C-Store maintains K-safety in that K sites can fail and the system can recover while maintaining its transactional consistency.

The C-Store query executor supports 10 operators (decompress, select, mask, project, sort, aggregation operators, concat, permute, join, bitstring operators), 3 operand/result types (projection, column, bitstring) and 4 operator arguments (predicates, join indexes, attributes, expressions). A C-Store query plan consists of a tree of operators, access methods and iterators. C-Store operators can operate on compressed and uncompressed input.

In performance comparisons, C-Store uses 60% less space than the row store and 30% less than another column store due to its compression and the absence of padding to word/block boundaries. In the space-constrained case (to limit the number of views created), C-Store is 200 times faster than the row store and 20 times faster than the column store; in the space-unconstrained case, C-Store is 10 times faster than the row store and 15 times faster than the column store.