Archive for the ‘Testing’ Category

On the Nature of Failure in Distributed Systems

March 26, 2012

I just finished reading an excellent article by Jay Kreps of Voldemort fame titled Getting Real About Distributed System Reliability. This is a must-read for anyone seriously interested in the operational reliability of NoSQL or Big Data systems. The main points of the article are that node failures are not independent and that solid operational procedures for your production environment are essential.

The most frequent reasons for node failures are not random network blips but software bugs that unfortunately often manifest themselves at the same time on all the deployed nodes. Michael Stonebraker makes a similar point in Clarifications on the CAP Theorem and Data-Related Errors where he describes Bohrbugs – “when multiple data base replicas are available, the same transaction issued to the replicas will cause all of them to crash”. Even though the theoretical probability of node failure increases with cluster size, this is not the main problem for distributed system reliability.

Kreps  points out that the theoretical conclusions regarding reliability provided by the CAP theorem do not accurately translate into run-time reliability. Stonebraker makes a similar point: “the model does not deal with several important classes of errors, which a real world system administrator must cope with”.

This is not so much a problem with the CAP theorem per se. Satisfying the CAP theorem is a necessary but not sufficient condition; the issue is more one of system design than of implementation. On a side note regarding the CAP theorem, there is an excellent discussion by Daniel Abadi in Problems with CAP, and Yahoo’s little known NoSQL system of some of the problems of differentiating CAP’s Availability and Partition tolerance, i.e. what exactly is the distinction between a CA and a CP system?

Kreps underscores the importance of operational procedures since no code is bug free. This is especially true of relatively immature NoSQL systems, which are very young and have not gone through the long hardening phase that traditional databases have. In any complex long-running system you are bound to have outages no matter how good your code is, and the real question to ask is: how do you recover from failures? Of course this is not a novel idea, but it is worth keeping focus on this point, especially when it comes from a reliable NoSQL authority.

The importance of system reliability and recovery is underscored by the rather embarrassing recent Azure outage due to February’s leap day. Due to a minor software bug, Azure was unable to launch new elastic VMs on February 29, 2012. This didn’t just happen on one machine but across the whole cluster. In other words, the single point of failure was a software bug! All the redundant hardware and power sources weren’t much good in this situation! See a good recap: The Azure Outage: Time Is A SPOF, Leap Day Doubly So. The two main points to emphasize here are that the failure was neither random nor isolated, and that operational recovery procedures are a necessity.

Finally Kreps raises the need for empirically verifiable tests for  NoSQL products. Having spent an inordinate amount of time evaluating NoSQL systems (think cross-country drive instead of a mere drive around the block) this point strongly resonates with me. Distributed systems are notoriously complex beasts since they have to deal with failure in a fundamentally different way than single server systems. NoSQL vendors would be doing the community and themselves a great service if they would make public their high-volume tests and benchmarks. I would be delighted to see these test results posted regularly on public CI build dashboards. In order for NoSQL systems to be more widely accepted in the enterprise community, more transparency is needed regarding scalability issues.  Concrete comparable benchmarks are needed.

A step in this direction would be to develop some sort of standardized NoSQL equivalent of the TPC benchmarks. For example, to start off with, implement a series of high-volume and high-throughput tests with a key/value data model. Kick off a few million writes, then reads, some read/writes and deletes, with different record and key sizes, and see what happens. Set up a cluster, then start killing and restarting nodes, and see if the system behaves correctly and remains performant. I’ve put a lot of effort into such a framework called vtest and I know it’s not easy. By publicly exposing these kinds of tests, vendors would only increase confidence in their products.
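To make this concrete, here is a rough sketch of what the core of such a key/value test loop could look like. The KeyValueStore interface and the in-memory placeholder are hypothetical stand-ins for a vendor-specific adapter – this is not vtest code, just an illustration of the shape of the test.

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical minimal abstraction; a real test would plug in a vendor-specific adapter.
interface KeyValueStore {
  void put(String key, byte[] value);
  byte[] get(String key);
}

public class SimpleKvBenchmark {
  public static void main(String[] args) {
    // In-memory placeholder so the sketch runs as-is; replace with a Mongo/Cassandra adapter.
    final Map<String, byte[]> map = new ConcurrentHashMap<String, byte[]>();
    KeyValueStore store = new KeyValueStore() {
      public void put(String key, byte[] value) { map.put(key, value); }
      public byte[] get(String key) { return map.get(key); }
    };

    int requests = 1000000;
    Random random = new Random(1776);

    long start = System.currentTimeMillis();
    for (int i = 0; i < requests; i++) {
      byte[] value = new byte[100 + random.nextInt(1800)];  // vary the value size
      store.put("key-" + i, value);
    }
    long elapsed = System.currentTimeMillis() - start;
    System.out.printf("puts/sec: %.0f%n", requests / (elapsed / 1000.0));

    start = System.currentTimeMillis();
    for (int i = 0; i < requests; i++) {
      store.get("key-" + i);
    }
    elapsed = System.currentTimeMillis() - start;
    System.out.printf("gets/sec: %.0f%n", requests / (elapsed / 1000.0));
  }
}

A real harness of course adds concurrency, latency percentiles, deletes and node-kill scenarios on top of this skeleton – which is where most of the work lies.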

MongoDB vs. Cassandra Benchmarks

October 19, 2011

Cassandra and Mongo passed the first set of winnowing tests – both in terms of speed and functionality. In order to provide more data for an informed decision, I subjected both to a series of high throughput tests. The test suite was a total of 120 million operations (insert, update, get) – 12 runs of 10 million requests.

For those impatient for the ultimate conclusion, the executive summary is: Cassandra writes are faster, Mongo reads are faster. But what is interesting is how the latencies evolve over time, i.e. over the course of 120 million requests.

Overview of test suite

  • Remove all disk data of the persistent store with rm -rf
  • Start the NoSQL server
  • Maintain a constant load of 20 or 50 concurrent client threads with no ramp-up time
  • Insert N records
  • Get N records – this will reflect any caching or compaction (e.g. Cassandra SSTables)
  • Get N records – this should reflect already compacted tables – should be faster
  • Get N records – should correspond to previous get
  • Update N keys with differently sized values (controlled by the value seed). We want to make sure we’re exercising Mongo compaction
  • Repeat the three get runs again

Results Commentary

When one result is faster than another, the important questions to always ask are: how important is the difference? Does it really matter? Is the slower system still fast enough? If the slower result still satisfies your requirements, then the difference is of little significance and other factors weigh more heavily on the ultimate decision.

In order to interpret results, it is important to have a basic understanding of some statistics. You should not rely solely on the mean, but also pay particular attention to latency percentiles. Percentiles give you an idea of the distribution of results, which is especially important when predictability is required. Your SLA might say: P% of reads MUST be less than 10 ms, where P is, for example, 99 or 100. This of course raises the question of what is to be done about the operations that exceed the threshold, but that is another discussion. As you can see, there are some real whoppers – note the Max values for puts: 37 seconds for MongoDB and 175 seconds for Cassandra!
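For reference, a latency percentile over a batch of recorded samples can be computed with a simple sort and nearest-rank lookup. This is a generic sketch, not the code that produced the tables below:

import java.util.Arrays;

public class Percentiles {
  // Nearest-rank percentile: the value below which 'percent' of the samples fall.
  static long percentile(long[] latenciesMillis, double percent) {
    long[] sorted = latenciesMillis.clone();
    Arrays.sort(sorted);
    int rank = (int) Math.ceil((percent / 100.0) * sorted.length);
    return sorted[Math.max(rank - 1, 0)];
  }

  public static void main(String[] args) {
    long[] samples = { 1, 2, 2, 3, 5, 8, 13, 21, 500 };
    System.out.println("p50 = " + percentile(samples, 50));   // 5
    System.out.println("p99 = " + percentile(samples, 99));   // 500
    System.out.println("max = " + percentile(samples, 100));  // 500
  }
}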

As the tables below indicate, Cassandra was the winner for puts – both inserts and updates. For the initial 10M puts, the distribution is interesting – 50% of Mongo puts were under 1 ms while for Cassandra it was 2 ms. But at the 90th percentile, Mongo took 20 ms while Cassandra was at 4 ms.

For the subsequent two batches of 10M puts (with new value seeds), Cassandra’s throughput was 4054 and 5768 – quite a bit of variability! Note that the max value for a put was 174 seconds – must have been one heck of a GC! Mongo’s two put batches ranged from 1753 to 2197 puts per second. 90% of Cassandra puts finished under 4 ms, whereas 90% of Mongo puts were under 17 ms.

Cassandra’s superior write numbers are not surprising since it is optimized for writes rather than reads. See here. In order to avoid expensive disk seeks, writes are appended to sequential files called SSTables. The hit is taken on reads, however, where the system potentially has to examine several SSTables to find the current value. Unfortunately, for our needs fast reads were of more importance.

For reads the situation is quite different and Mongo comes out ahead. For the first three get batches Cassandra is giving us between 2300-2600 gets per second. Mongo is in the predictable 3100s. Both of these fall within our target SLA and are roughly comparable.

However, it’s a completely different story for the reads that follow the second round of puts! Look at the numbers: Mongo is slightly slower at 2999 and 3039, but Cassandra drops tenfold to 294 and 350! Que caramba! I’m assuming this is because of the need to access multiple SSTables, but I would have assumed that at some point the values would be merged into one file.

Perhaps there is some tuning to be done for Cassandra, though I saw nothing obvious in my initial look at the documentation. All I can say with confidence is that un-tuned Cassandra get operations slow down drastically. I did invoke an explicit compaction using nodetool, and throughput for the next 10 million gets did improve to 1000/sec. The caveat is that the Cassandra documentation does not recommend this, since automated compactions will not be run after a manual one! By the way, one of Mongo’s strong suits is that it strives to minimize the amount of configuration required to get a system running.

Context

In order to establish a baseline for each vendor’s capabilities, a non-clustered server was used: for Cassandra a single-node cluster, for Mongo simply one server with no replication. Cluster tests were run separately for functionality.

Test Configuration

  • Number of requests: 10,000,000
  • Object Mapper: Jackson JSON
  • Key size: 32
  • Value size: average is 1000 bytes, range from 100 to 1900
Server specs
  • Centos 5.5 Linux
  • Intel Xeon 2.4 GHz 64-bit processor
  • 8 processors, 2 cores each
  • 8 GB RAM
  • The client pinging the server averaged 0.2 ms
The columns are for the most part self-descriptive: Req/Sec is the throughput, Millis is the total elapsed time of the run in milliseconds, the percentage columns are latency percentiles in milliseconds, Mean is the mean latency in milliseconds, Err and Fail are the error and failure counts, and Thr is the number of concurrent client threads. The ValSeed column indicates the random seed for the value size. This is important since we want to exercise the vendor’s compaction algorithm by assigning differently sized values to an updated key. Values are generated by creating an N-sized String from a random set of ASCII characters. Since we want to avoid equally sized values, N varies from 100 to 1899:
  int baseSize = 100;
  int range = 1800;
  Random random = new Random(valueSizeSeed);
  int valueSize = baseSize + random.nextInt(range);  // yields 100 to 1899

MongoDB

  • Version: 2.0.0
  • Journaling: turned on
Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail Thr ValSeed
Put         2114  4729454      1     20     43     60   1576  36232    9.442    0     0  20    1776
Get         3182  3142258      2     10     18     28    273   4684    6.268    0     0  20
Get         3182  3142850      2     10     18     26    229   3950    6.269    0     0  20
Get         3106  3219539      2     33     88    144   2120   3885   16.060    0     0  50
Put         1753  5706060      1     17     76    278   1985  44472   11.395    0     0  20    1812
Put         2197  4552045      1     17     52    182   1293  37299    9.087    0     0  20    1846
Get         2999  3333966      2     11     19     39    308   4380    6.651    0     0  20
Get         3039  3290874      2     10     19     41    289   4676    6.565    0     0  20
PutGet       907 11027045     14     28     70    150   2531   8886   22.036    0     0  20    1861
Get         2992  3342034      2     11     20     40    299   4299    6.666    0     0  20
Get         2975  3361762      2     11     20     38    301   4478    6.707    0     0  20
Get         2957  3381393      2     34    112    166   2160   4363   16.871    0     0  50

Cassandra – Version: 0.8.6

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail Thr ValSeed
Put         6045  1654219      2      4     16     24    160   4169    3.288    0     0  20    1776
Get         2334  4285208      1     14    111    150    427   9003    8.552    0     0  20
Get         2380  4202468      2     10    110    144    290   8380    8.387    0     0  20
Get         2616  3822749      8     40    177    226   1559  24092   19.069    0     0  50
Put         4054  2466583      2      4     17     26    194 174614    4.915    0     0  20    1812
Put         5768  1733745      2      4     16     24    172   4190    3.446    0     0  20    1846
Get          294 33972166     43    141    371    567   1737  67276   67.928    0     0  20
Get          350 28551712     39    116    256    354   1629  37371   57.087    0     0  20
PutGet       285 35124479     40    124    388    801   2384 456321   70.232    0     0  20    1861
Get          210 47730677     79    182    341    418   1141  36368   95.446    0     0  20
Get          211 47305663     79    180    335    409   1097 104592   94.595    0     0  20
Get          249 40157243    175    339    598    730   2164  77675  200.751    0     0  50

Variability Examples

Here are some examples demonstrating the variability in results. The examples are from identical tests in the same context.

Mongo

First 10M puts

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail  Thr
Put         2137  4679117      1     21     41     49   1388  40931    9.338    0     0  20
Put         2156  4639027      1     21     41     48   1450  36412    9.258    0     0  20
Put         2114  4729454      1     20     43     60   1576  36232    9.442    0     0  20
Put         2079  4810938      1     20     42     55   1354  38436    9.605    0     0  20

First 10M gets

Test     Req/Sec   Millis    50%    90%    99%  99.5%  99.9%    Max     Mean  Err  Fail  Thr

Get         3187  3138189      2     10     18     25    225   4665    6.260    0     0  20
Get         3115  3210200      2     10     18     26    258  13211    6.403    0     0  20
Get         3182  3142258      2     10     18     28    273   4684    6.268    0     0  20
Get         3141  3184198      2     10     18     27    271  22285    6.352    0     0  20

No SQL Data Store Evaluation

October 18, 2011

Recently I’ve been doing some extensive and intensive evaluation of various No SQL implementations capable of storing 600,000 key/value items with a latency preferably under 50 milliseconds. Extensive in the sense of examining many providers and intensive in drilling down to test key features of each system. The specific requirements precluded looking at all stores, but a number of quite unique systems – each with its strong personality – were examined in substantial detail.

The initial requirements were:

  • Eventually be able to store six months of user profiles for ad engagements
  • Insertion of about 3-5 million records daily – an equivalent amount had to be deleted
  • Key/value access – no need for range or index queries
  • All keys were random MD5 hashes
  • Retrieval latency under 50 ms
  • Timestamp expiration of records
  • Reads needed to be very fast – writes could be batched and delayed
  • Ability to manipulate nodes of a live cluster, e.g. add a new node without a restart
In order to execute the evaluation I developed a suite of structured high-load tests that could be run against all providers. Besides raw benchmarks there were also tests for key technical issues such as failover behavior. Not all features could be compared across all implementations; obviously providers made different design and feature trade-offs – that’s their unique personality.
Some topics examined:
  • Predictability – we need low variability in response time – latency percentiles are helpful here.
  • Repeatability – we want to run the same test in the same context repeatedly with the same results, subject to an obvious margin of error. Establishing a consistent margin is a difficult chore.
  • Consistency model – eventual consistency and/or immediate. How is the CAP theorem handled?
  • Failover – when a node dies what happens to reads and writes?
  • Sharding – ability to shard data and key partitioning strategies.
  • Compaction policy – how is data stored, and how are updates and deletions handled (tombstones)? If implemented in Java, how much of a hit is garbage collection?
The following systems were evaluated, with the first two as  the final candidates.
  • Cassandra
  • MongoDB
  • Membase
  • Riak
  • Redis
In addition, the idea of a bank of caching servers fronting the NoSQL stores was examined – Memcached, Ehcache REST server, and custom REST servers with a pluggable cache engine (Ehcache, Infinispan). In some cases the NoSQL stores were so performant (MongoDB) that a separate caching layer was not immediately necessary. Perhaps under greater volume it would be. The key was to have a provisional caching strategy as a reserve.

MongoDB is very fast and reliable and it made the first cut. Membase also proved fast, but it had some reliability issues especially in server restarts. Cassandra provided the full set of features we needed though it was significantly slower than MongoDB.  Since we were testing with a constant client thread count of 20 or 50, being able to stay alive and keep up with this rate was important.

One of the frustrating aspects was the difficulty in obtaining adequate or even minimal hardware. Since the raison d’être of the project was high volume, it was important to test with volumes as large as possible. Differences between low- and high-volume scenarios can give you drastically different results and lead you to costly wrong conclusions. There is also a natural tension between reasonable turnaround time for results and the volume and rate of data testing. For example, to run a standard mélange of CRUD operations (inserts, gets, updates) 14 million times, MongoDB takes 20 hours versus 73 hours for Cassandra. That’s a lot of time to wait for the results of a configuration toggle switch!

One of the great parts of the project was the ability to study key features of the systems in detail and then implement test scenarios that exercised those features. A good example of praxis: putting solidly founded ideas into practice. One has to first discover and understand what has to be tested. Comparing eventual consistency systems such as Cassandra and Riak is a challenging endeavor that stimulates the intellect. For instance, how do Bloom filters (with their slight risk of false positives) obviate the need for disk access? How do you go about testing to see if this is true? Bloom filters are even used in DNA sequencing – see here. Then throw in MongoDB’s differing but overlapping approach, and you get quite a rich NoSQL stew. Documentation and articulation of key features was an important evaluation criterion.
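As a concrete illustration of the Bloom filter point (this sketch uses Guava’s BloomFilter purely for demonstration – it is not the implementation Cassandra or Riak use internally): a negative answer means the key is definitely absent, so the disk read can be skipped entirely, while a positive answer may occasionally be a false positive and still require a read.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;

public class BloomFilterDemo {
  public static void main(String[] args) {
    // Size the filter for roughly one million keys with a 1% false positive rate.
    BloomFilter<CharSequence> filter =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1000000, 0.01);

    for (int i = 0; i < 1000000; i++) {
      filter.put("key-" + i);
    }

    // No false negatives: every inserted key passes. Absent keys are rejected
    // almost always, which is what lets a store skip the disk seek.
    System.out.println(filter.mightContain("key-42"));      // true
    System.out.println(filter.mightContain("no-such-key")); // false (with ~1% chance of true)
  }
}

Testing the claim then becomes a matter of counting how often mightContain returns true for keys that were never inserted and comparing that rate to the configured 1%.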

VTest Testing Framework

April 12, 2010

In order to test basic Voldemort API methods under specified realistic load scenarios, I leveraged the “VTest” framework that I had previously written for load testing. VTest is a light-weight Spring-based framework that separates the execution strategy from the business tasks and provides cross-cutting features such as statistics gathering and reporting.

The main features of VTest are:

  • Declarative workflow-based testing framework based on Spring
  • Separation of concerns: framework, executor, job, task, key and value generation strategies
  • Implementations of these concerns are all pluggable and configurable via Spring dependency injection and bean wiring
  • Framework handles cross-cutting concerns: error handling, call statistics, result reporting, and result persistence
  • Conceptually inspired by java.util.concurrent’s Executor
  • Executors: SequentialExecutor, FixedThreadPoolExecutor, ScheduledThreadPoolExecutor
  • Executor invokes a job or an individual task
  • A task is a unit of work – a job is a collection of tasks
  • Tasks are implemented as Java classes
  • Jobs are specified as lists of tasks in Spring XML configuration file
  • VTest configuration acts as a high-level testing DSL (Domain Specific Language)
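To give a feel for the executor/task separation, here is a stripped-down hypothetical sketch – the interface and class names are mine, not VTest’s actual API:

import java.util.Arrays;
import java.util.List;

// Hypothetical interfaces illustrating the executor/task/job separation.
interface Task {
  void execute(long requestIndex) throws Exception;
}

interface Executor {
  // Runs a single task for a configured number of requests.
  void run(Task task, long numRequests);
}

class SequentialExecutorSketch implements Executor {
  public void run(Task task, long numRequests) {
    for (long i = 0; i < numRequests; i++) {
      try {
        task.execute(i);            // the business logic lives in the task
      } catch (Exception e) {
        // a real framework would record this as an error statistic
      }
    }
  }
}

class JobSketch {
  // A job is simply an ordered list of tasks.
  static void runJob(Executor executor, List<Task> job, long numRequests) {
    for (Task task : job) {
      executor.run(task, numRequests);
    }
  }

  public static void main(String[] args) {
    Task noop = new Task() {
      public void execute(long i) { /* put/get/delete against the store goes here */ }
    };
    runJob(new SequentialExecutorSketch(), Arrays.asList(noop, noop), 1000);
  }
}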

Sample Result

Here is a sample output for a CRUD job that puts one million key/value pairs, gets them, updates them and finally deletes them. Each task is executed for N requests – N being one million – with a thread pool of 200. The pool acts as a Leaky Bucket (thanks to Joe for this handy reference). The job is executed for five iterations and both the details of each individual run and the aggregated result are displayed.
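The leaky-bucket behavior of a fixed pool can be sketched with plain java.util.concurrent primitives – a fixed thread pool plus a semaphore that caps the number of in-flight requests. Again, this is an illustration, not VTest’s actual FixedThreadPoolExecutor:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class BoundedSubmission {
  public static void main(String[] args) throws InterruptedException {
    final int poolSize = 200;
    final long requests = 1000000;

    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    // At most 'poolSize' requests are in flight at any time, so the submitter
    // drains at whatever rate the pool (and the server behind it) can sustain.
    final Semaphore inFlight = new Semaphore(poolSize);

    for (long i = 0; i < requests; i++) {
      inFlight.acquire();
      pool.submit(new Runnable() {
        public void run() {
          try {
            // the put/get/update/delete task call goes here
          } finally {
            inFlight.release();
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
  }
}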

Description of columns:

  • Req/Sec – requests per second or throughput
  • Ratio – the fraction of total time for the task. The ratio is an inverse of the throughput – the higher the ratio, the lower the throughput.
  • The five % columns represent standard latency percentiles. For example, in the first PutCreate the 99th percentile means that 99% of the requests took 384 milliseconds or less.
  • Max – maximum latency. It is instructive to see that for large request sets, the 99.9 percentile doesn’t accurately portray the slowest requests. Notice that for the first PutCreate the Max is over five seconds whereas the 99.9 percentile is only 610 milliseconds. There’s a lot going on in that last 0.1% of requests! In fact Vogels makes the point that Amazon doesn’t focus so much on averages but on reducing these extreme “outliers”.
  • Errors – number of exceptions thrown by the server. There is an example in the third PutUpdate.
  • Fails – number of failures. A failure is when the server does not throw an exception but the business logic deems the result incorrect. For example, if the retrieved value does not match its expected value, a failure is noted. Observe that there are 28,932 failures for the third Get – a rather worrisome occurrence.
  • StdDev – standard deviation
==== DETAIL STATUS ============ 

Test         Req/Sec    50%    90%    99%  99.5%  99.9%    Max  Errors  Fails  StdDev
PutCreate       9921      7     29    384    454    610   5022       0      0   61.31
PutCreate       9790      7     31    358    427    516    707       0      0   55.23
PutCreate       8727      7     32    398    457    558    980       0      0   63.98
PutCreate      14354      7     26    122    213    375    613       0      0   27.51
PutCreate       8862      7     31    402    461    577    876       0      0   63.65
Total           9639      7     30    376    442    547   5022       0      0   58.03  

Test         Req/Sec    50%    90%    99%  99.5%  99.9%    Max  Errors  Fails  StdDev
Get            24364      6     10     78     88    114    440       0      0   11.35
Get            23568      6     11     81     89    159    320       0      0   12.31
Get            22769      7     11     81     89    109    381       0  28932   11.93
Get            23174      7     10     80     87     99    372       0      0   11.78
Get            22919      7     10     80     89    216    369       0      0   13.33
Total          23264      7     10     80     88    110    440       0  28932   12.15  

Test         Req/Sec    50%    90%    99%  99.5%  99.9%    Max  Errors  Fails  StdDev
PutUpdate       6555     11     32    554    943   1115   2272       0      0  101.49
PutUpdate       6412     11     32    574    900   1083   2040       0      0  101.99
PutUpdate       2945      3     10   4007   4009   4020   6010       1      0  494.14
PutUpdate       6365     11     35    537    746   1101   2118       0      0   97.55
PutUpdate       6634     11     32    537    853   1095   1293       0      0   98.18
Total           5668     10     31    554    978   4008   6010       1      0  197.87  

Count  Exception
1      class voldemort.store.InsufficientSuccessfulNodesException  

Test         Req/Sec    50%    90%    99%  99.5%  99.9%    Max  Errors  Fails  StdDev
Delete          6888     17     46    266    342    442    860       0      0   44.37
Delete          7649     17     43    176    263    395    619       0      0   34.11
Delete          8156     17     43    133    153    244    423       0   8544   25.03
Delete          7539     17     44    180    276    447    759       0      0   36.53
Delete          7457     17     43    218    285    420    714       0      0   38.02
Total           7494     17     44    203    280    410    860       0   8544   36.44  

=== SUMMARY STATUS ============
Test         Req/Sec  Ratio    50%    90%    99%  99.5%  99.9%    Max  Errors  Fails  StdDev
DeleteTable   307456  0.01       0      0      0      0      0      2       0      0    0.01
StoreCreate     9639  0.23       7     30    376    442    547   5022       0      0   58.03
Retrieve       23264  0.09       7     10     80     88    110    440       0  28932   12.15
StoreUpdate     5668  0.38      10     31    554    978   4008   6010       1      0  197.87
Delete          7494  0.29      17     44    203    280    410    860       0   8544   36.44
Total                                                                       1  37476         

Count  Exception
1      voldemort.store.InsufficientSuccessfulNodesException  

Config Parameters:
  requests           : 1000000
  threadPoolSize     : 200
  valueSize          : 1000

Sample Chart

Since call statistics are persisted in a structured XML file, the results can be post-processed and charts can be generated. The example below compares the throughput for four different record sizes: 1k, 2k, 3k and 5k. It is implemented using the popular open-source JFreeChart package.
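A chart like that takes only a few lines of JFreeChart. The sketch below uses made-up numbers in place of the values parsed from the persisted XML:

import java.io.File;
import java.io.IOException;
import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartUtilities;
import org.jfree.chart.JFreeChart;
import org.jfree.chart.plot.PlotOrientation;
import org.jfree.data.category.DefaultCategoryDataset;

public class ThroughputChart {
  public static void main(String[] args) throws IOException {
    DefaultCategoryDataset dataset = new DefaultCategoryDataset();
    // Illustrative values only - the real numbers come from the XML call statistics.
    dataset.addValue(9639, "1k", "PutCreate");
    dataset.addValue(8200, "2k", "PutCreate");
    dataset.addValue(7100, "3k", "PutCreate");
    dataset.addValue(5900, "5k", "PutCreate");

    JFreeChart chart = ChartFactory.createBarChart(
        "Throughput by value size", "Task", "Requests/sec", dataset,
        PlotOrientation.VERTICAL, true, false, false);
    ChartUtilities.saveChartAsPNG(new File("throughput.png"), chart, 800, 600);
  }
}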

VTest Job Configuration File

The jobs and tasks are defined and configured in a standard Spring configuration file. For ease-of-use, the dynamically varying properties are externalized in the vtest.properties file.

<beans>
  <bean id="propertyConfigurer"
        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations" value="classpath:vtest.properties" />
    <property name="systemPropertiesMode" value="2" />
  </bean>

<!-- ** Jobs/Tasks ************************ -->

  <util:list id="crud.job"  >
    <ref bean="putCreate.task" />
    <ref bean="get.task" />
    <ref bean="putUpdate.task" />
    <ref bean="delete.task" />
  </util:list>

  <bean id="putCreate.task" class="com.amm.vtest.tasks.voldemort.PutTask" scope="prototype" >
    <constructor-arg ref="taskConfig" />
    <constructor-arg value="PutCreate" />
  </bean>

  <bean id="putUpdate.task" class="com.amm.vtest.tasks.voldemort.PutTask" scope="prototype" >
    <constructor-arg ref="taskConfig" />
    <constructor-arg value="PutUpdate" />
  </bean>

  <bean id="get.task" class="com.amm.vtest.tasks.voldemort.GetTask" scope="prototype" >
    <constructor-arg ref="taskConfig" />
  </bean>

  <bean id="delete.task" class="com.amm.vtest.tasks.voldemort.DeleteTask" scope="prototype" >
    <constructor-arg ref="taskConfig" />
  </bean>

  <bean id="taskConfig" class="com.amm.vtest.tasks.voldemort.VoldemortTaskConfig" scope="prototype" >
    <constructor-arg value="${cfg.store}" />
    <constructor-arg value="${cfg.urls}" />
    <constructor-arg value="${cfg.clientConfigFile}" />
    <property name="valueSize"      value="${cfg.valueSize}" />
    <property name="valueGenerator" ref="valueGenerator" />
    <property name="keyGenerator"   ref="keyGenerator" />
    <property name="checkValue"     value="${cfg.checkRetrieveValue}" />
  </bean>

<!-- ** VTest **************** -->

  <bean id="vtestProcessor"
        class="com.amm.vtest.VTestProcessor" scope="prototype">
    <constructor-arg ref="executor" />
    <constructor-arg ref="callStatsReporter" />
    <property name="warmup"          value="${cfg.warmup}" />
    <property name="logDetails"      value="true" />
    <property name="logDetailsAsXml" value="true" />
  </bean>

  <bean id="callStatsReporter"
        class="com.amm.vtest.services.callstats.CallStatsReporter" scope="prototype">
    <property name="properties" ref="configProperties" />
  </bean>

  <util:map id="configProperties">
    <entry key="requests" value="${cfg.requests}" />
    <entry key="threadPoolSize" value="${cfg.threadPoolSize}" />
    <entry key="valueSize" value="${cfg.valueSize}" />
  </util:map >

<!-- ** Executors **************** -->

  <alias alias="executor" name="fixedThreadPool.executor" />

  <bean id="sequential.executor"
        class="com.amm.vtest.SequentialExecutor" scope="prototype">
    <property name="numRequests" value="${cfg.requests}" />
  </bean>

  <bean id="fixedThreadPool.executor"
        class="com.amm.vtest.FixedThreadPoolExecutor" scope="prototype">
    <property name="numRequests"     value="${cfg.requests}" />
    <property name="threadPoolSize"  value="${cfg.threadPoolSize}" />
    <property name="logModulo"       value="${cfg.logModulo}" />
  </bean>

</beans>

VTest Properties

cfg.urls=tcp://10.22.48.50:6666,tcp://10.22.48.51:6666,tcp://10.22.48.52:6666
cfg.store=test_mysql
cfg.requests=1000000
cfg.valueSize=1000
cfg.threadPoolSize=200
cfg.clientConfigFile=client.properties
cfg.checkRetrieveValue=false
cfg.warmup=false
cfg.logModulo=1000
cfg.fixedKeyGenerator.size=36
cfg.fixedKeyGenerator.reset=true

Run Script

. common.env

CPATH="$CPATH:config"
PGM=com.amm.vtest.VTestDriver
STORE=test_bdb
CONFIG=vtest.xml

job=crud.job
iterations=1
requests=1000000
threadPoolSize=200
valueSize=1000

opts="r:t:v:i:"
while getopts $opts opt
  do
  case $opt in
    r) requests=$OPTARG ;;
    t) threadPoolSize=$OPTARG ;;
    v) valueSize=$OPTARG ;;
    i) iterations=$OPTARG ;;
    \?) echo $USAGE " Error"
        exit;;
    esac
  done
shift `expr $OPTIND - 1`
if [ $# -gt 0 ] ; then
  job=$1
  fi

tstamp=`date "+%F_%H-%M"` ; logdir=logs-$job-$tstamp ; mkdir $logdir

PROPS=
PROPS="$PROPS -Dcfg.requests=$requests"
PROPS="$PROPS -Dcfg.threadPoolSize=$threadPoolSize"
PROPS="$PROPS -Dcfg.valueSize=$valueSize"

time -p java $PROPS -cp $CPATH $PGM $* \
  --config $CONFIG --iterations $iterations --job $job \
  | tee log.txt

cp -p log.txt log-*.xml times-*.txt *.log $logdir

XML Logging Output

The call statistics for each task run are stored in an XML file for future reference and possible post-processing, e.g. charts, database persistence, cross-run aggregation. JAXB and an XSD schema are used to process the XML.

<callStats>
    <taskName>task-Put</taskName>
    <date>2010-04-04T21:50:21.459-04:00</date>
    <callsPerSecond>13215.975471149524</callsPerSecond>
    <elapsedTime>75666</elapsedTime>
    <standardDeviation>27.547028113708425</standardDeviation>
    <callRatio>0.34021105261028106</callRatio>
    <calls failures="0" errors="0" all="1000000"/>
    <percentiles>
        <percentile50>7.0</percentile50>
        <percentile90>31.0</percentile90>
        <percentile99>124.0</percentile99>
        <percentile995>163.0</percentile995>
        <percentile999>269.0</percentile999>
    </percentiles>
</callStats>
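Reading such a file back is straightforward with JAXB. The CallStats class below is a hypothetical hand-rolled mapping of a subset of the elements (in practice the class is generated from the XSD):

import java.io.File;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;

// Hypothetical JAXB mapping of a subset of the callStats document.
@XmlRootElement(name = "callStats")
@XmlAccessorType(XmlAccessType.FIELD)
class CallStats {
  String taskName;
  double callsPerSecond;
  long elapsedTime;
}

public class CallStatsReader {
  public static void main(String[] args) throws JAXBException {
    JAXBContext context = JAXBContext.newInstance(CallStats.class);
    CallStats stats = (CallStats) context.createUnmarshaller()
                                         .unmarshal(new File("callstats.xml"));
    System.out.println(stats.taskName + ": " + stats.callsPerSecond + " calls/sec");
  }
}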

Eventual Consistency Testing

April 12, 2010

I’ve been recently involved in testing a massively scalable application based on an eventual consistency  framework called Voldemort.

The key articles on Dynamo and eventual consistency are:

Dynamo has inspired a variety of frameworks based on distributed hash table principles such as Cassandra, Voldemort, Mongo etc. What all these tools strive to address is the inherent limit to massive scalability with traditional relational databases. Hence the name “No SQL”.

How is Dynamo tested?

All this sounds fine, but the real question is: does this work in real life? Unfortunately Amazon has not exposed the Dynamo source code, and except for the fact that it is written in Java, little is known. As a pragmatic sort of fellow, I am always keen on knowing the nuts and bolts of new-fangled solutions. How does Amazon certify builds? What is Amazon’s test framework for such a massively scalable system as Dynamo? What sort of tests do they have? How do they specify and test their SLAs? How do they test the intricate and complex quorum-based logic as cluster nodes are brought up and down? I could well imagine the complexity of such a test environment exceeding the complexity of the application itself.

Embedded and Standalone Execution Contexts

One of the nice things about the Voldemort project is its strong emphasis on modularity and mockable objects. The Voldemort server has the capability of being launched in an embedded mode, and this greatly facilitates many testing scenarios. However, this in no way replaces the need to test against an actual standalone server. Embedded vs. standalone testing is a false dilemma. The vast majority of test cases can and should be run in both modes. Embedded for ease of use, but standalone for truer validation since it more closely approximates the target production environment. So the first step was to create an “Execution Context” object that encapsulated the different bootstrapping logic.

InitContext Interface.

public interface InitContext {
  public void start() throws IOException ;
  public void stop() throws IOException ;
  public VoldemortStoreDao getTestStoreDao() ;
}

EmptyInitContext for a standalone server. Nothing much needs to be done since the server is launched externally.

public class EmptyInitContext implements InitContext
{
  private VoldemortStoreDao storeDao ;

  public EmptyInitContext(VoldemortStoreDao storeDao) {
    this.storeDao = storeDao ;
  }

  public void start() throws IOException {
  }

  public void stop() throws IOException {
  }

  public VoldemortStoreDao getTestStoreDao() {
    return storeDao ;
  }
}

EmbeddedServerInitContext for an embedded Voldemort server that uses an embedded Berkeley DB store.

public class EmbeddedServerInitContext implements InitContext
{
  private VoldemortServer server ;
  private TestConfig testConfig ;
  private VoldemortStoreDao testDao ;
  private boolean useNio = false ;

  public EmbeddedServerInitContext(TestConfig testConfig) {
    this.testConfig = testConfig ;
  }

  public void start() throws IOException {
    String configDir = testConfig.getVoldemortConfigDir();
    String dataDir = testConfig.getVoldemortDataDir();
    int nodeId = 0 ;
    server = new VoldemortServer(
      ServerTestUtils.createServerConfig(useNio, nodeId, dataDir,
        configDir + "/cluster.xml", configDir + "/stores.xml",
        new Properties() ));
    server.start();

    StoreRepository srep = server.getStoreRepository();
    List stores = srep.getAllLocalStores() ;
    for (Store store : stores) {
      Store lstore = VoldemortTestUtils.getLeafStore(store);
      if (lstore instanceof StorageEngine) {
        if (store.getName().equals(testConfig.getStoreName())) {
          StorageEngine engine = (StorageEngine) lstore ;
          StorageEngineDaoImpl dao = new StorageEngineDaoImpl(engine);
          testDao = dao ;
          break;
          }
        }
      }
  }

  public void stop() throws IOException {
    ServerTestUtils.stopVoldemortServer(server);
  }

  public VoldemortStoreDao getTestStoreDao() {
    return testDao ;
  }
}

Server Cycling

One important place where embedded and standalone testing logic do differ is in server cycling. This is especially important when testing eventual consistency scenarios. Server cycling refers to the starting and stopping of server nodes. In embedded mode this is no problem since everything is executing inside one JVM process. When the servers are separate processes, the problem becomes significantly more difficult. Stopping a remote Voldemort server actually turns out to be easy since Voldemort exposes a JMX MBean with a stop operation. Needless to say, this technique cannot be used to start a server! In order to launch a server, the test client has to somehow invoke a script on a remote machine. The following steps need to be done:

  • Use Java Runtime.exec to ssh a script on remote machine
  • Script must first check that a server is not already running – if it is, an error is returned
  • Script calls voldemort-server.sh
  • Script waits an indeterminate amount of time to allow the server to start
  • Script invokes “some operation” to ascertain that the server is ready to accept requests

As you can see each step is fraught with problems. In local embedded mode this series of complex steps is subsumed in the blocking call to simply create a new in-process object. In standalone mode, the wait step is problematic since there is no precise amount of time to wait. Wait and then do what to determine server liveliness? Invoke an operation? This would/could affect the integrity of the very operation we are testing! One potential solution is to invoke a JMX operation that would serve the purpose of a liveliness check. Assuming all goes well, all of this takes time and for a large battery of tests the overall execution time is significantly increased.
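For what it’s worth, here is a rough sketch of the remote start step using ProcessBuilder over ssh, with a simple TCP connect as the liveliness probe. The script path is hypothetical and passwordless ssh is assumed; a JMX liveliness operation would be a cleaner probe than opening a socket:

import java.io.IOException;
import java.net.Socket;

public class RemoteServerStarter {
  // Starts a Voldemort server on a remote host and polls until it accepts connections.
  public static void startRemote(String host) throws IOException, InterruptedException {
    // Assumes passwordless ssh and a start script already installed on the host.
    Process ssh = new ProcessBuilder(
        "ssh", host, "/opt/voldemort/bin/voldemort-server.sh > /dev/null 2>&1 &")
        .start();
    ssh.waitFor();

    // Poll for liveliness instead of sleeping a fixed, arbitrary amount of time.
    for (int attempt = 0; attempt < 30; attempt++) {
      try {
        Socket socket = new Socket(host, 6666);  // the Voldemort socket port used in these tests
        socket.close();
        return;  // port is open - assume the server is ready
      } catch (IOException notYetUp) {
        Thread.sleep(1000);
      }
    }
    throw new IOException("Voldemort server on " + host + " did not come up in time");
  }
}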

Eventual Consistency Test Example

Let us look at some examples. Assume we have a three-node cluster with N=3, W=2, R=2. N is the number of nodes (replicas), W is the number of writes that must succeed, and R is the number of reads that must succeed. For example, for a write operation the system will try to write the data to all nodes; if two (or all three) writes succeed, the operation is considered successful.
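A useful sanity check in these tests is the quorum overlap condition R + W > N, which is what guarantees that a successful read intersects the most recent successful write. A trivial sketch with the configuration values as plain constants:

public class QuorumCheck {
  public static void main(String[] args) {
    int n = 3;  // nodes (replicas)
    int w = 2;  // writes that must succeed
    int r = 2;  // reads that must succeed

    // R + W > N: every read quorum overlaps every write quorum, so at least
    // one replica consulted by a read has seen the latest successful write.
    boolean readSeesLatestWrite = (r + w) > n;
    // W > N/2: two conflicting write quorums cannot both succeed.
    boolean noConflictingWrites = w > n / 2;

    System.out.println("read/write quorums overlap: " + readSeesLatestWrite);
    System.out.println("write quorums overlap:      " + noConflictingWrites);
  }
}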

Get – One Node Down

  • Call put(K,V)
  • For each node in cluster
    • Stop node
    • Call V2=get(K)
    • Assert that V==V2
    • Start node

This logic needs to be executed against all thirteen Voldemort operations: get, put, putIfNotObsolete, delete, etc. Whew! Now imagine if the requirements are to test against two different cluster configurations, a 3/2/2 and a 5/3/3!
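Expressed as code, the scenario above might look like the following sketch. The ClusterController and KvClient interfaces are hypothetical glue over the server-cycling machinery and the Voldemort client described earlier – the real client API differs, but the shape of the test is the same:

import static org.junit.Assert.assertArrayEquals;

public class OneNodeDownGetTest {

  // Hypothetical glue over the server-cycling machinery described above.
  interface ClusterController {
    int nodeCount();
    void stopNode(int nodeId) throws Exception;
    void startNode(int nodeId) throws Exception;
  }

  // Hypothetical minimal client facade; the real Voldemort client API differs.
  interface KvClient {
    void put(String key, byte[] value) throws Exception;
    byte[] get(String key) throws Exception;
  }

  void getWithOneNodeDown(ClusterController cluster, KvClient client) throws Exception {
    byte[] value = "some-value".getBytes("UTF-8");
    client.put("K", value);

    // With N=3, W=2, R=2 the get must still succeed with any single node down.
    for (int node = 0; node < cluster.nodeCount(); node++) {
      cluster.stopNode(node);
      try {
        assertArrayEquals(value, client.get("K"));
      } finally {
        cluster.startNode(node);
      }
    }
  }
}

Multiplying this by thirteen operations and several N/W/R configurations is exactly why the execution-context and server-cycling plumbing described above has to be rock solid.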