Archive for the ‘Mongo’ Category

Adventures in Hive SerDe-landia

July 25, 2014


I was recently involved in a product requirements effort for external tables for an SQL-on-Hadoop product. The first thing that came to mind was to take a deep dive into Hive’s SerDe and StorageHandler capabilities. The top-level StorageHandler interface in essence represents a good example of a Hadoop design pattern. These features allow you to connect any external data source to Hive – be it a file or network connection. One of my core beliefs is that before starting any task, it is important to do due diligence and see what other folks are doing in the same space. Prior knowledge and awareness of the current state of the art are things all developers and product managers should be required to have.


There are two dimensions to external data source connectivity:

  • The syntax for defining external data sources in Hive QL which is in essence a DSL (domain-specific language).
  • External data sources connector implementations

In my view, the DSL is inconsistent and confusing. There are several ways to do the same thing, and some storage handlers are privileged and exposed as keywords in the DSL. For example, attributes of the default text storage handler are treated as first-order keywords instead of SERDEPROPERTIES. This blog will focus on the storage handler implementations.

There are two dimensions to the way Hive stores table data:

  • file format –  How the input source (e.g. file) is broken up into records.
  • row format – How a record is parsed and mapped into database columns. The SerDe interface defines this extension point.

HiveStorageHandler is the top-level interface that exposes the file and row format handlers. Its core is three methods – two defining the file format’s input and output, and one defining the SerDe. A SerDe is simply an aggregation of a Deserializer and Serializer. The nice thing about Hive is that it eats its own dog food – the default text-based handler is implemented as a SerDe itself – MetadataTypedColumnsetSerDe.

One API design note: the file and row format methods of HiveStorageHandler should be treated in a uniform fashion. Currently the input and output file formats are each exposed as a separate method, whereas the row format is encapsulated in a single method that returns a SerDe.


MongoDB Hadoop Connector

The MongoDB folks have written a very comprehensive Hadoop connector:

Connect to MongoDB BSON File

This example illustrates how you can connect to BSON dump files from a MongoDB database. One potential problem I see here is mapping large schemas or managing schema changes. Specifying a large list in the DDL can get unwieldy. Having an external configuration file with the mapping might be easier to handle.

CREATE EXTERNAL TABLE person (
  name string,
  yob int,
  status boolean
)
ROW FORMAT SERDE 'com.mongodb.hadoop.hive.BSONSerDe'
WITH SERDEPROPERTIES (
  'mongo.columns.mapping'='{"name" : "name", "yob" : "yob", "status" : "status"}')
STORED AS
  INPUTFORMAT 'com.mongodb.hadoop.mapred.BSONFileInputFormat'
  OUTPUTFORMAT 'com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat'
LOCATION '/data/person';

Connect to MongoDB database

This is equivalent to the BSON example except that we connect to a live database. As you add more documents to your MongoDB database they automatically appear in the Hive table! Note here my earlier complaint about having multiple ways of doing the same thing. The BSON example uses the STORED AS, INPUTFORMAT and OUTPUTFORMAT keywords whereas this example uses STORED BY. This distinction is not very intuitive.

CREATE EXTERNAL TABLE person (
  name string,
  yob int,
  age INT,
  status boolean
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES (
  'mongo.columns.mapping'='{"name" : "name", "yob" : "yob", "status" : "status"}')

Plugin Ecosystem

Having a well defined interface and productive abstractions has allowed a vigorous third-party plug-in ecosystem to flourish. Besides the text handler, Hive comes bundled with the following storage handlers:

  • Avro (Hive 0.9.1 and later)
  • ORC (Hive 0.11 and later)
  • RegEx
  • Thrift
  • Parquet (Hive 0.13 and later)

Some third-party implementations:


MongoDB and Cassandra Cluster Failover

October 21, 2011

One of the most important features of a scalable clustered NoSQL store is how to handle failover. The basic question is: is failover seamless from the client perspective? This is not always immediately apparent from vendor documentation especially for open-source packages where documentation is often wanting. The only way to really know is to run your own tests – to both verify vendor claims and to fully understand them. Caveat emptor – especially if the product is free!

Mongo and Cassandra use fundamentally different approaches to clustering. Mongo is based on the classical master/slave model (similar to MySQL) whereas Cassandra is a peer-to-peer system modeled on the eventual consistency paradigm pioneered by Amazon’s Dynamo system. This difference has specific ramifications regarding failover capabilities. The design trade-offs regarding the CAP theorem are described very well in the dbmusings blog post Overview of the Oracle NoSQL Database (Section CAP).

For Mongo, the client can only write to the master and therefore it is a single point of failure. Until a new master is elected from the secondaries, the cluster will not be reachable for writes.

For Mongo reads, you have two options: either talk to the master or to the secondaries. The default is to read from the master, and you will be subject to the same semantics as for writes. To enable reading from secondaries, you can call Mongo.slaveOk() for your client driver. For details see here.

For Cassandra, as long  as your selected consistency policy is satisfied, failing nodes will not prevent client access.

Mongo Setup

I’ll illustrate the issue with a simple three node cluster. For Mongo, you’ll need to create a three-node replica set which is well described on the Replica Set Tutorial page.

Here’s a convenience shell script to launch a Mongo replica set.

  # Assumes $dir is already set to the parent of the node0..node2 data directories
  mv nohup.out old-nohup.out
  OPTS="--rest --nojournal --replSet myReplSet"
  nohup mongod $OPTS --port 27017 --dbpath $dir/node0  &
  nohup mongod $OPTS --port 27018 --dbpath $dir/node1  &
  nohup mongod $OPTS --port 27019 --dbpath $dir/node2  &

Cassandra Setup

For Cassandra, create a keyspace with replication factor of 3 in your schema definition file.

  create keyspace UserProfileKeyspace
    with strategy_options=[{replication_factor:3}]
    and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';

Since I was using the standard Java Hector client along with Spring, I’ll highlight some of the key Spring bean configurations. The important point to note is the consistency policy must be quorum which means that for an operation (read or write) to succeed, two out of the three nodes must respond.
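The "two out of three" figure follows from the usual quorum rule of floor(RF / 2) + 1. Here is a minimal sketch of that arithmetic; the class and method names are my own for illustration and are not part of Hector or Cassandra.

```java
/** Hypothetical helper for illustration; not part of Hector or Cassandra. */
final class QuorumMath {

    /** Replicas that must respond for a quorum operation: floor(rf / 2) + 1. */
    static int quorum(int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replication factor must be >= 1");
        }
        return replicationFactor / 2 + 1;
    }

    /** Replicas that can be down while quorum reads and writes still succeed. */
    static int tolerableFailures(int replicationFactor) {
        return replicationFactor - quorum(replicationFactor);
    }

    public static void main(String[] args) {
        for (int rf = 1; rf <= 5; rf++) {
            System.out.printf("rf=%d quorum=%d tolerable failures=%d%n",
                    rf, quorum(rf), tolerableFailures(rf));
        }
    }
}
```

With a replication factor of 3 this gives a quorum of 2, so one node can die without clients noticing.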

Properties File


Application Context File

  <bean id="userProfileKeyspace"
    <constructor-arg value="UserProfileKeyspace" />
    <constructor-arg ref="cluster"/>
    <property name="consistencyLevelPolicy" ref="${cassandra.consistencyLevelPolicy}" />

  <bean id="cluster"
    <constructor-arg value="${cassandra.cluster}"/>
    <constructor-arg ref="cassandraHostConfigurator"/>

  <bean id="cassandraHostConfigurator"
     <constructor-arg value="${cassandra.hosts}"/>

  <bean id="quorumAllConsistencyLevelPolicy"
        class="me.prettyprint.cassandra.model.QuorumAllConsistencyLevelPolicy" />

  <bean id="allOneConsistencyLevelPolicy"
        class="me.prettyprint.cassandra.model.AllOneConsistencyLevelPolicy" />

Test Scenario

The basic steps of the test are as follows. Note the Mongo example assumes you do not have slaveOk turned on.

  • Launch cluster
  • Execute N requests where N is a large number such as 100,000 to give your cluster time to fail over. The request is either a read or write.
  • While your N requests are running, kill one of the nodes. For Cassandra this can be any node since it is peer-to-peer. For Mongo, kill the master node.
  • For Cassandra, there will be no exceptions. If your requests are inserts, you will be able to subsequently retrieve them.
  • For Mongo, your requests will fail until the secondary is promoted to the master. This happens for both writes and reads. The time window is “small”, but depending upon your client request rate, the number of failed requests can be quite a few thousand! See the sample exceptions below.

With Mongo and slaveOk turned off, the client can directly access only the master node. The secondaries take part only in replication and are never accessed directly by the client. With Cassandra, as long as the minimum number of nodes specified by the quorum can be reached, your operation will succeed.
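
The heart of such a test is a loop that keeps issuing requests and counts failures rather than stopping at the first exception. Here is a rough sketch of that loop, not my actual test code: FailoverProbe and its Result class are hypothetical names, and the Callable stands in for whatever DAO get or put you are exercising.

```java
import java.util.concurrent.Callable;

/** Hypothetical harness for illustration; the request stands in for a DAO get or put. */
final class FailoverProbe {

    /** Success and failure counts for one run. */
    static final class Result {
        final int succeeded;
        final int failed;

        Result(int succeeded, int failed) {
            this.succeeded = succeeded;
            this.failed = failed;
        }
    }

    /** Issues the request n times, counting exceptions instead of aborting on the first one. */
    static Result run(int n, Callable<?> request) {
        int succeeded = 0;
        int failed = 0;
        for (int i = 0; i < n; i++) {
            try {
                request.call();
                succeeded++;
            } catch (Exception e) {
                failed++;
            }
        }
        return new Result(succeeded, failed);
    }

    public static void main(String[] args) {
        // Simulated store that is unreachable for requests 40,000 to 42,999,
        // standing in for the window between a master's death and the election.
        int[] counter = {0};
        Callable<Void> request = () -> {
            int i = counter[0]++;
            if (i >= 40_000 && i < 43_000) {
                throw new java.io.IOException("simulated: no master");
            }
            return null;
        };
        Result result = run(100_000, request);
        System.out.println("succeeded=" + result.succeeded + " failed=" + result.failed);
    }
}
```

In a real run you would replace the simulated request with a call to your store and kill a node by hand while the loop is running.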

Mongo Put Exception Example

I really like the message “can’t say something” – sort of cute!

com.mongodb.MongoException$Network: can't say something
    at com.mongodb.DBTCPConnector.say(
    at com.mongodb.DBTCPConnector.say(
    at com.mongodb.DBApiLayer$MyCollection.update(
    at com.amm.nosql.dao.mongodb.MongodbDao.put(

Mongo Get Exception Example

com.mongodb.MongoException$Network: can't call something
    at com.mongodb.DBApiLayer$MyCollection.__find(
    at com.mongodb.DBCursor._check(
    at com.mongodb.DBCursor._next(
    at com.amm.nosql.dao.mongodb.MongodbDao.get(

Mongo Get with slaveOk() 

If you invoke Mongo.slaveOk() for your client driver, then your reads will not fail if a node goes down. You will get the following warning.

Oct 30, 2011 8:12:22 PM com.mongodb.ReplicaSetStatus$Node update
WARNING: Server seen down: localhost:27019 Connection reset by peer: socket write error
        at Method)
        at com.mongodb.OutMessage.pipe(
        at com.mongodb.DBPort.go(
        at com.mongodb.DBPort.go(
        at com.mongodb.DBPort.findOne(
        at com.mongodb.DBPort.runCommand(
        at com.mongodb.ReplicaSetStatus$Node.update(
        at com.mongodb.ReplicaSetStatus.updateAll(
        at com.mongodb.ReplicaSetStatus$
Oct 30, 2011 8:12:22 PM com.mongodb.DBTCPConnector _set

Cassandra Users Group Impressions

October 20, 2011

Tonight I attended my first local Cassandra Users Group meetup. The wet walk in the rain sure was worth it!

About half a dozen people attended – most had a Cassandra deployment in production and all had something interesting to say. No fluff here tonight. Everyone had a high level of knowledge of EC – it was good not to be the big fish in the NoSQL pond so I could learn from folks out in the field.

Sidebar: Cassandra 1.0 was released yesterday! I wonder what the significance of the release number jump from 0.8.7 to 1.0 is – a new item on my TODO queue.

Here follows a synopsis.

Company B

  • 5 TB of data
  • 21 nodes on AWS – spiked to 42 once – now back to 21
  • 90% writes – hence Cassandra is ideal
  • Two column families:
    • one column – basic key/value
    • wide column – key is user ID and columns are his/her emails

Company C

  • 72 nodes
  • 2 data centers

Company Z

  • Cassandra 0.7.6
  • 4 nodes
  • Raid 0 disks – redundancy is achieved through Cassandra clustering
  • 21 million writes a day
  • 250 GB – 7 million XML files ranging from 4 to 450 MB in size
  • Column family: key/value
  • Replication factor is 3
  • 80 concurrent write threads, 160 read threads
  • QuickLZ compression down to 20%
  • Migrated from Mogile FS


Regarding adding new nodes to a live cluster, no one had any negative experiences. For RandomPartitioner, several folks said it was best to double the number of nodes assuming your company can foot the bill. There was apparently no negative impact in rebalancing overhead related to shifting data to the new nodes.

Two folks were using manual server token assignments for a RandomPartitioner – a third person was not keen on this. Assuming you are using an MD5 hash, you simply divide the 128 bit name space by the number of servers. This apparently was better than letting Cassandra compute the tokens. This was OK even if you added new nodes and had to recompute server tokens.
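
Here is a small sketch of the manual token arithmetic. It assumes the token range runs from 0 to 2^127, which is my understanding of what RandomPartitioner uses; adjust RING_SIZE if your partitioner differs. TokenCalculator is a name I made up and is not a Cassandra tool.

```java
import java.math.BigInteger;

/** Hypothetical helper for illustration; not a Cassandra tool. */
final class TokenCalculator {

    /** Assumed size of the token ring: 2^127. */
    static final BigInteger RING_SIZE = BigInteger.ONE.shiftLeft(127);

    /** Evenly spaced tokens: node i gets i * RING_SIZE / nodeCount. */
    static BigInteger[] evenTokens(int nodeCount) {
        if (nodeCount < 1) {
            throw new IllegalArgumentException("node count must be >= 1");
        }
        BigInteger[] tokens = new BigInteger[nodeCount];
        BigInteger n = BigInteger.valueOf(nodeCount);
        for (int i = 0; i < nodeCount; i++) {
            tokens[i] = RING_SIZE.multiply(BigInteger.valueOf(i)).divide(n);
        }
        return tokens;
    }

    public static void main(String[] args) {
        BigInteger[] tokens = evenTokens(4);
        for (int i = 0; i < tokens.length; i++) {
            System.out.println("node" + i + ": " + tokens[i]);
        }
    }
}
```

One thing the arithmetic makes visible: when you double the node count, every existing node keeps its token and each new node lands halfway between two old ones, which may be part of why doubling was the popular choice.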

Hadoop integration was important for many – and there was nothing but praise for DataStax’s Brisk tool that integrates Hadoop and Cassandra. Instead of using the standard HDFS (Hadoop Distributed File System) you can simply point Hadoop to use Cassandra. Two advantages to this are:

  • Random access of Cassandra is  faster than HDFS
  • Avoid HDFS SPOF (single point of failure)

One company was very happy with Brisk’s seamless Hive integration. Several people expressed issues with the apparent inability to execute map reduce jobs on more than one column.

Regarding client drivers, the Java users all concurred on the horrors of accessing Cassandra via the complex Thrift-based data model. Hector marginally alleviates this but it is still painful. For example, compare this to Mongo’s extremely intuitive JSON-based interface! Two companies were using customized Ruby/JRuby gems. I learned that a previously advertised Cassandra option for Avro in addition to Thrift had been dropped.

There was some discussion of other NoSQL vendors – namely MongoDB and Riak. One company was happily using MongoDB. Others had naturally gravitated to Cassandra because of its core competencies of failover robustness and scalability. The fact that MongoDB used a master/slave paradigm for replication was an issue for some – the master is a SPOF for writes. People also commented that a Cassandra cluster was easier to manage since Cassandra has only one process per node whereas with sharded MongoDB you need to launch several processes: mongod, mongos, and config.

MongoDB vs. Cassandra Benchmarks

October 19, 2011

Cassandra and Mongo passed the first set of winnowing tests – both in terms of speed and functionality. In order to provide more data for an informed decision, I subjected both to a series of high throughput tests. The test suite was a total of 120 million operations (insert, update, get) – 12 runs of 10 million requests.

For those impatient for the ultimate conclusion, the executive summary is: Cassandra writes are faster, Mongo reads are faster. But what is interesting is how the latencies evolve over time, i.e. over the course of 120 million requests.

Overview of test suite

  • Remove all disk data of the persistent store with rm -rf
  • Start the NoSQL server
  • Maintain a constant throughput of 20 or 50 concurrent client threads with no rampup time
  • Insert N records
  • Get N records – this will reflect any caching or compaction (e.g. Cassandra SSTables)
  • Get N records – this should reflect already compacted tables – should be faster
  • Get N records – should correspond to previous get
  • Update N keys with different sized values (value seed). We want to make sure we’re exercising Mongo compaction
  • Repeat the three get runs again

Results Commentary

When one result is faster than another, the important question to always ask is: how important is the difference? Does it really matter? Is the slower system still fast enough? If the lower value still satisfies your requirements, then the difference is of little significance and other factors weigh more heavily on the ultimate decision.

In order to interpret results, it is important to have a basic understanding of some statistics. You should not rely solely on the mean, but also pay particular attention to latency percentiles. Percentiles give you an idea of the distribution of results which is especially important when predictability is required. Your SLA might say: P reads MUST be less than 10ms where P is the percentage, e.g. 99% or 100%. This of course raises the question what is to be done for those operations that exceed the threshold, but that is another discussion. As you can see, there are some real whoppers – note the Max value for put – 37 seconds for MongoDB and 175 seconds for Cassandra!
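
To make the mean-versus-percentile point concrete, here is a minimal sketch using the nearest-rank definition of a percentile. This is not the code that produced the tables below; LatencyStats and the sample latencies are invented for illustration.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration; not the benchmark's own code. */
final class LatencyStats {

    /** Nearest-rank percentile: the smallest sample with at least p% of samples at or below it. */
    static long percentile(long[] latenciesMs, double p) {
        if (latenciesMs.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need samples and 0 < p <= 100");
        }
        long[] sorted = latenciesMs.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        return sorted[rank - 1];
    }

    /** Arithmetic mean of the samples. */
    static double mean(long[] latenciesMs) {
        long sum = 0;
        for (long v : latenciesMs) {
            sum += v;
        }
        return (double) sum / latenciesMs.length;
    }

    public static void main(String[] args) {
        // Invented sample: mostly fast requests plus one very slow outlier.
        long[] sample = {1, 1, 2, 2, 2, 3, 3, 4, 20, 500};
        System.out.println("mean=" + mean(sample));
        System.out.println("p50=" + percentile(sample, 50));
        System.out.println("p90=" + percentile(sample, 90));
        System.out.println("p99=" + percentile(sample, 99));
    }
}
```

In this made-up sample a single 500 ms outlier drags the mean far above what the typical request sees, while the median stays at 2 ms. That is the reason for reporting the percentile columns alongside the mean.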

As the tables below indicate,  Cassandra was a winner for puts – both inserts and updates. For the initial 10M puts, the distribution is interesting – 50% of Mongo puts were under 1ms while for Cassandra it was 2ms. But for the 90-th percentile, Mongo took 20ms but Cassandra was 4ms.

For the subsequent two batches of 10M puts (inserts), Cassandra’s throughput was 4054 and 5768 – quite a bit of variability! Note the max value for a put was 174 seconds – must have been one heck of a GC! Mongo’s two insert batches ranged from 1753 to 2197 puts per second. 90% of Cassandra puts are finishing under 4ms, whereas 90% of Mongo puts are under 17ms.

Cassandra’s superior write numbers are not surprising since it is optimized for faster writes than reads. See here. In order to avoid expensive disk seeks, writes are appended to sequential files called SSTables. However the hit is taken on reads, where potentially the system has to examine several SSTables for updated values. Unfortunately, for our needs fast reads were of more importance.

For reads the situation is quite different and Mongo comes out ahead. For the first three get batches Cassandra is giving us between 2300 and 2600 gets per second. Mongo is in the predictable 3100s. Both of these fall within our target SLA and are roughly comparable.

However, it’s a completely different story for reads after we do inserts! Look at the numbers: Mongo is slightly slower at 2999 and 3039, but Cassandra drops tenfold to 294 and 350! Que caramba! I’m assuming this is because of the need to access multiple SSTables, but I would’ve assumed that at some point the values were merged into one file.

Perhaps there’s some tuning that needs to be done for Cassandra though I saw nothing obvious in my initial look at the documentation. All I can say with confidence is that un-tuned Cassandra get operations drastically slow down. I did invoke an explicit compaction using nodetool, and throughput for the next 10 million gets did improve to 1000/sec. But the caveat was that Cassandra documentation did not recommend this since automated compactions will not be run after a manual one! By the way, one of Mongo’s strong suits is that they strive to minimize the amount of configuration required to get a system running.


In order to establish a baseline for the vendor’s capabilities, a non-clustered server was used. For Cassandra this was a single-node cluster, for Mongo simply one server with no replication. Cluster tests were run for functionality.

Test Configuration

  • Number of requests: 10,000,000
  • Object Mapper: Jackson JSON
  • Key size: 32
  • Value size: average is 1000 bytes, range from 100 to 1900
Server specs
  • Centos 5.5 Linux
  • Intel Xeon 2.4 GHz 64 bit processor
  • 8 processors, 2 cores each
  • 8 GB RAM
  • The client pinging the server averaged 0.2 ms
The columns are for the most part self-descriptive. The ValSeed column indicates the random seed for the value size. This is important since we want to exercise the vendor’s compaction algorithm by assigning differently-sized values to an updated key. Values are generated by creating an N-sized String from a random set of ASCII characters. Since we want to avoid equally sized values, we vary N from 100 to 1900.
  import java.util.Random;

  int baseSize = 100 ;
  int range = 1800 ;
  // Seeded so that a run's value sizes are reproducible (the ValSeed column)
  Random random = new Random(valueSizeSeed);
  // Uniform in [100, 1900)
  int valueSize = random.nextInt(range) + baseSize ;


MongoDB

  • Version: 2.0.0
  • Journaling: turned on

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail Thr ValSeed
Put         2114  4729454      1     20     43     60   1576  36232    9.442    0     0  20    1776
Get         3182  3142258      2     10     18     28    273   4684    6.268    0     0  20
Get         3182  3142850      2     10     18     26    229   3950    6.269    0     0  20
Get         3106  3219539      2     33     88    144   2120   3885   16.060    0     0  50
Put         1753  5706060      1     17     76    278   1985  44472   11.395    0     0  20    1812
Put         2197  4552045      1     17     52    182   1293  37299    9.087    0     0  20    1846
Get         2999  3333966      2     11     19     39    308   4380    6.651    0     0  20
Get         3039  3290874      2     10     19     41    289   4676    6.565    0     0  20
PutGet       907 11027045     14     28     70    150   2531   8886   22.036    0     0  20    1861
Get         2992  3342034      2     11     20     40    299   4299    6.666    0     0  20
Get         2975  3361762      2     11     20     38    301   4478    6.707    0     0  20
Get         2957  3381393      2     34    112    166   2160   4363   16.871    0     0  50

Cassandra – Version: 0.8.6

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail Thr ValSeed
Put         6045  1654219      2      4     16     24    160   4169    3.288    0     0  20    1776
Get         2334  4285208      1     14    111    150    427   9003    8.552    0     0  20
Get         2380  4202468      2     10    110    144    290   8380    8.387    0     0  20
Get         2616  3822749      8     40    177    226   1559  24092   19.069    0     0  50
Put         4054  2466583      2      4     17     26    194 174614    4.915    0     0  20    1812
Put         5768  1733745      2      4     16     24    172   4190    3.446    0     0  20    1846
Get          294 33972166     43    141    371    567   1737  67276   67.928    0     0  20
Get          350 28551712     39    116    256    354   1629  37371   57.087    0     0  20
PutGet       285 35124479     40    124    388    801   2384 456321   70.232    0     0  20    1861
Get          210 47730677     79    182    341    418   1141  36368   95.446    0     0  20
Get          211 47305663     79    180    335    409   1097 104592   94.595    0     0  20
Get          249 40157243    175    339    598    730   2164  77675  200.751    0     0  50

Variability Examples

Here are some examples demonstrating the variability in results. The examples are from identical MongoDB tests run in the same context.


First 10M puts

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail  Thr
Put         2137  4679117      1     21     41     49   1388  40931    9.338    0     0  20
Put         2156  4639027      1     21     41     48   1450  36412    9.258    0     0  20
Put         2114  4729454      1     20     43     60   1576  36232    9.442    0     0  20
Put         2079  4810938      1     20     42     55   1354  38436    9.605    0     0  20

First 10M gets

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail  Thr

Get         3187  3138189      2     10     18     25    225   4665    6.260    0     0  20
Get         3115  3210200      2     10     18     26    258  13211    6.403    0     0  20
Get         3182  3142258      2     10     18     28    273   4684    6.268    0     0  20
Get         3141  3184198      2     10     18     27    271  22285    6.352    0     0  20

No SQL Data Store Evaluation

October 18, 2011

Recently I’ve been doing some extensive and intensive evaluation of various No SQL implementations capable of storing 600 million key/value items with a latency preferably under 50 milliseconds. Extensive in the sense of examining many providers and intensive in drilling down to test key features of each system. The specific requirements precluded looking at all stores, but a number of quite unique systems – each with its strong personality – were examined in substantial detail.

The initial requirements were:

  • Eventually be able to store six months of user profiles for ad engagements
  • Insertion of about 3-5 million records daily – an equivalent amount had to be deleted
  • Key/value access – no need for range or index queries
  • All keys were random MD5 hashes
  • Retrieval latency under 50 ms
  • Timestamp expiration of records
  • Reads needed to be very fast – writes could be batched and delayed
  • Ability to manipulate nodes of a live cluster, e.g. add a new node without restart
In order to execute the evaluation I developed a suite of structured high load tests that could be run against all providers.  Besides raw benchmarks there were also tests for key technical issues such as failover behavior. It was not possible for all implementations to be comparable for all features. Obviously providers made different design and feature trade-offs – that’s their unique personality.
Some topics examined:
  • Predictability – we need low variability in response time – latency percentiles are helpful here.
  • Repeatability – we want to run the same test in the same context repeatedly with the same results, subject to an obvious margin of error. Establishing a consistent margin is a difficult chore.
  • Consistency model – eventual consistency and/or immediate. How is the CAP theorem handled?
  • Failover – when a node dies what happens to reads and writes?
  • Sharding – ability to shard data and key partitioning strategies.
  • Compaction policy – how is data stored, how are updates and deletions handled (tombstones). If implemented in Java, how much of a hit is garbage collection?
The following systems were evaluated, with the first two as  the final candidates.
  • Cassandra
  • MongoDB
  • Membase
  • Riak
  • Redis
In addition the idea of a bank of caching servers fronting the NoSQL stores was examined – Memcached, Ehcache REST server, and custom REST servers with a pluggable cache engine (Ehcache, Infinispan). In some cases the NoSQL stores were so performant (MongoDB) that a separate caching layer was not immediately necessary. Perhaps under greater volume it would be. The key was to have a provisional caching strategy as a reserve.

MongoDB is very fast and reliable and it made the first cut. Membase also proved fast, but it had some reliability issues especially in server restarts. Cassandra provided the full set of features we needed though it was significantly slower than MongoDB.  Since we were testing with a constant client thread count of 20 or 50, being able to stay alive and keep up with this rate was important.

One of the frustrating aspects was the difficulty in obtaining adequate, or even minimal, hardware. Since the raison d’être of the project was high volume, it was important to test with numbers as large as possible. Differences between low and high volume scenarios can give you drastically different results and lead you to costly wrong conclusions. There is also a natural tension between reasonable turnaround time for results and the volume and rate of data testing. For example, to run a standard melange of CRUD operations (inserts, gets, updates) 14 million times, MongoDB takes 20 hours versus 73 hours for Cassandra. That’s a lot of time to wait for the results of a configuration toggle switch!

One of the great parts of the project was the ability to study in detail key features of systems and then implement test scenarios that exercised these features. A good example of a praxis: applying solidly founded ideas into practice. One has to first discover and understand what has to be tested. Comparing eventual consistency systems such as Cassandra and Riak is a challenging endeavor that stimulates the intellect. For instance, how do Bloom filters (slight risk of false positives) obviate the need for disk access? How do you go about testing to see if this is true? Bloom filters are even used in DNA sequencing – see here. Then throw in MongoDB’s differing but overlapping approach, and you get quite a rich NoSQL stew. Documentation and articulation of key features was an important evaluation criterion.
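
To make the Bloom filter question concrete, here is a toy version. It is a sketch for building intuition and is not how Cassandra or Riak implement theirs; SimpleBloomFilter, its sizes and its hashing scheme are my own choices. The property that matters is that a "no" answer is always correct, so a store can skip the disk read for that file, while a "yes" answer still has to be checked against the data.

```java
import java.util.BitSet;

/** Toy Bloom filter for illustration; not the implementation used by any NoSQL store. */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    SimpleBloomFilter(int size, int hashCount) {
        if (size < 1 || hashCount < 1) {
            throw new IllegalArgumentException("size and hashCount must be >= 1");
        }
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    /** Sets one bit per hash function for the key. */
    void add(String key) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(index(key, i));
        }
    }

    /** false means the key was definitely never added; true means it may have been. */
    boolean mightContain(String key) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(index(key, i))) {
                return false;
            }
        }
        return true;
    }

    /** Derives the i-th bit position from two hashes of the key (double hashing). */
    private int index(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;
        return Math.floorMod(h1 + i * h2, size);
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(16_384, 3);
        for (int i = 0; i < 1_000; i++) {
            filter.add("key-" + i);
        }
        // Probe keys that were never added and count how often the filter says "maybe".
        int falsePositives = 0;
        for (int i = 0; i < 10_000; i++) {
            if (filter.mightContain("absent-" + i)) {
                falsePositives++;
            }
        }
        System.out.println("false positives out of 10000 probes: " + falsePositives);
    }
}
```

The main method is one way to test the claim: probe with keys you know are absent and count how often the filter answers "maybe". Every one of those would have cost a wasted disk read; every "no" is a read saved.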

Mongo Conference Impressions

October 17, 2011

Last week I attended a full day Mongo conference hosted at our local Microsoft Nerd Center. The timing was quite fortuitous as I’m heavily involved in evaluating Mongo and Cassandra for a very large data store (600 million records). My head was full of questions especially regarding replication and sharding scenarios.

I noticed that 10gen seems to be very user responsive, and on numerous occasions speakers emphasized that client feedback drove many new features. Furthermore, speakers were very open about Mongo shortcomings. For example,  they openly admitted free list management was in their opinion wanting (I would have never known), and that version 2.2 would have a major overhaul. And above all the no-fluff quotient was high – seems everyone writes code at 10gen. See: 10Gen CEO Dwight Merriman Still Writes His Own Code!

Overall  the conference was great – a large turnout of 250 people and a good mix of presentations by 10gen folks and customers showcasing their uses of Mongo. One of the perennial conference problems I had to wrestle with was which concurrently scheduled event to attend!  MTV CMS vs Morphia Java? Replicas or Art Genome project?

I was specifically interested in obtaining some more details regarding MongoDB’s scaling capabilities in the real world – what were some of the largest sites out there, what are their issues, etc. Some of the tidbits I picked up are:

  • Largest cluster is 1000 shards
    • Each shard contains a few terabytes of data
    • Replication set of three
  • Not many folks are using shards – typical sharding factor is between 3 and 10.

The “Journaling and Storage Engine” talk by CTO Eliot Horowitz was full of gory/great details on internals. The description of how and why MongoDB uses memory mapped files was very interesting. Other subjects covered were how data and indexes are stored, journaling, fragmentation, and record padding. The upcoming MongoDB version 2.2 will have a new improved fragmentation implementation.

The talk on “Schema Design at Scale” was particularly enlightening and opened my eyes to an entirely new topic of document-oriented schema design. Just because the schema is flexible doesn’t mean that schema problems go away. On the contrary, because the flexibility allows for more choices and therefore fewer constraints, the number of design decisions correspondingly increases. This presents a whole new set of issues – many of them intellectually very interesting (e.g. embedded collections best practices). And many problems are the same as those facing traditional SQL databases: covering indexes, sharding partition keys, key autoincrements, B-Tree issues, etc. I forgot to ask what 10gen’s take on the recently introduced UnQL (Unstructured Query Language) was. In UnQL’s own words: it’s an open query language for JSON, semi-structured and document databases.

The “Replication and Replica Sets” presentation described MongoDB’s replication feature in detail. Essentially it is a master/slave model in contrast to Cassandra’s peer-to-peer design. One failover problem I had discovered in high-throughput testing was the time window between a master’s death and the slave’s promotion where writes were not accepted.  The 10gen speaker confirmed my doubts and suggested queueing failed writes and then resubmitting them at a later time (not ideal).  Another issue was that heartbeats are hard-coded to 200 ms and not configurable. One nice new feature that is being worked on is standardizing client access to replica sets. Currently routing logic is dependent on client drivers, and for those sites using a mix of different language drivers this could present problems.
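
Here is roughly what the suggested workaround could look like: park writes that fail during the election window and resubmit them once a master is back. This is only a sketch of the idea and not anything 10gen provided; WriteRetryQueue and its Writer interface are hypothetical, with Writer standing in for the real driver call.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical write-behind retry queue for illustration; not part of any Mongo driver. */
final class WriteRetryQueue<T> {

    /** Stand-in for the real driver call, e.g. a DAO put. */
    interface Writer<E> {
        void write(E item) throws Exception;
    }

    private final Deque<T> pending = new ArrayDeque<>();
    private final Writer<T> writer;

    WriteRetryQueue(Writer<T> writer) {
        this.writer = writer;
    }

    /** Tries the write now; on failure parks the item for a later flush. Returns true if written. */
    boolean write(T item) {
        try {
            writer.write(item);
            return true;
        } catch (Exception e) {
            pending.addLast(item);
            return false;
        }
    }

    /** Resubmits parked items in arrival order, stopping at the first failure. Returns the number written. */
    int flush() {
        int written = 0;
        while (!pending.isEmpty()) {
            try {
                writer.write(pending.peekFirst());
            } catch (Exception e) {
                break; // still no master; try again later
            }
            pending.removeFirst();
            written++;
        }
        return written;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        boolean[] masterDown = {true};
        WriteRetryQueue<String> queue = new WriteRetryQueue<String>(item -> {
            if (masterDown[0]) {
                throw new java.io.IOException("simulated: no master");
            }
        });
        queue.write("a");
        queue.write("b");
        System.out.println("parked while master is down: " + queue.pendingCount());
        masterDown[0] = false;
        System.out.println("resubmitted after election: " + queue.flush());
    }
}
```

The sketch also shows why this is not ideal: the parked writes live only in client memory, a write that arrives after the election can overtake an older parked one, and the caller has to decide when to call flush.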

The “Sharding and Scaling” talk by the CTO outlined classical problems regarding sharding – the difficulty in choosing a good key. Lots of information was provided on the Mongo shard process “mongos” that routes requests to the data process “mongod”. And then there was a config process too – quite a few processes involved here. I just noticed a new Developer Blog Contest: How do you Shard your Data? A point emphasized by several folks was not to wait until the last moment to add a new node to your cluster. Best to add it when the current nodes are at 70% capacity – interestingly the same percentage that Cassandra advocates. In general, adding a new node to a live cluster is a very difficult exercise in regards to repartitioning current data. I didn’t get around to asking how and if Mongo uses consistent hashing, which is the basis of Dynamo-like eventual consistency stores.

From a customer use case perspective, Jeff Yemin of MTV gave a great talk on how MTV is currently using MongoDB, and also described the historical evolution of their CMS system – from SQL, to an XML database, and finally to a document-oriented store. It’s always instructive to see how people arrive at a decision. It’s all about the old philosophical maxim: context of justification and context of discovery. They’re not using sharding since all data fits on one disk.

Finally, new features for Mongo 2.2 due in January were described: improvements in concurrency, TTL collections, hash sharding features, free list management. A major concern of mine was data expiration since for my current project we need to regularly evict old data to make room for new records. Currently the only solution is to create a timestamp index, and write a manual cron-like job to delete stale items. I’ll be looking forward to TTL collections!
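
For the record, the manual approach amounts to keeping an index ordered by write time and periodically deleting everything older than a cutoff. Here is an in-memory sketch of that idea. It does not talk to MongoDB at all; ExpiringStore is a hypothetical stand-in where a TreeMap plays the role of the timestamp index and purgeOlderThan plays the role of the cron job.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** In-memory stand-in for illustration; it does not talk to MongoDB. */
final class ExpiringStore {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> writtenAt = new HashMap<>();
    // Plays the role of the timestamp index: write time -> keys written at that time.
    private final TreeMap<Long, Set<String>> byTimestamp = new TreeMap<>();

    /** Stores the value and (re)indexes the key under the given write time. */
    void put(String key, String value, long nowMillis) {
        Long previous = writtenAt.put(key, nowMillis);
        if (previous != null) {
            Set<String> keys = byTimestamp.get(previous);
            if (keys != null) {
                keys.remove(key);
                if (keys.isEmpty()) {
                    byTimestamp.remove(previous);
                }
            }
        }
        byTimestamp.computeIfAbsent(nowMillis, t -> new HashSet<>()).add(key);
        values.put(key, value);
    }

    String get(String key) {
        return values.get(key);
    }

    /** The cron-like job: deletes every item written before the cutoff. Returns the number removed. */
    int purgeOlderThan(long cutoffMillis) {
        Map<Long, Set<String>> stale = byTimestamp.headMap(cutoffMillis, false);
        int removed = 0;
        for (Set<String> keys : stale.values()) {
            for (String key : keys) {
                values.remove(key);
                writtenAt.remove(key);
                removed++;
            }
        }
        stale.clear(); // headMap is a view, so this also drops the index entries
        return removed;
    }

    public static void main(String[] args) {
        ExpiringStore store = new ExpiringStore();
        store.put("a", "1", 1_000L);
        store.put("b", "2", 2_000L);
        store.put("c", "3", 3_000L);
        System.out.println("purged: " + store.purgeOlderThan(2_500L));
        System.out.println("a still present: " + (store.get("a") != null));
    }
}
```

The ordered index is what keeps the purge cheap, since only the stale range is visited instead of the whole data set. TTL collections would move this bookkeeping into the server.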