MapReduce Design Patterns Book Review

July 28, 2014

I recently came across the delightful book MapReduce Design Patterns by Donald Miner and Adam Shook. The book is an indispensable addition to the collection of any self-respecting big data professional. It is on par with another favorite of mine, RESTful Web Services Cookbook. Both books are perfect examples of the right mix of theory and practice. They deal with difficult big data problems that developers and architects encounter, and provide focused, pragmatic, and conceptually sound solutions. Kudos to O’Reilly for providing us with such pearls of wisdom.

Several projects that I’ve been involved with could have sorely used this book. Hungry developers eager to adopt hot new Hadoop technologies initially dazzle business folks with their preliminary efforts. The initial results of monetizing large amounts of passive data make everyone happy.

Here’s one case I witnessed. In the first stage, ad server logs from a dozen machines are pushed to an HDFS cluster, MapReduce boils them down to CSV import files, and these are then loaded into a MySQL “data warehouse”. As ad requests rapidly increase, the number of ad server machines grows to 20 or 30, and the initial architecture starts buckling as the sheer complexity of many moving parts overwhelms the team. Without a proper grounding in big data concepts and frameworks, things fall apart in the face of higher volume and velocity. This is a good example of quantitative changes resulting in qualitative ones. In short, the first step is to read MapReduce Design Patterns from cover to cover!

One of the great things about the book is that it covers lower-level patterns such as Numerical Summarization and Counting with Counters, but also higher-level abstractions such as Metapatterns – patterns composed of other patterns, such as Job Chaining and Job Merging. You can’t ask for more – there are chapters on Bloom Filtering, Binning, Replicated Joins, etc. Ultimately this leads to top-level architecture patterns such as the Lambda Architecture – however, that is the topic of a future blog. The final chapter touches upon future directions regarding non-textual formats such as image, audio, and video.

My particular favorite is Chapter 7, Input and Output Patterns, as I have recently been trying to hammer out an external table feature for a SQL-on-Hadoop database à la Hive’s StorageHandler. This chapter provides great clarity and insight on the topic. The authors start off by describing the basic underpinnings of key Hadoop interfaces such as InputFormat/RecordReader and the corresponding OutputFormat/RecordWriter from a high-level conceptual perspective. After reading this chapter, you understand the why as well as the how of these key interfaces. The chapter then goes beyond the common file-based formats to explore the more adventurous topic of connecting input and output sources to systems outside Hadoop and HDFS. The chosen example is the rocking NoSQL Redis database. This is a must read for anyone trying to connect Hadoop to external systems.
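
To make the InputFormat/RecordReader contract concrete, here is a minimal sketch – mine, not the book’s – of a custom format written against the org.apache.hadoop.mapreduce API. It emits a single hard-coded record; the class names and the degenerate split are purely illustrative.

  import java.io.IOException;
  import java.util.Collections;
  import java.util.List;

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.InputFormat;
  import org.apache.hadoop.mapreduce.InputSplit;
  import org.apache.hadoop.mapreduce.JobContext;
  import org.apache.hadoop.mapreduce.RecordReader;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;

  public class SingleRecordInputFormat extends InputFormat<LongWritable, Text> {

      // File-format half: carve the source into independently processable splits.
      @Override
      public List<InputSplit> getSplits(JobContext context) throws IOException {
          return Collections.<InputSplit>singletonList(new TrivialSplit());
      }

      // Each split gets a RecordReader that turns raw bytes into key/value records.
      @Override
      public RecordReader<LongWritable, Text> createRecordReader(
              InputSplit split, TaskAttemptContext context) {
          return new RecordReader<LongWritable, Text>() {
              private boolean emitted = false;

              @Override public void initialize(InputSplit s, TaskAttemptContext c) { }

              @Override public boolean nextKeyValue() {
                  if (emitted) return false;   // a real reader advances through the split
                  emitted = true;
                  return true;
              }

              @Override public LongWritable getCurrentKey() { return new LongWritable(0); }
              @Override public Text getCurrentValue() { return new Text("hello"); }
              @Override public float getProgress() { return emitted ? 1.0f : 0.0f; }
              @Override public void close() { }
          };
      }

      // A do-nothing split; real splits describe a byte range and its host locations.
      public static class TrivialSplit extends InputSplit {
          @Override public long getLength() { return 0; }
          @Override public String[] getLocations() { return new String[0]; }
      }
  }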

The one issue I had with the book is the “MapReduce” word in its title. Most of the patterns are not in any way specific to MapReduce. Of course, when the book was written MapReduce was the only game in town, but we now have a new generation of execution engines such as Spark and Tez. For the most part, the patterns are equally applicable to the other execution engines or even to a roll-your-own solution. I would love to see a companion book which describes the patterns with Spark code samples.

Adventures in Hive SerDe-landia

July 25, 2014

Overview

I was recently involved in a product requirements effort for external tables for a SQL-on-Hadoop product. The first thing that came to mind was to take a deep dive into Hive’s SerDe and StorageHandler capabilities. The top-level StorageHandler interface is in essence a good example of a Hadoop design pattern. These features allow you to connect any external data source to Hive – be it a file or a network connection. One of my core beliefs is that before starting any task, it is important to do due diligence and see what other folks are doing in the same space. Gaining prior knowledge and awareness of the current state of the art is something all developers and product managers should be required to do.

Basic

There are two dimensions to external data source connectivity:

  • The syntax for defining external data sources in Hive QL which is in essence a DSL (domain-specific language).
  • External data source connector implementations

In my view, the DSL is inconsistent and confusing. There are several ways to do the same thing, and some storage handlers are privileged and exposed as keywords in the DSL. For example, attributes of the default text storage handler are treated as first-order keywords instead of SERDEPROPERTIES. This blog will focus on the storage handler implementations.

There are two dimensions to the way Hive stores table data:

  • file format – how the input source (e.g. a file) is broken up into records.
  • row format – how a record is parsed and mapped into database columns. The SerDe interface defines this extension point.

HiveStorageHandler is the top-level interface that exposes the file and row format handlers. It has three methods – two defining the file format’s input and output, and one defining the SerDe. A SerDe is simply an aggregation of a Deserializer and a Serializer. The nice thing about Hive is that it eats its own dog food – the default text-based handler is implemented as a SerDe itself: MetadataTypedColumnsetSerDe.

One API design note: the file and row format methods of HiveStorageHandler should be treated in a uniform fashion. Currently the two input/output file format classes are exposed as separate methods, whereas the row feature is encapsulated in a single method that returns a SerDe.

HiveStorageHandler
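
Below is a paraphrased sketch of the heart of the interface – consult the Hive source for the verbatim definition, which also carries additional methods such as metadata hooks. The three methods shown are the ones discussed above.

  import org.apache.hadoop.hive.serde2.SerDe;
  import org.apache.hadoop.mapred.InputFormat;
  import org.apache.hadoop.mapred.OutputFormat;

  public interface StorageHandlerSketch {
      Class<? extends InputFormat> getInputFormatClass();   // file format: input side
      Class<? extends OutputFormat> getOutputFormatClass(); // file format: output side
      Class<? extends SerDe> getSerDeClass();               // row format
  }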

MongoDB Hadoop Connector

The MongoDB folks have written a very comprehensive Hadoop connector.

Connect to MongoDB BSON File

This example illustrates how you can connect to BSON dump files from a MongoDB database. One potential problem I see here is mapping large schemas or managing schema changes. Specifying a large list in the DDL can get unwieldy. Having an external configuration file with the mapping might be easier to handle.

CREATE external TABLE person (
  name string,
  yob int,
  status boolean
)
ROW FORMAT SERDE 'com.mongodb.hadoop.hive.BSONSerDe'
  WITH SERDEPROPERTIES(
    'mongo.columns.mapping'='{"name" : "name", "yob" : "yob", "status" : "status"}')
STORED AS
  INPUTFORMAT 'com.mongodb.hadoop.mapred.BSONFileInputFormat'
  OUTPUTFORMAT 'com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat'
LOCATION '/data/person';

Connect to MongoDB database

This is equivalent to the BSON example except that we connect to a live database. As you add more documents to your MongoDB database, they automatically appear in the Hive table! Note here my earlier complaint about having multiple ways of doing the same thing: the BSON example uses the STORED AS, INPUTFORMAT and OUTPUTFORMAT keywords whereas this example uses STORED BY. The distinction is not very intuitive.

CREATE external TABLE person (
  name string,
  yob int,
  age int,
  status boolean
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES(
  'mongo.columns.mapping'='{"name" : "name", "yob" : "yob", "status" : "status"}')
TBLPROPERTIES(
  'mongo.uri'='mongodb://localhost:27017/test.persons');

Plugin Ecosystem

Having a well-defined interface and productive abstractions has allowed a vigorous third-party plug-in ecosystem to flourish. Besides the text handler, Hive comes bundled with the following storage handlers:

  • Avro (Hive 0.9.1 and later)
  • ORC (Hive 0.11 and later)
  • RegEx
  • Thrift
  • Parquet (Hive 0.13 and later)

On the Nature of Failure in Distributed Systems

March 26, 2012

I just finished reading an excellent article by Jay Kreps of Voldemort fame titled Getting Real About Distributed System Reliability. This is a must read for anyone seriously interested in the operational reliability of NoSQL or Big Data systems. The main points of the article are that node failures are not independent and that solid operational procedures for your production environment are essential.

The most frequent reasons for node failures are not random network blips but software bugs that unfortunately often manifest themselves at the same time on all the deployed nodes. Michael Stonebraker makes a similar point in Clarifications on the CAP Theorem and Data-Related Errors where he describes Bohrbugs – “when multiple data base replicas are available, the same transaction issued to the replicas will cause all of them to crash”. Even though the theoretical probability of node failure is higher as the cluster size increases, this is not the main problem for distributed system reliability.

Kreps points out that the theoretical conclusions regarding reliability provided by the CAP theorem do not accurately translate into run-time reliability. Stonebraker makes a similar point: “the model does not deal with several important classes of errors, which a real world system administrator must cope with”.

This is not so much a problem with the CAP theorem per se. Satisfying the CAP theorem is a necessary but not sufficient condition. It is more about system design rather than implementation. On a side note regarding the CAP theorem, there is an excellent discussion by Daniel Abadi in Problems with CAP, and Yahoo’s little known NoSQL system of some of the problems of differentiating CAP’s Availability and Partition tolerance, i.e. what exactly is the distinction between a CA and a CP system?

Kreps underscores the importance of operational procedures since no code is bug free. This is especially true of relatively immature NoSQL systems since they are very young and have not gone through a long hardening phase as traditional databases have. In any complex long-running system you are bound to have outages no matter how good your code is, and the real question to be asked is: how do you recover from failures? Of course this is not a novel idea, but it is worth keeping focus on this point, especially when it comes from a reliable NoSQL authority.

The importance of system reliability and recovery is underscored by the rather embarrassing recent Azure outage due to February’s leap day. Due to a minor software bug, Azure was unable to launch new elastic VMs on February 29, 2012. This didn’t just happen on one machine but across the whole cluster. In other words, the single point of failure was a software bug! All the redundant hardware and power sources weren’t much good in this situation! See a good recap: The Azure Outage: Time Is A SPOF, Leap Day Doubly So. The two main points to emphasize here are that the failure was not random but widespread, and that operational recovery procedures are a necessity.

Finally, Kreps raises the need for empirically verifiable tests for NoSQL products. Having spent an inordinate amount of time evaluating NoSQL systems (think cross-country drive instead of a mere drive around the block), I find this point strongly resonates with me. Distributed systems are notoriously complex beasts since they have to deal with failure in a fundamentally different way than single-server systems. NoSQL vendors would be doing the community and themselves a great service if they would make public their high-volume tests and benchmarks. I would be delighted to see these test results posted regularly on public CI build dashboards. In order for NoSQL systems to be more widely accepted in the enterprise community, more transparency is needed regarding scalability issues. Concrete, comparable benchmarks are needed.

A step in this direction would be to develop some sort of standardized NoSQL equivalent of the TPC benchmarks. For example, to start off with, implement a series of high-volume and high-throughput tests with a key/value data model. Kick off a few million writes, then reads, some read/writes and deletes, along with different record and key sizes, and see what happens. Set up a cluster and then start killing and restarting nodes and see if the system behaves correctly and performantly. I’ve put a lot of effort into such a framework called vtest, and I know it’s not easy. By publicly exposing these kinds of tests, vendors would only increase confidence in their products.
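
As a rough illustration of what such a test kernel looks like, here is a minimal sketch in the spirit of vtest – the KeyValueStore interface is a hypothetical stand-in for any vendor client, and the thread and request counts are arbitrary.

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;

  public class LoadTest {
      interface KeyValueStore {
          void put(String key, byte[] value) throws Exception;
      }

      public static void run(final KeyValueStore store, int threads, int requests)
              throws InterruptedException {
          ExecutorService pool = Executors.newFixedThreadPool(threads);
          final AtomicLong failures = new AtomicLong();
          for (int i = 0; i < requests; i++) {
              final String key = "key-" + i;
              pool.submit(new Runnable() {
                  public void run() {
                      try {
                          store.put(key, new byte[1000]);
                      } catch (Exception e) {
                          failures.incrementAndGet(); // killing a node shows up here
                      }
                  }
              });
          }
          pool.shutdown();
          pool.awaitTermination(1, TimeUnit.HOURS);
          System.out.println("failed requests: " + failures.get());
      }
  }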

Hadoop Ecosystem Happenings

November 10, 2011

These are exciting times for the Hadoop ecosystem. Recently there has been a flurry of activities that can be broadly categorized into the following groups:

  • Hadoop services – Cloudera and Hortonworks
  • Hadoop internals – MapR and Hadapt
  • Big vendor adoption – Oracle, IBM, Microsoft, EMC

Hadoop Services Sparks

The launching of Hortonworks last summer has certainly given the established Cloudera a run for its money. If I had to summarize the situation in a nutshell it would be: Cloudera with an established customer base + Doug Cutting vs. Hortonworks with the Yahoo Hadoop development team and lots of promise/potential. Quite an interesting match here – sort of like a Hadoop version of the Thrilla in Manila (RIP Joe Frazier). Hortonworks has finally released its distribution, the Hortonworks Data Platform.

And then there is the animated discussion on who contributes more to the Hadoop source repo – e.g. number of patches vs. lines of code! Very entertaining stuff.

Hadoop Internals Improvement

Despite its runaway success, Hadoop does have technical shortcomings regarding performance and dependability (e.g. the NameNode as a single point of failure). For example, see a good article regarding the JobTracker: Next Generation of Apache Hadoop MapReduce – The Scheduler. These problems with Hadoop have provided an opportunity for several new companies (MapR and Hadapt) to deliver proprietary enhancements that address some of the weaknesses.

The widespread adoption of Hive has significantly expanded the end user base of Hadoop, but at the same time it has put a strain on many Hadoop installations. The very expressiveness of Hive has empowered business folks to rather easily tap into the data store. These days the query barrier of entry is low, and load has correspondingly increased. Anecdotally, I have personally witnessed individual queries that can result in up to a hundred MapReduce jobs! Shops too often blindly throw more hardware at the problem without first performing root-cause analysis of performance issues. There is definitely a business opening here for more performant solutions. As Hadoop becomes more established in the enterprise, higher quality – faster and more reliable – features are in demand.

MapR

Earlier this year MapR started shipping two versions of its Hadoop distribution – the free M3 and the more advanced M5, for which it charges. A major emphasis is on fixing Hadoop’s single points of failure.

Key features:

  • Distributed, highly available NameNode
  • NFS support – can mount clusters as NFS volumes
  • Heatmap management console
  • Unlimited number of files
  • Point-in-time snapshots
  • User, job provisioning, quotas, LDAP integration (now that’s classy!)
  • 100% compatibility with Hadoop

Hadapt

Hadapt is a new startup based on some very interesting work at Yale in the area of advanced database technology (parallel databases, column stores). Early access to its flagship product, the Hadapt Adaptive Analytic Platform, was just announced the other day. See the good article at dbms2 on the latest Hadapt news: Hadapt happenings – Hadapt is moving forward.

Key features:

  • Integrates Hadoop with a database storage engine
  • Universal SQL support – instead of the more general “standard” Hive query language, Hadapt provides a more tightly integrated SQL interface with the database engine (e.g. Postgres).
  • Multi-structured analytics – analysis of both structured and unstructured data
  • Adaptive query execution technology provides on-the-fly load balancing and fault tolerance.
  • An emphasis on cloud and virtualized load balancing – resonates with me.

This is a good example of the new hybrid solutions emerging in the polyglot persistence space. One should not think of NoSQL and RDBMS as mutually exclusive propositions. Integrating current analytical tools with new NoSQL sources of data is a promising development.

The use of an RDBMS as the underlying data store strikes me as familiar. On one of my recent NoSQL projects for a major telecom, we decided to replace Voldemort’s default Berkeley DB storage engine with MySQL since the latter (purportedly) benchmarked as faster. No SQL or transactions involved – just retrieval speed for a key/value data model.

Interestingly, even MySQL is now offering a “NoSQL” solution – direct access to the NDB C++ engine via a Memcached interface!

For more information on the theoretical and technical underpinnings of Hadapt, see the background papers at Daniel Abadi’s publications page. Also recommended is his thought-provoking – readable but rigorous – dbmusings blog. There are spot-on discussions on eventual consistency and  CAP theorem design trade-offs as well as other great articles.

One interesting TODO project would be to perform a more rigorous and complete comparison of the MapR and Hadapt products. First you would do a feature-set gap analysis: how are the same common problems addressed? Then you would look at the unique value adds that each vendor provides. A more complete analysis would entail running non-trivial comparative performance tests, but of course this would require a major investment in hardware and time. You could perhaps start with some kind of TeraSort benchmark comparison of the two.

Piccolo – Dov’è?

On a side note of interest, last winter there was a big splash about the Piccolo project, which promised to surpass Hadoop performance. Rather mysteriously, I haven’t seen any activity or news about them since then. Getting an article in the New York Times is quite a significant achievement – I wonder what has happened.

Spring Configuration – Selecting An Alternate Implementation

November 9, 2011

A common recurring pattern in software development is the need to select at runtime a specific instance of an interface. This instance can either be a distinct implementation class of an interface or the same class instantiated with different properties. Spring provides unparalleled abilities to define different bean instances. These can be categorized as follows:

  • Each bean is a different implementation class of the interface.
  • Each bean is the same implementation class but has different configuration.
  • A mixture of the two above.

The canonical example is selecting a mock implementation for testing instead of the actual target production implementation. However there are often business use cases where alternate providers need to be selectively activated.

The goal is to externalize the selection mechanism by providing a way to toggle the desired bean name. We want to avoid manually commenting/uncommenting bean names inside a Spring XML configuration file. In other words, the key question is: how do we toggle the particular implementation?

A brief disclaimer note: this pattern is most applicable to Spring 3.0.x and lower. Spring 3.1 introduces some exciting new features such as bean definition profiles for different environments.

There are two variants of this pattern:

  • Single Implementation – We only need one active implementation at runtime.
  • Multiple Implementations – We need several implementations at runtime so the application can dynamically select the desired one.

Assume we have the following interface:
  public interface NoSqlDao<T extends NoSqlEntity>  {
     public void put(T o) throws Exception;
     public T get(String id) throws Exception;
     public void delete(String id) throws Exception;
  }

  public interface UserProfileDao extends NoSqlDao<UserProfile> {
  }

Assume two implementations of the interface:

  public class CassandraUserProfileDao<T extends UserProfile>
    implements UserProfileDao

  public class MongodbUserProfileDao<T extends UserProfile>
    implements UserProfileDao

Single Loaded Implementation

In this variant of the pattern, you only need one implementation at runtime. Let’s assume that the name of the bean we wish to load is userProfileDao.

  ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
  UserProfileDao userProfileDao = context.getBean("userProfileDao",UserProfileDao.class);

The top-level applicationContext.xml file contains common global beans and an import statement for the desired provider. The value of the imported file is externalized as a property called providerConfigFile. Since each provider file is mutually exclusive, the bean name is the same in each file.

  <beans>
    <bean id="propertyConfigurer"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
      <property name="location" value="classpath:context-PropertyOverrideConfigurer.properties" />
      <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
    </bean>
    <import resource="${providerConfigFile}"/>
  </beans>

The provider-specific configuration files are:

  applicationContext-cassandra.xml
  applicationContext-mongodb.xml
  applicationContext-redis.xml
  applicationContext-riak.xml
  applicationContext-membase.xml
  applicationContext-oracle.xml

For example (note the same bean name userProfileDao):

  applicationContext-cassandra.xml

    <bean id="userProfileDao" class="com.amm.nosql.dao.cassandra.CassandraUserProfileDao" >
      <constructor-arg ref="keyspace.userProfile"/>
      <constructor-arg value="${cassandra.columnFamily.userProfile}"/>
      <constructor-arg ref="userProfileObjectMapper" />
    </bean> 

  applicationContext-mongodb.xml

    <bean id="userProfileDao" class="com.amm.nosql.dao.mongodb.MongodbUserProfileDao">
      <constructor-arg ref="userProfile.collectionFactory" />
      <constructor-arg ref="mongoObjectMapper" />
    </bean>

At runtime you need to specify the value for the property providerConfigFile. Unfortunately, with Spring 3.0 this has to be a system property and cannot be specified inside a properties file! This means it will work for a stand-alone Java application but not for a WAR unless you pass the value externally to the web server as a system property. This problem has allegedly been fixed in Spring 3.1 (I didn’t notice it working for 3.1.0.RC1). For example:

  java
    -DproviderConfigFile=applicationContext-cassandra.xml
    com.amm.nosql.cli.UserProfileCli

Multiple Loaded Implementations 

With this variant of the pattern, you will need to have all implementations loaded into your application context so you can later decide which one to choose. Instead of one import statement, applicationContext.xml imports all implementations.

  <import resource="applicationContext-cassandra.xml" />
  <import resource="applicationContext-mongodb.xml" />
  <import resource="applicationContext-redis.xml" />
  <import resource="applicationContext-riak.xml" />
  <import resource="applicationContext-membase.xml" />
  <import resource="applicationContext-oracle.xml" />

Since you have one namespace, each implementation has to have a unique bean name for the UserProfileDao implementation. Using our previous example:

applicationContext-cassandra.xml

  <bean id="cassandra.userProfileDao" class="com.amm.nosql.dao.cassandra.CassandraUserProfileDao" >
    <constructor-arg ref="keyspace.userProfile"/>
    <constructor-arg value="${cassandra.columnFamily.userProfile}"/>
    <constructor-arg ref="userProfileObjectMapper" />
  </bean> 

applicationContext-mongodb.xml

  <bean id="mongodb.userProfileDao" class="com.amm.nosql.dao.mongodb.MongodbUserProfileDao">
    <constructor-arg ref="userProfile.collectionFactory" />
    <constructor-arg ref="mongoObjectMapper" />
  </bean>

Then inside your Java code you need a mechanism to select the desired bean, e.g. load either cassandra.userProfileDao or mongodb.userProfileDao. For example, you could have a test UI containing a dropdown list of all implementations. Or you might even need to access two different NoSQL stores through the same UserProfileDao interface, as sketched below.
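
For completeness, a minimal sketch of the selection step – using a system property as the toggle is just one illustrative mechanism:

  ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
  String provider = System.getProperty("provider", "cassandra"); // or "mongodb", "redis", ...
  UserProfileDao userProfileDao =
      context.getBean(provider + ".userProfileDao", UserProfileDao.class);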

ALT.NET and Cassandra

October 27, 2011

On a lark, tonight I attended an ALT.NET presentation on Cassandra at our local Microsoft Nerd Center. Kudos to Microsoft for hosting all these great meetings – it’s a real service to the broader developer community.

Coming from a Java perspective, I found this meeting especially interesting since it was all about .NET. It is always informative to see how different developer communities are using NoSQL technologies.

The speaker, John Zablocki, emphasized how his goal with the ALT.NET group is to introduce open source projects to Microsoft developers. He was an engaging and entertaining speaker with a background in Mongo, CouchDB and Cassandra. I really enjoyed his joke about how DB admins are increasing global warming by being so highly paid and thus buying bigger houses, consuming more resources, etc. 🙂

One of the major outcomes of the meeting for me was my increased awareness of Microsoft Azure. I’ve definitely added a new item to my to-learn-more queue regarding Azure and more specifically its consistent key/value store called Windows Azure Table. There were several Microsoft folks involved with Azure who imparted some great information. They seem eager to hook up with the greater NoSQL community – things are happening here.

The first part of the talk was general NoSQL background – a bit redundant for the knowledgeable ones. But I always find reviews productive – there is always something new to learn, a slight new angle from a different perspective or a new nugget to store in ye old gray matter. We certainly had fun with Cassandra’s sub-par cassandra-cli shell!

Regarding specific Windows issues, I asked if there were any Windows-based Cassandra deployments out there, but no one knew of any. John was obviously very much in the know on the three .NET Cassandra client drivers, and according to him Fluent Cassandra is the best choice since it is truest to the .NET programming paradigm. I was rather surprised at the number of clients for .NET compared to Java!

Finally, John was kind enough to raffle off a copy of the book Cassandra: The Definitive Guide to the person who knew who the author of the CAP theorem was. Hint: he recently joined the board of Riak! I volunteered the answer, and lo and behold, I now have to update my Cassandra knowledge with “out-of-date” knowledge. How quickly tool-specific information becomes outdated! The book was published not even a year ago (Nov. 2010) and deals with version 0.7 whereas the current version is 1.0! Nevertheless, I am confident that there is plenty of worthwhile core architectural information to be learned.

As a sidebar, with the tragic demise of the Borders bookstore, I had somewhat guiltily opted to buy a copy of the classic book Principles of Transaction Processing, Second Edition instead of the Cassandra book, since the pages-to-price-to-knowledge ratio was higher for the former. Now my conscience is clear-er! Of particular interest is Chapter 9, Replication, which presents a priceless, rigorous exposition of replication – from master/slave to eventual consistency. A must read for the NoSQL professional!

MongoDB and Cassandra Cluster Failover

October 21, 2011

One of the most important features of a scalable, clustered NoSQL store is how it handles failover. The basic question is: is failover seamless from the client perspective? This is not always immediately apparent from vendor documentation, especially for open-source packages where documentation is often wanting. The only way to really know is to run your own tests – to both verify vendor claims and to fully understand them. Caveat emptor – especially if the product is free!

Mongo and Cassandra use fundamentally different approaches to clustering. Mongo is based on the classical master/slave model (similar to MySQL) whereas Cassandra is a peer-to-peer system modeled on the eventual consistency paradigm pioneered by Amazon’s Dynamo system. This difference has specific ramifications regarding failover capabilities. The design trade-offs regarding the CAP theorem are described very well in the dbmusings blog post Overview of the Oracle NoSQL Database (Section CAP).

For Mongo, the client can only write to the master and therefore it is a single point of failure. Until a new master is elected from the secondaries, the cluster will not be reachable for writes.

For Mongo reads, you have two options: either talk to the master or to the secondaries. The default is to read from the master, and you will be subject to the same semantics as for writes. To enable reading from secondaries, you can call Mongo.slaveOk() on your client driver. For details see here.
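
For example, with the 2.x-era Java driver used in these tests (a sketch – check your driver version for the exact API):

  import java.net.UnknownHostException;
  import com.mongodb.Mongo;

  Mongo mongo = new Mongo("localhost", 27017); // throws UnknownHostException
  mongo.slaveOk(); // reads may now hit secondaries; writes still require the master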

For Cassandra, as long as your selected consistency policy is satisfied, failing nodes will not prevent client access.

Mongo Setup

I’ll illustrate the issue with a simple three-node cluster. For Mongo, you’ll need to create a three-node replica set, which is well described on the Replica Set Tutorial page.

Here’s a convenience shell script to launch a Mongo replica set.

  dir=/work/server-data/mongo-cluster
  mv nohup.out old-nohup.out
  OPTS="--rest --nojournal --replSet myReplSet"
  nohup mongod $OPTS --port 27017 --dbpath $dir/node0  &
  nohup mongod $OPTS --port 27018 --dbpath $dir/node1  &
  nohup mongod $OPTS --port 27019 --dbpath $dir/node2  &

Cassandra Setup

For Cassandra, create a keyspace with a replication factor of 3 in your schema definition file.

  create keyspace UserProfileKeyspace
    with strategy_options=[{replication_factor:3}]
    and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';

Since I was using the standard Java Hector client along with Spring, I’ll highlight some of the key Spring bean configurations. The important point to note is the consistency policy must be quorum which means that for an operation (read or write) to succeed, two out of the three nodes must respond.

Properties File

  cassandra.hosts=host1,host2,host3
  cassandra.cluster=MyCluster
  cassandra.consistencyLevelPolicy=quorumAllConsistencyLevelPolicy

Application Context File

  <bean id="userProfileKeyspace"
        class="me.prettyprint.hector.api.factory.HFactory"
        factory-method="createKeyspace">
    <constructor-arg value="UserProfileKeyspace" />
    <constructor-arg ref="cluster"/>
    <property name="consistencyLevelPolicy" ref="${cassandra.consistencyLevelPolicy}" />
  </bean>

  <bean id="cluster"
        class="me.prettyprint.cassandra.service.ThriftCluster">
    <constructor-arg value="${cassandra.cluster}"/>
    <constructor-arg ref="cassandraHostConfigurator"/>
  </bean>

  <bean id="cassandraHostConfigurator"
        class="me.prettyprint.cassandra.service.CassandraHostConfigurator">
     <constructor-arg value="${cassandra.hosts}"/>
  </bean>

  <bean id="quorumAllConsistencyLevelPolicy"
        class="me.prettyprint.cassandra.model.QuorumAllConsistencyLevelPolicy" />

  <bean id="allOneConsistencyLevelPolicy"
        class="me.prettyprint.cassandra.model.AllOneConsistencyLevelPolicy" />

Test Scenario

The basic steps of the test are as follows. Note the Mongo example assumes you do not have slaveOk turned on.

  • Launch cluster
  • Execute N requests where N is a large number such as 100,000 to give your cluster time to fail over. The request is either a read or write.
  • While your N requests are running, kill one of the nodes. For Cassandra this can be any node since it is peer-to-peer. For Mongo, kill the master node.
  • For Cassandra, there will be no exceptions. If your requests are inserts, you will be able to subsequently retrieve them.
  • For Mongo, your requests will fail until the secondary is promoted to the master. This happens for both writes and reads. The time window is “small”, but depending upon your client request rate, the number of failed requests can be quite a few thousand! See the sample exceptions below.

With Mongo, the client can directly access only the master node. The secondaries can only be accessed by the master and never directly by the client. With Cassandra, as long as the minimum number of nodes as specified by the quorum can be reached, your operation will succeed.

Mongo Put Exception Example

I really like the message “can’t say something” – sort of cute!

com.mongodb.MongoException$Network: can't say something
    at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:159)
    at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:132)
    at com.mongodb.DBApiLayer$MyCollection.update(DBApiLayer.java:343)
    at com.mongodb.DBCollection.save(DBCollection.java:641)
    at com.mongodb.DBCollection.save(DBCollection.java:608)
    at com.amm.nosql.dao.mongodb.MongodbDao.put(MongodbDao.java:48)

Mongo Get Exception Example

com.mongodb.MongoException$Network: can't call something
    at com.mongodb.DBTCPConnector.call(DBTCPConnector.java:211)
    at com.mongodb.DBTCPConnector.call(DBTCPConnector.java:222)
    at com.mongodb.DBTCPConnector.call(DBTCPConnector.java:231)
    at com.mongodb.DBApiLayer$MyCollection.__find(DBApiLayer.java:303)
    at com.mongodb.DBCursor._check(DBCursor.java:360)
    at com.mongodb.DBCursor._next(DBCursor.java:442)
    at com.mongodb.DBCursor.next(DBCursor.java:525)
    at com.amm.nosql.dao.mongodb.MongodbDao.get(MongodbDao.java:38)

Mongo Get with slaveOk() 

If you invoke Mongo.slaveOk() for your client driver, then your reads will not fail if a node goes down. You will get the following warning.

Oct 30, 2011 8:12:22 PM com.mongodb.ReplicaSetStatus$Node update
WARNING: Server seen down: localhost:27019
java.net.SocketException: Connection reset by peer: socket write error
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
        at org.bson.io.PoolOutputBuffer.pipe(PoolOutputBuffer.java:129)
        at com.mongodb.OutMessage.pipe(OutMessage.java:160)
        at com.mongodb.DBPort.go(DBPort.java:108)
        at com.mongodb.DBPort.go(DBPort.java:82)
        at com.mongodb.DBPort.findOne(DBPort.java:142)
        at com.mongodb.DBPort.runCommand(DBPort.java:151)
        at com.mongodb.ReplicaSetStatus$Node.update(ReplicaSetStatus.java:178)
        at com.mongodb.ReplicaSetStatus.updateAll(ReplicaSetStatus.java:349)
        at com.mongodb.ReplicaSetStatus$Updater.run(ReplicaSetStatus.java:296)
Oct 30, 2011 8:12:22 PM com.mongodb.DBTCPConnector _set

Cassandra Users Group Impressions

October 20, 2011

Tonight I attended my first local Cassandra Users Group meetup. The wet walk in the rain sure was worth it!

About half a dozen people attended – most had a Cassandra deployment in production and all had something interesting to say. No fluff here tonight. Everyone had a high level of knowledge of EC – it was good not to be the big fish in the NoSQL pond, so I could learn from folks out in the field.

Sidebar: Cassandra 1.0 was released yesterday! I wonder what the significance of the release number jump from 0.8.7 to 1.0 is – a new item on my TODO queue.

Here follows a synopsis.

Company B

  • 5 TB of data
  • 21 nodes on AWS – spiked to 42 once – now back to 21
  • 90% writes – hence Cassandra is ideal
  • Two column families:
    • one column – basic key/value
    • wide column – key is user ID and columns are his/her emails

Company C

  • 72 nodes
  • 2 data centers

Company Z

  • Cassandra 0.7.6
  • 4 nodes
  • RAID 0 disks – redundancy is achieved through Cassandra clustering
  • 21 million writes a day
  • 250 GB – 7 million XML files ranging from 4 to 450 MB in size
  • Column family: key/value
  • Replication factor is 3
  • 80 concurrent write threads, 160 read threads
  • QuickLZ compression down to 20%
  • Migrated from Mogile FS

Other

Regarding adding new nodes to a live cluster, no one had any negative experiences. For RandomPartitioner, several folks said it was best to double the number of nodes assuming your company can foot the bill. There was apparently no negative impact in rebalancing overhead related to shifting data to the new nodes.

Two folks were using manual server token assignments for RandomPartitioner – a third person was not keen on this. Assuming you are using an MD5 hash, you simply divide the 128-bit name space by the number of servers. This apparently was better than letting Cassandra compute the tokens. This was OK even if you added a new node(s) and had to recompute server tokens.
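
Here is a sketch of that arithmetic – with RandomPartitioner the MD5-based token space is [0, 2^127), so you hand each of the N nodes an evenly spaced initial token (the node count below is illustrative):

  import java.math.BigInteger;

  public class TokenAssigner {
      public static void main(String[] args) {
          int numNodes = 4; // illustrative cluster size
          BigInteger space = BigInteger.valueOf(2).pow(127);
          for (int i = 0; i < numNodes; i++) {
              BigInteger token = space.multiply(BigInteger.valueOf(i))
                                      .divide(BigInteger.valueOf(numNodes));
              System.out.println("node " + i + " initial_token: " + token);
          }
      }
  }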

Hadoop integration was important for many – and there was nothing but praise for DataStax’s Brisk tool that integrates Hadoop and Cassandra. Instead of using the standard HDFS (Hadoop Distributed File System) you can simply point Hadoop to use Cassandra. Two advantages to this are:

  • Random access in Cassandra is faster than in HDFS
  • Avoid HDFS SPOF (single point of failure)

One company was very happy with Brisk’s seamless Hive integration. Several people expressed issues with the apparent inability to execute MapReduce jobs on more than one column.

Regarding client drivers, the Java users all concurred on the horrors of accessing Cassandra via the complex Thrift-based data model. Hector marginally alleviates this, but it is still painful. For example, compare this to Mongo’s extremely intuitive JSON-based interface! Two companies were using customized Ruby/JRuby gems. I learned that a previously advertised Cassandra option for Avro in addition to Thrift had been dropped.

There was some discussion of other NoSQL vendors – namely MongoDB and Riak. One company was happily using MongoDB. Others had naturally gravitated to Cassandra because of its core competencies of failover robustness and scalability. The fact that MongoDB uses a master/slave paradigm for replication was an issue for some – the master is a SPOF for writes. People also commented that a Cassandra cluster is easier to manage since Cassandra has only one process per node, whereas with sharded MongoDB you need to launch several processes: mongod, mongos, and config.

MongoDB vs. Cassandra Benchmarks

October 19, 2011

Cassandra and Mongo passed the first set of winnowing tests – both in terms of speed and functionality. In order to provide more data for an informed decision, I subjected both to a series of high throughput tests. The test suite was a total of 120 million operations (insert, update, get) – 12 runs of 10 million requests.

For those impatient for the ultimate conclusion, the executive summary is: Cassandra writes are faster, Mongo reads are faster. But what is interesting is how the latencies evolve over time, i.e. over the course of 120 million requests.

Overview of test suite

  • Remove all disk data of the persistent store with rm -rf
  • Start the NoSQL server
  • Maintain a constant load of 20 or 50 concurrent client threads with no rampup time
  • Insert N records
  • Get N records – this will reflect any caching or compaction (e.g. Cassandra SSTables)
  • Get N records – this should reflect already compacted tables – should be faster
  • Get N records – should correspond to previous get
  • Update N keys with differently sized values (value seed) – we want to make sure we’re exercising Mongo compaction
  • Repeat the three get runs again

Results Commentary

When one result is faster than another, the important question to always ask is: how important is the difference? Does it really matter? Is the slower system still fast enough? If the lower value still satisfies your requirements, then the difference is of little significance and other factors weigh more heavily on the ultimate decision.

In order to interpret results, it is important to have a basic understanding of some statistics. You should not rely solely on the mean, but also pay particular attention to latency percentiles. Percentiles give you an idea of the distribution of results, which is especially important when predictability is required. Your SLA might say: P% of reads MUST be less than 10ms, where P is the percentage, e.g. 99% or 100%. This of course raises the question of what is to be done for those operations that exceed the limit, but that is another discussion. As you can see, there are some real whoppers – note the Max value for put – 37 seconds for MongoDB and 175 seconds for Cassandra!
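
As a sketch of why the mean hides these whoppers, here is the simple nearest-rank percentile computation over recorded latencies (the sample values are illustrative, not taken from the tables below):

  import java.util.Arrays;

  public class Percentiles {
      static long percentile(long[] sortedMillis, double p) {
          int idx = (int) Math.ceil(p / 100.0 * sortedMillis.length) - 1;
          return sortedMillis[Math.max(idx, 0)];
      }

      public static void main(String[] args) {
          long[] latencies = { 1, 1, 2, 2, 3, 20, 43, 36232 }; // ms; one whopper
          Arrays.sort(latencies);
          // The mean is ~4538 ms, but p50 is 2 ms – percentiles tell the real story.
          System.out.printf("p50=%d p90=%d p99=%d%n",
              percentile(latencies, 50), percentile(latencies, 90), percentile(latencies, 99));
      }
  }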

As the tables below indicate, Cassandra was the winner for puts – both inserts and updates. For the initial 10M puts, the distribution is interesting – 50% of Mongo puts were under 1ms while for Cassandra it was 2ms. But for the 90th percentile, Mongo took 20ms while Cassandra was at 4ms.

For the subsequent two batches of 10M puts (inserts), Cassandra’s throughput was 4054 and 5768 – quite a bit of variability! Note the max value for a put was 174 seconds – must have been one heck of a GC! Mongo’s two insert batches ranged from 1753 to 2197 puts per second. 90% of Cassandra puts finished under 4ms, whereas 90% of Mongo puts were under 17ms.

Cassandra’s superior write numbers are not surprising since it is optimized for faster writes than reads. See here. In order to avoid expensive disk seeks, writes are appended to sequential files called SSTables. However, the hit is taken on reads, where the system potentially has to examine several SSTables for updated values. Unfortunately, for our needs fast reads were of more importance.

For reads the situation is quite different and Mongo comes out ahead. For the first three get batches Cassandra is giving us between 2300-2600 gets per second. Mongo is in the predictable 3100s. Both of these fall within our target SLA and are roughly comparable.

However, it’s a completely different story for reads after we do inserts! Look at the numbers: Mongo is slightly slower at 2999 and 3039, but Cassandra drops tenfold to 294 and 350! Que caramba! I’m assuming this is because of the need to access multiple SSTables, but I would’ve assumed that at some point the values were merged into one file.

Perhaps there’s some tuning that needs to be done for Cassandra, though I saw nothing obvious in my initial look at the documentation. All I can say with confidence is that un-tuned Cassandra get operations drastically slow down. I did invoke an explicit compaction using nodetool, and throughput for the next 10 million gets did improve to 1000/sec. But the caveat was that the Cassandra documentation did not recommend this, since automated compactions will not be run after a manual one! By the way, one of Mongo’s strong suits is that they strive to minimize the amount of configuration required to get a system running.

Context

In order to establish a baseline for each vendor’s capabilities, a non-clustered server was used. For Cassandra this was a single-node cluster; for Mongo, simply one server with no replication. Cluster tests were run for functionality.

Test Configuration

  • Number of requests: 10,000,000
  • Object Mapper: Jackson JSON
  • Key size: 32
  • Value size: average is 1000 bytes, range from 100 to 1900

Server specs:

  • CentOS 5.5 Linux
  • Intel Xeon 2.4 GHz 64-bit processor
  • 8 processors, 2 cores each
  • 8 GB RAM
  • The client pinging the server averaged 0.2 ms

The columns are for the most part self-descriptive. The ValSeed column indicates the random seed for the value size. This is important since we want to exercise the vendor’s compaction algorithm by assigning differently sized values to an updated key. Values are generated by creating an N-sized String from a randomly chosen set of ASCII characters. Since we want to avoid equally sized values, we vary N from 100 to 1899.
  int baseSize = 100;
  int range = 1800;
  Random random = new Random(valueSizeSeed);
  int valueSize = random.nextInt(range) + baseSize; // yields sizes from 100 to 1899

MongoDB

  • Version: 2.0.0
  • Journaling: turned on
Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail Thr ValSeed
Put         2114  4729454      1     20     43     60   1576  36232    9.442    0     0  20    1776
Get         3182  3142258      2     10     18     28    273   4684    6.268    0     0  20
Get         3182  3142850      2     10     18     26    229   3950    6.269    0     0  20
Get         3106  3219539      2     33     88    144   2120   3885   16.060    0     0  50
Put         1753  5706060      1     17     76    278   1985  44472   11.395    0     0  20    1812
Put         2197  4552045      1     17     52    182   1293  37299    9.087    0     0  20    1846
Get         2999  3333966      2     11     19     39    308   4380    6.651    0     0  20
Get         3039  3290874      2     10     19     41    289   4676    6.565    0     0  20
PutGet       907 11027045     14     28     70    150   2531   8886   22.036    0     0  20    1861
Get         2992  3342034      2     11     20     40    299   4299    6.666    0     0  20
Get         2975  3361762      2     11     20     38    301   4478    6.707    0     0  20
Get         2957  3381393      2     34    112    166   2160   4363   16.871    0     0  50

Cassandra – Version: 0.8.6

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail Thr ValSeed
Put         6045  1654219      2      4     16     24    160   4169    3.288    0     0  20    1776
Get         2334  4285208      1     14    111    150    427   9003    8.552    0     0  20
Get         2380  4202468      2     10    110    144    290   8380    8.387    0     0  20
Get         2616  3822749      8     40    177    226   1559  24092   19.069    0     0  50
Put         4054  2466583      2      4     17     26    194 174614    4.915    0     0  20    1812
Put         5768  1733745      2      4     16     24    172   4190    3.446    0     0  20    1846
Get          294 33972166     43    141    371    567   1737  67276   67.928    0     0  20
Get          350 28551712     39    116    256    354   1629  37371   57.087    0     0  20
PutGet       285 35124479     40    124    388    801   2384 456321   70.232    0     0  20    1861
Get          210 47730677     79    182    341    418   1141  36368   95.446    0     0  20
Get          211 47305663     79    180    335    409   1097 104592   94.595    0     0  20
Get          249 40157243    175    339    598    730   2164  77675  200.751    0     0  50

Variability Examples

Here are some examples demonstrating the variability in results. The examples are from identical tests in the same context.

Mongo

First 10M puts

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail  Thr
Put         2137  4679117      1     21     41     49   1388  40931    9.338    0     0  20
Put         2156  4639027      1     21     41     48   1450  36412    9.258    0     0  20
Put         2114  4729454      1     20     43     60   1576  36232    9.442    0     0  20
Put         2079  4810938      1     20     42     55   1354  38436    9.605    0     0  20

First 10M gets

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail  Thr

Get         3187  3138189      2     10     18     25    225   4665    6.260    0     0  20
Get         3115  3210200      2     10     18     26    258  13211    6.403    0     0  20
Get         3182  3142258      2     10     18     28    273   4684    6.268    0     0  20
Get         3141  3184198      2     10     18     27    271  22285    6.352    0     0  20

No SQL Data Store Evaluation

October 18, 2011

Recently I’ve been doing some extensive and intensive evaluation of various NoSQL implementations capable of storing 600,000 key/value items with a latency preferably under 50 milliseconds. Extensive in the sense of examining many providers, and intensive in drilling down to test key features of each system. The specific requirements precluded looking at all stores, but a number of quite unique systems – each with its own strong personality – were examined in substantial detail.

The initial requirements were:

  • Eventually be able to store six months of user profiles for ad engagements
  • Insertion of about 3-5 million records daily – an equivalent amount had to be deleted
  • Key/value access – no need for range or index queries
  • All keys were random MD5 hashes
  • Retrieval latency under 50 ms
  • Timestamp expiration of records
  • Reads needed to be very fast – writes could be batched and delayed
  • Ability to manipulate nodes of a live cluster, e.g. add a new node without a restart

In order to execute the evaluation I developed a suite of structured high-load tests that could be run against all providers. Besides raw benchmarks there were also tests for key technical issues such as failover behavior. It was not possible for all implementations to be comparable for all features. Obviously providers made different design and feature trade-offs – that’s their unique personality.

Some topics examined:
  • Predictability – we need low variability in response time – latency percentiles are helpful here.
  • Repeatability – we want to run the same test in the same context repeatedly with the same results, subject to an obvious margin of error. Establishing a consistent margin is a difficult chore.
  • Consistency model – eventual consistency and/or immediate. How is the CAP theorem handled?
  • Failover – when a node dies what happens to reads and writes?
  • Sharding – ability to shard data and key partitioning strategies.
  • Compaction policy – how is data stored, and how are updates and deletions handled (tombstones)? If implemented in Java, how much of a hit is garbage collection?

The following systems were evaluated, with the first two as the final candidates:
  • Cassandra
  • MongoDB
  • Membase
  • Riak
  • Redis

In addition, the idea of a bank of caching servers fronting the NoSQL stores was examined – Memcached, Ehcache REST server, and custom REST servers with a pluggable cache engine (Ehcache, Infinispan). In some cases the NoSQL stores were so performant (MongoDB) that a separate caching layer was not immediately necessary. Perhaps under greater volume it would be. The key was to have a provisional caching strategy as a reserve.

MongoDB is very fast and reliable and it made the first cut. Membase also proved fast, but it had some reliability issues, especially on server restarts. Cassandra provided the full set of features we needed, though it was significantly slower than MongoDB. Since we were testing with a constant client thread count of 20 or 50, being able to stay alive and keep up with this rate was important.

One of the frustrating aspects was the difficulty in obtaining adequate or even minimal hardware. Since the raison d’être of the project was high volume, it was important to test with numbers as large as possible. Differences between low and high volume scenarios can give you drastically different results and lead you to costly wrong conclusions. There is also a natural tension between reasonable turnaround time for results and the volume and rate of data testing. For example, to run a standard melange of CRUD operations (inserts, gets, updates) 14 million times, MongoDB takes 20 hours versus 73 hours for Cassandra. That’s a lot of time to wait for the results of a configuration toggle switch!

One of the great parts of the project was the ability to study in detail key features of these systems and then implement test scenarios that exercised those features. A good example of praxis: applying solidly founded ideas in practice. One has to first discover and understand what has to be tested. Comparing eventual consistency systems such as Cassandra and Riak is a challenging endeavor that stimulates the intellect. For instance, how do Bloom filters (slight risk of false positives) obviate the need for disk access? How do you go about testing to see if this is true? Bloom filters are even used in DNA sequencing – see here. Then throw in MongoDB’s differing but overlapping approach, and you get quite a rich NoSQL stew. Documentation and articulation of key features was an important evaluation criterion.
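
To make the Bloom filter point concrete, here is a tiny sketch using Guava’s BloomFilter (an illustrative library choice – Cassandra and Riak use their own implementations). The filter answers “definitely absent” or “possibly present”, which is exactly what lets a read skip an SSTable without a disk seek:

  import java.nio.charset.Charset;

  import com.google.common.hash.BloomFilter;
  import com.google.common.hash.Funnels;

  public class BloomSketch {
      public static void main(String[] args) {
          // Sized for 1M keys with a 1% false-positive rate.
          BloomFilter<String> keys = BloomFilter.create(
              Funnels.stringFunnel(Charset.forName("UTF-8")), 1000000, 0.01);
          keys.put("user:42");
          System.out.println(keys.mightContain("user:42"));   // true
          System.out.println(keys.mightContain("user:9999")); // almost certainly false
      }
  }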