Archive for the ‘Cassandra’ Category

ALT.NET and Cassandra

October 27, 2011

On a lark, tonight I attended an ALT.NET presentation on Cassandra at our local Microsoft Nerd Center. Kudos to Microsoft for hosting all these great meetings – it's a real service to the broader developer community.

Coming from a Java perspective, I found this meeting especially interesting since it was all about .NET. It is always informative to see how different developer communities are using NoSQL technologies.

The speaker, John Zablocki, emphasized how his goal with the ALT.NET group is to introduce open source projects to Microsoft developers. He was an engaging and entertaining speaker with a background in Mongo, CouchDB and Cassandra. I really enjoyed his joke about how DB admins are increasing global warming by being so highly paid and thus buying bigger houses, consuming more resources, etc. 🙂

One of the major outcomes of the meeting for me was my increased awareness of Microsoft Azure. I've definitely added a new item to my to-learn-more queue regarding Azure and more specifically their consistent key/value store called Windows Azure Table. Several Microsoft folks involved with Azure were there and imparted some great information. They seem eager to hook up with the greater NoSQL community – things are happening here.

The first part of the talk was general NoSQL background – a bit redundant for the more knowledgeable attendees. But I always find reviews productive – there is always something new to learn, a slightly new angle from a different perspective or a new nugget to store in ye old gray matter. We certainly had fun with Cassandra's sub-par cassandra-cli shell!

Regarding specific Windows issues, I asked if there were any Windows-based Cassandra deployments out there, but no one knew of any. John was obviously very much in-the-know on the three .NET Cassandra client drivers, and according to him Fluent Cassandra is the best choice since it is most true to the .NET programming paradigm. I was rather surprised at the number of clients for .NET compared to Java!

Finally, John was kind enough to raffle off a copy of Cassandra: The Definitive Guide to the person who knew who authored the CAP theorem. Hint: he recently joined the board of Riak! I volunteered the answer, and lo and behold I now have to update my Cassandra knowledge with "out-of-date" knowledge. How quickly tool-specific information becomes outdated! The book was published less than a year ago (Nov. 2010) and covers version 0.7, whereas the current version is 1.0! Nevertheless, I am confident that there is plenty of worthwhile core architectural information to be learned.

As a sidebar, with the tragic demise of the Borders bookstore, I had somewhat guiltily opted to buy a copy of the classic book Principles of Transaction Processing, Second Edition instead of the Cassandra book, since the pages-to-price-to-knowledge ratio was higher for the former than the latter. Now my conscience is clear-er! Of particular interest is Chapter 9, Replication, which presents a priceless rigorous exposition of replication – from master/slave to eventual consistency. A must read for the NoSQL professional!

MongoDB and Cassandra Cluster Failover

October 21, 2011

One of the most important features of a scalable clustered NoSQL store is how to handle failover. The basic question is: is failover seamless from the client perspective? This is not always immediately apparent from vendor documentation especially for open-source packages where documentation is often wanting. The only way to really know is to run your own tests – to both verify vendor claims and to fully understand them. Caveat emptor – especially if the product is free!

Mongo and Cassandra use fundamentally different approaches to clustering. Mongo is based on the classical master/slave model (similar to MySQL) whereas Cassandra is a peer-to-peer system modeled on the eventual consistency paradigm pioneered by Amazon’s Dynamo system. This difference has specific ramifications regarding failover capabilities. The design trade-offs regarding the CAP theorem are described very well in the dbmusings blog post Overview of the Oracle NoSQL Database (Section CAP).

For Mongo, the client can only write to the master, which is therefore a single point of failure. Until a new master is elected from the secondaries, the cluster will not be reachable for writes.

For Mongo reads, you have two options: either talk to the master or to the secondaries. The default is to read from the master, and you will be subject to the same semantics as for writes. To enable reading from secondaries, you can call Mongo.slaveOk() on your client driver. For details see here.
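
To make that concrete, here's a minimal sketch against the 2.x-era Java driver used elsewhere in these tests; the database and collection names are just placeholders.

  import java.net.UnknownHostException;

  import com.mongodb.DB;
  import com.mongodb.DBCollection;
  import com.mongodb.DBObject;
  import com.mongodb.Mongo;

  public class SlaveOkExample {
    public static void main(String[] args) throws UnknownHostException {
      Mongo mongo = new Mongo("localhost", 27017);
      mongo.slaveOk();                                   // allow reads to be served by secondaries
      DB db = mongo.getDB("testdb");                     // placeholder database name
      DBCollection profiles = db.getCollection("userProfiles"); // placeholder collection name
      DBObject doc = profiles.findOne();                 // read can still succeed if the master is down
      System.out.println(doc);
    }
  }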

For Cassandra, as long as your selected consistency policy is satisfied, failing nodes will not prevent client access.

Mongo Setup

I'll illustrate the issue with a simple three-node cluster. For Mongo, you'll need to create a three-node replica set, which is well described on the Replica Set Tutorial page.

Here’s a convenience shell script to launch a Mongo replica set.

  dir=/work/server-data/mongo-cluster
  mv nohup.out old-nohup.out
  OPTS="--rest --nojournal --replSet myReplSet"
  nohup mongod $OPTS --port 27017 --dbpath $dir/node0  &
  nohup mongod $OPTS --port 27018 --dbpath $dir/node1  &
  nohup mongod $OPTS --port 27019 --dbpath $dir/node2  &

Cassandra Setup

For Cassandra, create a keyspace with replication factor of 3 in your schema definition file.

  create keyspace UserProfileKeyspace
    with strategy_options=[{replication_factor:3}]
    and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';

Since I was using the standard Java Hector client along with Spring, I'll highlight some of the key Spring bean configurations. The important point to note is that the consistency policy must be quorum, which means that for an operation (read or write) to succeed, at least two of the three replicas must respond.

Properties File

  cassandra.hosts=host1,host2,host3
  cassandra.cluster=MyCluster
  cassandra.consistencyLevelPolicy=quorumAllConsistencyLevelPolicy

Application Context File

  <bean id="userProfileKeyspace"
        class="me.prettyprint.hector.api.factory.HFactory"
        factory-method="createKeyspace">
    <constructor-arg value="UserProfileKeyspace" />
    <constructor-arg ref="cluster"/>
    <property name="consistencyLevelPolicy" ref="${cassandra.consistencyLevelPolicy}" />
  </bean>

  <bean id="cluster"
        class="me.prettyprint.cassandra.service.ThriftCluster">
    <constructor-arg value="${cassandra.cluster}"/>
    <constructor-arg ref="cassandraHostConfigurator"/>
  </bean>

  <bean id="cassandraHostConfigurator"
        class="me.prettyprint.cassandra.service.CassandraHostConfigurator">
     <constructor-arg value="${cassandra.hosts}"/>
  </bean>

  <bean id="quorumAllConsistencyLevelPolicy"
        class="me.prettyprint.cassandra.model.QuorumAllConsistencyLevelPolicy" />

  <bean id="allOneConsistencyLevelPolicy"
        class="me.prettyprint.cassandra.model.AllOneConsistencyLevelPolicy" />

Test Scenario

The basic steps of the test are as follows. Note the Mongo example assumes you do not have slaveOk turned on.

  • Launch cluster
  • Execute N requests where N is a large number such as 100,000 to give your cluster time to fail over. The request is either a read or write.
  • While your N requests are running, kill one of the nodes. For Cassandra this can be any node since it is peer-to-peer. For Mongo, kill the master node.
  • For Cassandra, there will be no exceptions. If your requests are inserts, you will be able to subsequently retrieve them.
  • For Mongo, your requests will fail until the secondary is promoted to the master. This happens for both writes and reads. The time window is “small”, but depending upon your client request rate, the number of failed requests can be quite a few thousand! See the sample exceptions below.

With Mongo (absent slaveOk), the client can directly access only the master node; the secondaries are replicated to by the master and are never accessed directly by the client. With Cassandra, as long as the minimum number of nodes required by the quorum can be reached, your operation will succeed.
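
To make the scenario concrete, here is a rough sketch of the request loop; the KeyValueDao interface is a hypothetical wrapper around whichever client is under test, not an API from either driver.

  // Hypothetical DAO abstraction over the Mongo or Cassandra client under test.
  public interface KeyValueDao {
    void put(String key, String value);
    String get(String key);
  }

  public class FailoverTest {
    // Issue numRequests puts and count the failures; kill a node by hand while this loop runs.
    public static int run(KeyValueDao dao, int numRequests) {
      int failures = 0;
      for (int i = 0; i < numRequests; i++) {
        try {
          dao.put("key-" + i, "value-" + i);
        } catch (Exception e) {
          failures++;  // with Mongo (no slaveOk) these pile up until a new master is elected
        }
      }
      return failures;
    }
  }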

Mongo Put Exception Example

I really like the message “can’t say something” – sort of cute!

com.mongodb.MongoException$Network: can't say something
    at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:159)
    at com.mongodb.DBTCPConnector.say(DBTCPConnector.java:132)
    at com.mongodb.DBApiLayer$MyCollection.update(DBApiLayer.java:343)
    at com.mongodb.DBCollection.save(DBCollection.java:641)
    at com.mongodb.DBCollection.save(DBCollection.java:608)
    at com.amm.nosql.dao.mongodb.MongodbDao.put(MongodbDao.java:48)

Mongo Get Exception Example

com.mongodb.MongoException$Network: can't call something
    at com.mongodb.DBTCPConnector.call(DBTCPConnector.java:211)
    at com.mongodb.DBTCPConnector.call(DBTCPConnector.java:222)
    at com.mongodb.DBTCPConnector.call(DBTCPConnector.java:231)
    at com.mongodb.DBApiLayer$MyCollection.__find(DBApiLayer.java:303)
    at com.mongodb.DBCursor._check(DBCursor.java:360)
    at com.mongodb.DBCursor._next(DBCursor.java:442)
    at com.mongodb.DBCursor.next(DBCursor.java:525)
    at com.amm.nosql.dao.mongodb.MongodbDao.get(MongodbDao.java:38)

Mongo Get with slaveOk() 

If you invoke Mongo.slaveOk() for your client driver, then your reads will not fail if a node goes down. You will get the following warning.

Oct 30, 2011 8:12:22 PM com.mongodb.ReplicaSetStatus$Node update
WARNING: Server seen down: localhost:27019
java.net.SocketException: Connection reset by peer: socket write error
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
        at org.bson.io.PoolOutputBuffer.pipe(PoolOutputBuffer.java:129)
        at com.mongodb.OutMessage.pipe(OutMessage.java:160)
        at com.mongodb.DBPort.go(DBPort.java:108)
        at com.mongodb.DBPort.go(DBPort.java:82)
        at com.mongodb.DBPort.findOne(DBPort.java:142)
        at com.mongodb.DBPort.runCommand(DBPort.java:151)
        at com.mongodb.ReplicaSetStatus$Node.update(ReplicaSetStatus.java:178)
        at com.mongodb.ReplicaSetStatus.updateAll(ReplicaSetStatus.java:349)
        at com.mongodb.ReplicaSetStatus$Updater.run(ReplicaSetStatus.java:296)
Oct 30, 2011 8:12:22 PM com.mongodb.DBTCPConnector _set

Cassandra Users Group Impressions

October 20, 2011

Tonight I attended my first local Cassandra Users Group meetup. The wet walk in the rain sure was worth it!

About half a dozen people attended – most had a Cassandra deployment in production and all had something interesting to say. No fluff here tonight. Everyone had a high level of knowledge of eventual consistency – it was good not to be the big fish in the NoSQL pond, so I could learn from folks out in the field.

Sidebar: Cassandra 1.0 was released yesterday! I wonder what the significance of the release number jump from 0.8.7 to 1.0 is – a new item on my TODO queue.

Here follows a synopsis.

Company B

  • 5 TB of data
  • 21 nodes on AWS – spiked to 42 once – now back to 21
  • 90% writes – hence Cassandra is ideal
  • Two column families:
    • one column – basic key/value
    • wide column – key is user ID and columns are his/her emails

Company C

  • 72 nodes
  • 2 data centers

Company Z

  • Cassandra 0.7.6
  • 4 nodes
  • RAID 0 disks – redundancy is achieved through Cassandra clustering
  • 21 million writes a day
  • 250 GB – 7 million XML files ranging from 4 to 450 MB in size
  • Column family: key/value
  • Replication factor is 3
  • 80 concurrent write threads, 160 read threads
  • QuickLZ compression down to 20%
  • Migrated from Mogile FS

Other

Regarding adding new nodes to a live cluster, no one had any negative experiences. For RandomPartitioner, several folks said it was best to double the number of nodes assuming your company can foot the bill. There was apparently no negative impact in rebalancing overhead related to shifting data to the new nodes.

Two folks were using manual server token assignments for RandomPartitioner – a third person was not keen on this. Assuming you are using an MD5 hash, you simply divide the 128-bit namespace by the number of servers. This apparently was better than letting Cassandra compute the tokens. This was OK even if you added new nodes and had to recompute server tokens.
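
As a rough sketch of that arithmetic (assuming the commonly cited 2^127 token space of the MD5-based RandomPartitioner), evenly spaced initial tokens can be computed like this:

  import java.math.BigInteger;

  public class TokenCalculator {
    public static void main(String[] args) {
      int numNodes = 4;                                      // example cluster size
      BigInteger ringSize = BigInteger.valueOf(2).pow(127);  // RandomPartitioner token space
      for (int i = 0; i < numNodes; i++) {
        BigInteger token = ringSize.multiply(BigInteger.valueOf(i))
                                   .divide(BigInteger.valueOf(numNodes));
        System.out.println("node " + i + " initial_token: " + token);
      }
    }
  }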

Hadoop integration was important for many – and there was nothing but praise for DataStax’s Brisk tool that integrates Hadoop and Cassandra. Instead of using the standard HDFS (Hadoop Distributed File System) you can simply point Hadoop to use Cassandra. Two advantages to this are:

  • Random access of Cassandra is faster than HDFS
  • Avoid HDFS SPOF (single point of failure)

One company was very happy with Brisk’s seamless Hive integration. Several people expressed issues with the apparent inability to execute map reduce jobs on more than one column.

Regarding client drivers, the Java users all concurred on the horrors of accessing Cassandra via the complex Thrift-based data model. Hector marginally alleviates this, but it is still painful. For example, compare this to Mongo's extremely intuitive JSON-based interface! Two companies were using customized Ruby/JRuby gems. I learned that a previously advertised Cassandra option for Avro in addition to Thrift had been dropped.

There was some discussion of other NoSQL vendors – namely MongoDB and Riak. One company was happily using MongoDB. Others had naturally gravitated to Cassandra because of its core competencies of failover robustness and scalability. The fact that MongoDB uses a master/slave paradigm for replication was an issue for some – the master is a SPOF for writes. People also commented that a Cassandra cluster was easier to manage since Cassandra has only one process per node, whereas with sharded MongoDB you need to launch several kinds of processes: mongod, mongos, and config servers.

MongoDB vs. Cassandra Benchmarks

October 19, 2011

Cassandra and Mongo passed the first set of winnowing tests – both in terms of speed and functionality. In order to provide more data for an informed decision, I subjected both to a series of high throughput tests. The test suite was a total of 120 million operations (insert, update, get) – 12 runs of 10 million requests.

For those impatient for the ultimate conclusion, the executive summary is: Cassandra writes are faster, Mongo reads are faster. But what is interesting is how the latencies evolve over time, i.e. over the course of 120 million requests.

Overview of test suite

  • Remove all disk data of the persistent store with rm -rf
  • Start the NoSQL server
  • Maintain a constant load of 20 or 50 concurrent client threads with no ramp-up time
  • Insert N records
  • Get N records – this will reflect any caching or compaction (e.g. Cassandra SSTables)
  • Get N records – this should reflect already compacted tables – should be faster
  • Get N records – should correspond to previous get
  • Update N keys with different sized values (value seed). We want to make sure we’re exercising Mongo compaction
  • Repeat the three get runs again

Results Commentary

When one result is faster than another, the important question to always ask is: how important is the difference? Does it really matter? Is the slower system still fast enough? If the lower value still satisfies your requirements, then the difference is of little significance and other factors weigh more heavily on the ultimate decision.

In order to interpret results, it is important to have a basic understanding of some statistics. You should not rely solely on the mean, but also pay particular attention to latency percentiles. Percentiles give you an idea of the distribution of results, which is especially important when predictability is required. Your SLA might say: P% of reads MUST be less than 10 ms, where P is a percentage such as 99 or 100. This of course raises the question of what is to be done for those operations that exceed the limit, but that is another discussion. As you can see, there are some real whoppers – note the Max value for a put: 37 seconds for MongoDB and 175 seconds for Cassandra!
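
For reference, here is a small sketch of the nearest-rank percentile calculation I have in mind when reading the tables below; the sample latencies are made-up example data.

  import java.util.Arrays;

  public class LatencyPercentile {
    // Nearest-rank percentile over a set of latency samples (in milliseconds).
    public static long percentile(long[] latencies, double pct) {
      long[] sorted = latencies.clone();
      Arrays.sort(sorted);
      int rank = (int) Math.ceil(pct / 100.0 * sorted.length);
      return sorted[Math.max(rank - 1, 0)];
    }

    public static void main(String[] args) {
      long[] samples = {1, 1, 2, 2, 3, 5, 8, 20, 43, 1576};  // example data only
      System.out.println("90th percentile: " + percentile(samples, 90) + " ms");
    }
  }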

As the tables below indicate, Cassandra was the winner for puts – both inserts and updates. For the initial 10M puts, the distribution is interesting – 50% of Mongo puts were under 1 ms while for Cassandra it was 2 ms. But at the 90th percentile, Mongo took 20 ms while Cassandra was at 4 ms.

For the subsequent two batches of 10M puts (inserts), Cassandra's throughput was 4054 and 5768 puts per second – quite a bit of variability! Note the max value for a put was 174 seconds – must have been one heck of a GC! Mongo's two insert batches ranged from 1753 to 2197 puts per second. 90% of Cassandra puts finished under 4 ms, whereas 90% of Mongo puts were under 17 ms.

Cassandra's superior write numbers are not surprising since it is optimized for faster writes than reads. See here. In order to avoid expensive disk seeks, writes are appended to sequential files called SSTables. However, the hit is taken on reads, where the system potentially has to examine several SSTables to find the latest version of a key. Unfortunately, for our needs fast reads were of more importance.

For reads the situation is quite different and Mongo comes out ahead. For the first three get batches, Cassandra gives us between 2300 and 2600 gets per second; Mongo is predictably in the 3100s. Both of these fall within our target SLA and are roughly comparable.

However, it's a completely different story for reads after we do inserts! Look at the numbers: Mongo is slightly slower at 2999 and 3039, but Cassandra drops tenfold to 294 and 350! Que caramba! I'm assuming this is because of the need to access multiple SSTables, but I would've assumed that at some point the values would be merged into one file.

Perhaps there's some tuning that needs to be done for Cassandra, though I saw nothing obvious in my initial look at the documentation. All I can say with confidence is that un-tuned Cassandra get operations drastically slow down. I did invoke an explicit compaction using nodetool, and throughput for the next 10 million gets did improve to 1000/sec. But the caveat was that the Cassandra documentation does not recommend this, since automated compactions will not be run after a manual one! By the way, one of Mongo's strong suits is that they strive to minimize the amount of configuration required to get a system running.

Context

In order to establish a baseline for each vendor's capabilities, a non-clustered server was used. For Cassandra this was a single-node cluster; for Mongo, simply one server with no replication. Cluster tests were run for functionality.

Test Configuration

  • Number of requests: 10,000,000
  • Object Mapper: Jackson JSON
  • Key size: 32
  • Value size: average is 1000 bytes, range from 100 to 1900

Server specs

  • CentOS 5.5 Linux
  • Intel Xeon 2.4 GHz 64-bit processor
  • 8 processors, 2 cores each
  • 8 GB RAM
  • The client pinging the server averaged 0.2 ms

The columns are for the most part self-descriptive. The ValSeed column indicates the random seed for the value size. This is important since we want to exercise the vendor's compaction algorithm by assigning differently-sized values to an updated key. Values are generated by creating an N-sized String from a random set of ASCII characters. Since we want to avoid equally sized values, N varies from 100 to roughly 1900:
  int baseSize = 100;
  int range = 1800;
  Random random = new Random(valueSizeSeed);
  int valueSize = random.nextInt(range) + baseSize;

MongoDB

  • Version: 2.0.0
  • Journaling: turned on
Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail Thr ValSeed
Put         2114  4729454      1     20     43     60   1576  36232    9.442    0     0  20    1776
Get         3182  3142258      2     10     18     28    273   4684    6.268    0     0  20
Get         3182  3142850      2     10     18     26    229   3950    6.269    0     0  20
Get         3106  3219539      2     33     88    144   2120   3885   16.060    0     0  50
Put         1753  5706060      1     17     76    278   1985  44472   11.395    0     0  20    1812
Put         2197  4552045      1     17     52    182   1293  37299    9.087    0     0  20    1846
Get         2999  3333966      2     11     19     39    308   4380    6.651    0     0  20
Get         3039  3290874      2     10     19     41    289   4676    6.565    0     0  20
PutGet       907 11027045     14     28     70    150   2531   8886   22.036    0     0  20    1861
Get         2992  3342034      2     11     20     40    299   4299    6.666    0     0  20
Get         2975  3361762      2     11     20     38    301   4478    6.707    0     0  20
Get         2957  3381393      2     34    112    166   2160   4363   16.871    0     0  50

Cassandra – Version: 0.8.6

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail Thr ValSeed
Put         6045  1654219      2      4     16     24    160   4169    3.288    0     0  20    1776
Get         2334  4285208      1     14    111    150    427   9003    8.552    0     0  20
Get         2380  4202468      2     10    110    144    290   8380    8.387    0     0  20
Get         2616  3822749      8     40    177    226   1559  24092   19.069    0     0  50
Put         4054  2466583      2      4     17     26    194 174614    4.915    0     0  20    1812
Put         5768  1733745      2      4     16     24    172   4190    3.446    0     0  20    1846
Get          294 33972166     43    141    371    567   1737  67276   67.928    0     0  20
Get          350 28551712     39    116    256    354   1629  37371   57.087    0     0  20
PutGet       285 35124479     40    124    388    801   2384 456321   70.232    0     0  20    1861
Get          210 47730677     79    182    341    418   1141  36368   95.446    0     0  20
Get          211 47305663     79    180    335    409   1097 104592   94.595    0     0  20
Get          249 40157243    175    339    598    730   2164  77675  200.751    0     0  50

Variability Examples

Here are some examples demonstrating the variability in results. The examples are from identical tests in the same context.

Mongo

First 10M puts

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail  Thr
Put         2137  4679117      1     21     41     49   1388  40931    9.338    0     0  20
Put         2156  4639027      1     21     41     48   1450  36412    9.258    0     0  20
Put         2114  4729454      1     20     43     60   1576  36232    9.442    0     0  20
Put         2079  4810938      1     20     42     55   1354  38436    9.605    0     0  20

First 10M gets

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail  Thr

Get         3187  3138189      2     10     18     25    225   4665    6.260    0     0  20
Get         3115  3210200      2     10     18     26    258  13211    6.403    0     0  20
Get         3182  3142258      2     10     18     28    273   4684    6.268    0     0  20
Get         3141  3184198      2     10     18     27    271  22285    6.352    0     0  20

No SQL Data Store Evaluation

October 18, 2011

Recently I've been doing some extensive and intensive evaluation of various NoSQL implementations capable of storing 600 million key/value items with a retrieval latency preferably under 50 milliseconds. Extensive in the sense of examining many providers, and intensive in drilling down to test key features of each system. The specific requirements precluded looking at all stores, but a number of quite unique systems – each with its own strong personality – were examined in substantial detail.

The initial requirements were:

  • Eventually be able to store six months of user profiles for ad engagements
  • Insertion of about 3-5 million records daily – an equivalent amount had to be deleted
  • Key/value access – no need for range or index queries
  • All keys were random MD5 hashes
  • Retrieval latency under 50 ms
  • Timestamp expiration of records
  • Reads needed to be very fast – writes could be batched and delayed
  • Ability to manipulate nodes of a live cluster, e.g. add a new node without a restart

In order to execute the evaluation I developed a suite of structured high-load tests that could be run against all providers. Besides raw benchmarks there were also tests for key technical issues such as failover behavior. It was not possible for all implementations to be comparable on all features. Obviously providers made different design and feature trade-offs – that's their unique personality.

Some topics examined:

  • Predictability – we need low variability in response time – latency percentiles are helpful here.
  • Repeatability – we want to run the same test in the same context repeatedly with the same results, subject to an obvious margin of error. Establishing a consistent margin is a difficult chore.
  • Consistency model – eventual consistency and/or immediate. How is the CAP theorem handled?
  • Failover – when a node dies what happens to reads and writes?
  • Sharding – ability to shard data and key partitioning strategies.
  • Compaction policy – how is data stored, and how are updates and deletions handled (tombstones)? If implemented in Java, how much of a hit is garbage collection?

The following systems were evaluated, with the first two as the final candidates:

  • Cassandra
  • MongoDB
  • Membase
  • Riak
  • Redis

In addition, the idea of a bank of caching servers fronting the NoSQL stores was examined – Memcached, Ehcache REST server, and custom REST servers with a pluggable cache engine (Ehcache, Infinispan). In some cases the NoSQL stores were so performant (MongoDB) that a separate caching layer was not immediately necessary. Perhaps under greater volume it would be. The key was to have a provisional caching strategy as a reserve.

MongoDB is very fast and reliable and it made the first cut. Membase also proved fast, but it had some reliability issues especially in server restarts. Cassandra provided the full set of features we needed though it was significantly slower than MongoDB.  Since we were testing with a constant client thread count of 20 or 50, being able to stay alive and keep up with this rate was important.

One of the frustrating aspects was the difficulty in obtaining adequate or even minimal hardware. Since the raison d'être of the project was high volume, it was important to test with as large a volume as possible. Differences between low- and high-volume scenarios can give you drastically different results and lead you to costly wrong conclusions. There is also a natural tension between reasonable turnaround time for results and the volume and rate of data testing. For example, to run a standard melange of CRUD operations (inserts, gets, updates) 14 million times, MongoDB takes 20 hours versus 73 hours for Cassandra. That's a lot of time to wait for the results of a configuration toggle switch!

One of the great parts of the project was the ability to study in detail key features of the systems and then implement test scenarios that exercised these features. A good example of praxis: applying solidly founded ideas in practice. One has to first discover and understand what has to be tested. Comparing eventual consistency systems such as Cassandra and Riak is a challenging endeavor that stimulates the intellect. For instance, how do Bloom filters (with their slight risk of false positives) obviate the need for disk access? How do you go about testing to see if this is true? Bloom filters are even used in DNA sequencing – see here. Then throw in MongoDB's differing but overlapping approach, and you get quite a rich NoSQL stew. Documentation and articulation of key features was an important evaluation criterion.

Mongo Conference Impressions

October 17, 2011

Last week I attended a full-day Mongo conference hosted at our local Microsoft Nerd Center. The timing was quite fortuitous as I'm heavily involved in evaluating Mongo and Cassandra for a very large data store (600 million records). My head was full of questions, especially regarding replication and sharding scenarios.

I noticed that 10gen seems to be very user-responsive, and on numerous occasions speakers emphasized that client feedback drove many new features. Furthermore, speakers were very open about Mongo shortcomings. For example, they openly admitted that free list management was in their opinion wanting (I would never have known), and that version 2.2 would have a major overhaul. And above all the no-fluff quotient was high – it seems everyone writes code at 10gen. See: 10Gen CEO Dwight Merriman Still Writes His Own Code!

Overall the conference was great – a large turnout of 250 people and a good mix of presentations by 10gen folks and customers showcasing their uses of Mongo. One of the perennial conference problems I had to wrestle with was which concurrently scheduled event to attend! MTV CMS vs Morphia Java? Replicas or Art Genome project?

I was specifically interested in obtaining some more details regarding MongoDB’s scaling capabilities in the real world – what were some of the largest sites out there, what are their issues, etc. Some of the tidbits I picked up are:

  • Largest cluster is 1000 shards
    • Each shard contains a few terabytes of data
    • Replication set of three
  • Not many folks are using shards – typical sharding factor is between 3-10.

The “Journaling and Storage Engine” talk by CTO Eliot Horowitz was full of gory/great details on internals. The description of how and why MongoDB uses memory-mapped files was very interesting. Other subjects covered were how data and indexes are stored, journaling, fragmentation, and record padding. The upcoming MongoDB version 2.2 will have a new, improved fragmentation implementation.

The talk on “Schema Design at Scale” was particularly enlightening and opened my eyes to an entirely new topic of document-oriented schema design. Just because the schema is flexible doesn't mean that schema problems go away. On the contrary, because the flexibility allows for more choices and therefore fewer constraints, the number of design decisions correspondingly increases. This presents a whole new set of issues – many of them intellectually very interesting (e.g. embedded collections best practices). And many problems are the same as those facing traditional SQL databases: covering indexes, sharding partition keys, key autoincrements, B-Tree issues, etc. I forgot to ask what 10gen's take on the recently introduced UnQL (Unstructured Query Language) was. In UnQL's own words: it's an open query language for JSON, semi-structured and document databases.

The “Replication and Replica Sets” presentation described MongoDB's replication feature in detail. Essentially it is a master/slave model, in contrast to Cassandra's peer-to-peer design. One failover problem I had discovered in high-throughput testing was the time window between a master's death and the slave's promotion, during which writes are not accepted. The 10gen speaker confirmed my doubts and suggested queueing failed writes and then resubmitting them at a later time (not ideal). Another issue was that heartbeats are hard-coded to 200 ms and not configurable. One nice new feature being worked on is standardizing client access to replica sets. Currently routing logic is dependent on client drivers, and for sites using a mix of different language drivers this could present problems.

The “Sharding and Scaling” talk by the CTO outlined classical problems regarding sharding – the difficulty of choosing a good key. Lots of information was provided on the Mongo shard router process “mongos” that routes requests to the data process “mongod”. And then there is a config process too – quite a few processes involved here. I just noticed a new Developer Blog Contest: How do you Shard your Data? A point emphasized by several folks was not to wait until the last moment to add a new node to your cluster. It's best to add it when the current nodes are at 70% capacity – interestingly, the same percentage that Cassandra advocates. In general, adding a new node to a live cluster is a very difficult exercise with regard to repartitioning current data. I didn't get around to asking how and if Mongo uses consistent hashing, which is the basis of Dynamo-like eventual consistency stores.

From a customer use-case perspective, Jeff Yemin of MTV gave a great talk on how MTV is currently using MongoDB, and also described the historical evolution of their CMS system – from SQL, to an XML database, to finally a document-oriented store. It's always instructive to see how people arrive at a decision. It's all about the old philosophical maxim: context of justification and context of discovery. They're not using sharding since all their data fits on one disk.

Finally, new features for Mongo 2.2 due in January were described: improvements in concurrency, TTL collections, hash sharding features, free list management. A major concern of mine was data expiration since for my current project we need to regularly evict old data to make room for new records. Currently the only solution is to create a timestamp index, and write a manual cron-like job to delete stale items. I’ll be looking forward to TTL collections!

Cassandra Java Annotations

August 30, 2010

Overview

Cassandra has a unique column-oriented data model which does not easily map to an entity-based Java model. Furthermore, the Java Thrift client implementation is very low-level and presents the developer with a rather difficult API to work with on a daily basis. This situation is a good candidate for an adapter to shield the business code from mundane plumbing details.

I recently did some intensive Cassandra (version 0.6.5) work to load millions of geographical positions for ships at sea. Locations were already being stored in MySQL/InnoDB using JPA/Hibernate, so I already had a ready-made model based on JPA entity beans. After some analysis, I created a mini-framework based on custom annotations and a substantial adapter to encapsulate all the "ugly" Thrift boilerplate code. Naturally everything was wired together with Spring.

Implementation

The very first step was to investigate existing Cassandra Java client toolkits. As usual in a startup environment time was at a premium, but I quickly checked out a few key clients. Firstly, I looked at Hector, but its API still exposed too much of the Thrift cruft for my needs. It did have nice features for failover and connection pooling, and I will definitely look at it in more detail in the future. Pelops looked really cool with its Mutators and Selectors, but it too dealt with columns – see the description. What I was looking for was an object-oriented way to load and query Java beans. Note that this OO entity-like paradigm might not be applicable to other Cassandra data models, e.g. sparse matrices.

And then there was DataNucleus, which advertises JPA/JDO implementations for a large variety of non-SQL persistence stores: LDAP, Hadoop HBase, Google App Engine, etc. There was mention of a Cassandra solution, but it wasn't yet ready for prime time. How they manage to address the massive semantic mismatch between JPA and Cassandra is beyond me – unfortunately I didn't have time to drill down. Seems fishy – but I'll definitely check this out in the future. Even though I'm a big fan of using existing frameworks/tools, there are times when "rolling your own" is the best course of action.

The following collaborating classes comprised the framework:

  • CassandraDao – High-level class that understands annotated entity beans
  • ColumnFamily – An adapter for common column family operations – hides the Thrift gore
  • AnnotationManager – Manages the annotated beans
  • TypeMapper – Maps Java data types into bytes and vice versa

Since we already had a JPA-annotated Location bean, my first thought was to reuse this class and simply process the JPA annotations into their equivalent Cassandra concepts. Upon further examination this proved ugly – the semantic mismatch was too great. I certainly did not want to be importing JPA/Hibernate packages into a Cassandra application! Furthermore, many annotations (such as collections) were not applicable, and I needed annotations for Cassandra concepts that did not exist in JPA. In "set theoretic" terms, there are JPA-specific features, Cassandra-specific features and an intersection of the two.

The first-pass implementation required only three annotations: Entity, Column and Key. The Entity annotation is a class-level annotation with keyspace and columnFamily attributes. The Column annotation closely corresponds to its JPA equivalent. The Key annotation specifies the row key. The Entity defines the column family/keyspace that the entity belongs to and its constituent columns. The CassandraDao class corresponds to a single column family and accepts an entity class and a type mapper.
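
The post doesn't show the annotation declarations themselves, but based on the description above they could look roughly like this (names and attributes are inferred from the usage further down; each annotation would live in its own source file):

  import java.lang.annotation.ElementType;
  import java.lang.annotation.Retention;
  import java.lang.annotation.RetentionPolicy;
  import java.lang.annotation.Target;

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  public @interface Entity {
    String keyspace();
    String columnFamily();
  }

  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.METHOD, ElementType.FIELD})
  public @interface Column {
    String name();
  }

  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.METHOD, ElementType.FIELD})
  public @interface Key {
  }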

Two column families were created: a column family for ship definitions, and a super column family for ship locations. The Ship CF was a simple collection of ship details keyed by each ship's MMSI (a unique ID for a ship which is typically engraved on the keel). The Location CF represented a one-to-many relationship for all the possible locations of a ship. The key was the ship's MMSI, and the column names were Long types representing the millisecond timestamp for the location. The value of the column was a super column – it contained the columns as defined in the ShipLocation bean – latitude, longitude, course over ground, speed over ground, etc. The number of locations for a given ship could possibly range in the millions!

From an implementation perspective, I was rather surprised to find that there are no standard reusable classes to map basic Java data types to bytes. Sure, String has getBytes(), but I had to do some non-trivial distracting detective work to get doubles, longs, BigInteger, BigDecimal and Dates converted – all the shifting magic etc. Also made sure to run some performance tests to choose the best alternative!
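
As an illustration of the kind of conversion a TypeMapper has to do, here is one way to handle longs using java.nio.ByteBuffer, which hides the bit shifting; this is just a sketch, not the actual framework code.

  import java.nio.ByteBuffer;

  public class LongMapper {
    public static byte[] toBytes(long value) {
      return ByteBuffer.allocate(8).putLong(value).array();
    }

    public static long fromBytes(byte[] bytes) {
      return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
      byte[] columnName = toBytes(1283116367653L);  // a timestamp column name, as in the Location CF
      System.out.println(fromBytes(columnName));    // prints 1283116367653
    }
  }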

CassandraDao

The DAO is based on the standard concept of a genericized DAO, of which many versions are floating around.

The initial version of the DAO with basic CRUD functionality is shown below:

public class CassandraDao<T> {
  public CassandraDao(Class<T> clazz, CassandraClient client, TypeMapper mapper)
  public T get(String key)
  public void insert(T entity)
  public T getSuperColumn(String key, byte[] superColumnName)
  public List<T> getSuperColumns(String key, List<byte[]> superColumnNames)
  public void insertSuperColumn(String key, T entity)
  public void insertSuperColumns(String key, List<T> entities)
 }

Of course more complex batch and range operations that reflect advanced Cassandra API methods are needed.

Usage Sample

  import com.google.common.collect.ImmutableList;
  import org.springframework.context.support.ClassPathXmlApplicationContext;
  import org.springframework.context.ApplicationContext;

  // initialization
  ApplicationContext context = new ClassPathXmlApplicationContext("config.xml");
  CassandraDao<Ship> shipDao = (CassandraDao<Ship>)context.getBean("shipDao");
  CassandraDao<ShipLocation> shipLocationDao =
    (CassandraDao<ShipLocation>)context.getBean("shipLocationDao");
  TypeMapper typeMapper = (TypeMapper)context.getBean("typeMapper");

  // get ship
  Ship ship = shipDao.get("1975");

  // insert ship
  Ship ship = new Ship();
  ship.setMmsi(1975); // note: row key - framework insert() converts to required String
  ship.setName("Hokulea");
  shipDao.insert(ship);

  // get ship location (super column)
  byte[] superColumn = typeMapper.toBytes(1283116367653L);
  ShipLocation location = shipLocationDao.getSuperColumn("1975",superColumn);

  // get ship locations (super column)
  ImmutableList<byte[]> superColumns = ImmutableList.of( // Until Java 7, Google rocks!
    typeMapper.toBytes(1283116367653L),
    typeMapper.toBytes(1283116913738L),
    typeMapper.toBytes(1283116977580L));
  List<ShipLocation> locations = shipLocationDao.getSuperColumns("1975",superColumns);

  // insert ship location (super column)
  ShipLocation location = new ShipLocation();
  location.setTimestamp(new Date());
  location.setLat(20);
  location.setLon(-90);
  shipLocationDao.insertSuperColumn("1775",location);

Java Entity Beans

Ship

@Entity( keyspace="Marine", columnFamily="Ship")
public class Ship {
  private Integer mmsi;
  private String name;
  private Integer length;
  private Integer width;

  @Key
  @Column(name = "mmsi")
  public Integer getMmsi() {return this.mmsi;}
  public void setMmsi(Integer mmsi) {this.mmsi= mmsi;}

  @Column(name = "name")
  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
}

ShipLocation

@Entity( keyspace="Marine", columnFamily="ShipLocation")
public class ShipLocation {
  private Integer mmsi;
  private Date timestamp;
  private Double lat;
  private Double lon;

  @Key
  @Column(name = "mmsi")
  public Integer getMmsi() {return this.mmsi;}
  public void setMmsi(Integer mmsi) {this.mmsi= mmsi;}

  @Column(name = "timestamp")
  public Date getTimestamp() {return this.timestamp;}
  public void setTimestamp(Date timestamp) {this.timestamp = timestamp;}

  @Column(name = "lat")
  public Double getLat() {return this.lat;}
  public void setLat(Double lat) {this.lat = lat;}

  @Column(name = "lon")
  public Double getLon() {return this.lon;}
  public void setLon(Double lon) {this.lon = lon;}
}

Spring Configuration

 <bean id="propertyConfigurer"
       class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
   <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" />
   <property name="location" value="classpath:config.properties" />
 </bean>
 <bean id="shipDao" class="com.andre.cassandra.dao.CassandraDao" scope="prototype" >
   <constructor-arg value="com.andre.cassandra.data.Ship" />
   <constructor-arg ref="cassandraClient" />
   <constructor-arg ref="typeMapper" />
 </bean>
 <bean id="shipLocationDao" class="com.andre.cassandra.dao.CassandraDao" scope="prototype" >
   <constructor-arg value="com.andre.cassandra.data.ShipLocation" />
   <constructor-arg ref="cassandraClient" />
   <constructor-arg ref="typeMapper" />
 </bean>

<bean id="cassandraClient" class="com.andre.cassandra.util.CassandraClient" scope="prototype" >
  <constructor-arg value="${cassandra.host}" />
  <constructor-arg value="${cassandra.port}" />
</bean>

<bean id="typeMapper" class="com.andre.cassandra.util.DefaultTypeMapper" scope="prototype" />

Annotation Documentation

Annotations

Annotation   Class/Field   Description
Entity       Class         Defines the keyspace and column family
Column       Field         Column name
Key          Field         Row key

Entity Attributes

Attribute      Type     Description
keyspace       String   Keyspace
columnFamily   String   Column Family

Initial Cassandra Impressions

August 30, 2010

Recently I’ve been doing some intensive work with the popular NoSQL framework Cassandra. In this post I describe some of my first impressions of working with Cassandra Thrift Java stubs and some comparisons with Voldemort – another NoSQL framework that I am familiar with.

Cassandra Issues

Data Model

The Cassandra data model – with its columns and super columns – is radically different from the traditional SQL data model. Most Cassandra descriptions are example-based, and though rich in details they lack generality. While examples are necessary, they are not sufficient. What is missing is some formalism to capture the essential qualities of the model, which no example fully captures. I recently came across a very good article about the "NoSQL data model" from a "relational purist" that strongly resonates with me – see The Cassandra Data Model – highly recommended!

One day soon, I'll try to write a new post summarizing some of my thoughts on NoSQL data modeling. In short, as the field matures there is going to be a need to create some types of standards out of the wide variety of implementations. There are distinct NoSQL categories: key/value stores, column-oriented stores, document-oriented stores – but even within these categories there is much unnecessary overlap.

Regarding Cassandra columns, here’s a bit of clarification that may help. There are essentially two kinds of column families:

  • Those that have a fixed finite set of columns. The columns represent the attributes of single objects. Each row has the same number of columns, and the column names are fixed metadata.
  • Those that have an infinite set of columns that represent a collection for the key. The confusing part is that the column name is not really metadata – it is actually a value in its own right!

Thrift Client Limitations

Let me be frank – working with Cassandra's Java Thrift client is a real pain. In part this is due to the auto-generated cross-platform nature of the beast, but there are many pain points that reflect accidental and not inherent complexity. As Cassandra/Thrift matures, I hope more attention will be paid to ameliorating the life of poor programmers.

No class hierarchy for Thrift exceptions

Not deriving your exceptions from a base class is truly a disappointment. Interestingly, neither does Google Protobuf! The developer is forced to either catch up to five exceptions for each call, or resort to the ugly catch-Exception workaround. How much nicer would it have been to catch one Thrift base exception!

For example, just look at all the exceptions thrown by the get method of Cassandra.client!

  • org.apache.cassandra.thrift.InvalidRequestException
  • org.apache.cassandra.thrift.UnavailableException
  • org.apache.cassandra.thrift.TimedOutException
  • org.apache.thrift.TException
  • java.io.UnsupportedEncodingException

No class hierarchy for Column and SuperColumn

The core Thrift concepts Column and SuperColumn lack a base class for "implementation" reasons due to the "cross-platform" limitations of Thrift. Instead there is a ColumnOrSuperColumn class that encapsulates return results where either a Column or SuperColumn could be returned. For example, see get_slice. This leads to onerous, non-OO switch-style code – if isSetColumn() is true then call getColumn(), or if isSetSuperColumn() is true then call getSuperColumn(). Aargh!
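
Here is a short sketch of what that handling looks like in practice, using the Java-generated Thrift method names from the 0.6-era API:

  import java.util.List;

  import org.apache.cassandra.thrift.Column;
  import org.apache.cassandra.thrift.ColumnOrSuperColumn;
  import org.apache.cassandra.thrift.SuperColumn;

  public class SliceHandler {
    public void handle(List<ColumnOrSuperColumn> results) {
      for (ColumnOrSuperColumn cosc : results) {
        if (cosc.isSetColumn()) {
          Column column = cosc.getColumn();
          // process a plain column
        } else if (cosc.isSetSuperColumn()) {
          SuperColumn superColumn = cosc.getSuperColumn();
          // process a super column and its sub-columns
        }
      }
    }
  }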

Documentation

Neither Voldemort nor Cassandra provides satisfactory documentation. If you are going to bet your company's future on one of these products, you definitely have a right to expect better documentation. Interestingly, other open-source NoSQL products such as MongoDB and Riak do have better documentation.

Documentation for Voldemort configuration properties was truly a disaster (at least in version 60.1). Parameters responsible for key system performance or even basic functionality were either cryptically documented or not documented at all. I counted a total of sixty properties. For the majority we were forced to scour the source code to get some basic understanding. Totally unnecessary! Some examples: client.max.threads, client.max.connections.per.node, client.max.total.connections, client.connection.timeout.ms, client.routing.timeout.ms, client.max.queued.requests, enable.redirect.routing, socket.listen.queue.length, nio.parallel.processing.threshold, max.threads, scheduler.threads, socket.timeout.ms, etc.

Comparison of Cassandra with Voldemort

On the basic level, both Cassandra and Voldemort are sharded key value stores modeled on Dynamo. Cassandra can be regarded as a superset in that it also provides a data model on top of the base K/V store.

Some comparison points with Voldemort:

  • Cluster node failover
  • Quorum policies
  • Read or write optimized?
  • Can nodes be added to the cluster dynamically?
  • Pluggable store engines: Voldemort supports pluggable engines, Cassandra does not.
  • Dynamically adding column families
  • Hinted Hand-off
  • Read Repair
  • Vector Clocks

Cluster Node Failover

A Voldemort client can specify one or more cluster nodes to connect to. The first node that the client connects to will return to the client a list of all nodes. The client stubs will then account for failover and load balancing. In fact, you can plug in your custom strategies. The third-party Cassandra Java client Hector claims to support node failover.

Read/Write Optimization

Read or write optimized? Cassandra is write-optimized whereas Voldemort reads are faster. Cassandra uses a journaling and compacting paradigm. Writes are nearly instantaneous in that they simply append a log entry to the current log file. Reads are more expensive since they potentially have to look at more than one SSTable file to find the latest version of a key. If you are lucky you will find it cached in memory – otherwise one or more disk accesses will have to be performed. In a way the comparison is not truly apples-to-apples since Voldemort is simply storing blobs, while Cassandra has to deal with its accompanying data model overhead. However, it is curious to see how two basically K/V products have such different performance profiles regarding this vital issue.

Pluggable store engines

Voldemort supports pluggable engines, Cassandra does not. This is a big plus for Voldemort! Out of the box, Voldemort already provides a Berkeley DB and a MySQL engine and allows you to easily plug in your own custom engine. Being able to implement your own backing store is an important concern for many shops. In fact, on my recent project for a large telecom this was a crucial deal-breaking feature that played a large role in selecting Voldemort. We had in-house MySQL expertise and spent inordinate resources writing our own "highly optimized" MySQL engine. By the way, Riak also has pluggable engines – seven in total!

Dynamically adding column families

Neither Voldemort nor Cassandra supports this (though Cassandra should soon). In order to add a new "database" or "table" you need to update the configuration file and recycle all servers. Obviously this is not a viable production strategy. Riak does support this with buckets.

Quorum Policies

Quorum policies – Voldemort has one, Cassandra has several consistency levels:

  • Zero – Ensure nothing. A write happens asynchronously in the background
  • Any – Ensure that the write has been written to at least 1 node
  • One – Ensure that the write has been written to at least 1 replica's commit log and memory table before responding to the client
  • Quorum – Ensure that the write has been written to N / 2 + 1 replicas before responding to the client
  • DCQuorum – As above but takes into account the rack-aware placement strategy
  • All – Ensure that the write is written to all N replicas before responding to the client

Hinted Hand-off

Cassandra and Voldemort both support hinted handoff. Riak also has support.

Cassandra:

If a node which should receive a write is down, Cassandra will write a hint to a live replica node indicating that the write needs to be replayed to the unavailable node. If no live replica nodes exist for this key, and ConsistencyLevel.ANY was specified, the coordinating node will write the hint locally. Cassandra uses hinted handoff as a way to (1) reduce the time required for a temporarily failed node to become consistent again with live ones and (2) provide extreme write availability when consistency is not required.

Voldemort:

Hinted Handoff is extremely useful when dealing with a multiple datacenter environment. However, work remains to make this feasible.

Riak:

Hinted handoff is a technique for dealing with node failure in the Riak cluster in which neighboring nodes temporarily takeover storage operations for the failed node. When the failed node returns to the cluster, the updates received by the neighboring nodes are handed off to it.

Hinted handoff allows Riak to ensure database availability. When a node fails, Riak can continue to handle requests as if the node were still there.

Read Repair

Cassandra Read Repair

Read repair means that when a query is made against a given key, we perform that query against all the replicas of the key. If a low ConsistencyLevel was specified, this is done in the background after returning the data from the closest replica to the client; otherwise, it is done before returning the data.

This means that in almost all cases, at most the first instance of a query will return old data.

Voldemort

There are several methods for reaching consistency with different guarantees and performance tradeoffs.

Two-Phase Commit – This is a locking protocol that involves two rounds of co-ordination between machines. It is perfectly consistent, but not failure tolerant, and very slow.

Paxos-style consensus – This is a protocol for coming to agreement on a value that is more failure tolerant.

Read-repair – The first two approaches prevent permanent inconsistency. This approach involves writing all inconsistent versions, and then at read-time detecting the conflict, and resolving the problems. This involves little co-ordination and is completely failure tolerant, but may require additional application logic to resolve conflicts.

Riak

Read repair occurs when a successful read occurs – that is, the quorum was met – but not all replicas from which the object was requested agreed on the value. There are two possibilities here for the errant nodes:

  1. The node responded with a “not found” for the object, meaning it doesn’t have a copy.
  2. The node responded with a vector clock that is an ancestor of the vector clock of the successful read.

When this situation occurs, Riak will force the errant nodes to update their object values based on the value of the successful read.

Version Conflict Resolution – Vector Clocks

Cassandra

Cassandra departs from the Dynamo paper by omitting vector clocks and moving from partition-based consistent hashing to key ranges, while adding functionality like order-preserving partitioners and range queries.  Source.

Voldemort

Voldemort uses Dynamo-style vector clocks for versioning.

Riak

Riak utilizes vector clocks (short: vclock) to handle version control. Since any node in a Riak cluster is able to handle a request, and not all nodes need to participate, data versioning is required to keep track of a current value. When a value is stored in Riak, it is tagged with a vector clock and establishes the initial version. When it is updated, the client provides the vector clock of the object being modified so that this vector clock can be extended to reflect the update. Riak can then compare vector clocks on different versions of the object and determine certain attributes of the data.

Other

Synchronous/asynchronous writes

For Voldemort, inserts of a key’s replicas are synchronous. Cassandra allows you to choose which policy best suits you. For cross-data center replication, synchronous updates can be extremely slow.

Caching

Cassandra caches data in-memory, periodically flushing to disk. Voldemort does not cache.

No SQL Taxonomy

May 13, 2010

In the last year or so there has been an incredible explosion of interest in the concept of No SQL. There are so many varying implementations that differ so wildly that it is often difficult to get a clear picture of what is what. Typically authors will either be intimately involved with one specific project or will give cursory overviews of a number of projects.

What is needed is some basic categorization and classification – in other words, a taxonomy of the NoSQL provider space. For example, what are the key criteria used to classify implementations? There are disjoint subsets in the NoSQL space, and comparisons can only be made between subsets or between implementations within a given subset. It's all about apples-to-apples comparison.

Here are a few links to shed light on the topic:

And of course let us not forget the contrarian view: