Friday, November 20, 2009

Bigdata, HA, and the clouds

Mike and I have been doing some heavy thinking about the High Availability (HA) architecture for bigdata, and we think that we have a really interesting new dimension to the total story. The basic idea is that we are going to decouple the index read/write service from the distributed file system. This is going to change things in a big way. Here are some for instances:

- Add or drop bigdata services based on demand, dynamically changing the throughput for data write. This is entirely in line with the notions of cloud and will allow resource sharing on private clouds or pay per use on public clouds.

- Many machines can serve from the same shard. This means that we can hit HPC levels of concurrency for query answering. We can even partition the machines so long running queries execute against a dedicated set of hosts without interfering with normal low latency query processing.

- Blindingly fast parallel closure. There are some new algorithms out there which describe simple techniques for computing the RDFS closure of each database partition independently by replicating the "ontology" onto each node. For bigdata, this works out as computing the closure for each POS index shard independently. Using just these techniques we can probably obtain a 10x improvement when computing the database closure. However, by decoupling the index read/write services from the distributed file system, we can actually put as much as an entire machine on each POS shard, computing the closure of an arbitrarily large graph in minutes.
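
To make the per-partition idea concrete, here is a minimal sketch. It is not the bigdata implementation and not the algorithm from any particular paper. It assumes the "ontology" is just a subClassOf hierarchy, that each partition holds rdf:type assertions, and it covers only the RDFS subclass rules (rdfs9 and rdfs11). The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: every partition gets its own copy of the schema,
// so each partition can be closed without talking to any other partition.
final class PartitionClosureSketch {

    // typesInPartition: instance -> asserted classes (this partition only).
    // superClasses: class -> direct superclasses (the replicated schema).
    // Returns instance -> asserted plus inferred classes.
    static Map<String, Set<String>> closeTypes(
            Map<String, Set<String>> typesInPartition,
            Map<String, Set<String>> superClasses) {
        Map<String, Set<String>> closed = new HashMap<String, Set<String>>();
        for (Map.Entry<String, Set<String>> e : typesInPartition.entrySet()) {
            Set<String> seen = new HashSet<String>(e.getValue());
            Deque<String> todo = new ArrayDeque<String>(e.getValue());
            // Walk up the class hierarchy; "seen" also guards against cycles.
            while (!todo.isEmpty()) {
                Set<String> supers = superClasses.get(todo.pop());
                if (supers == null) {
                    continue;
                }
                for (String s : supers) {
                    if (seen.add(s)) {
                        todo.push(s);
                    }
                }
            }
            closed.put(e.getKey(), seen);
        }
        return closed;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> schema = new HashMap<String, Set<String>>();
        schema.put("Student", Collections.singleton("Person"));
        schema.put("Person", Collections.singleton("Agent"));
        Map<String, Set<String>> shard = new HashMap<String, Set<String>>();
        shard.put("alice", Collections.singleton("Student"));
        System.out.println(closeTypes(shard, schema));
    }
}
```

Because `closeTypes` reads only its own partition plus the replicated schema, one such call per shard can run on a separate machine with no coordination until the results are written back.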

There are a lot of ways to handle the distributed file system, ranging from refactoring our existing code to produce a custom API, to running over any of the various distributed file systems for commodity hardware, to running over a parallel file system for an HPC cluster, SAN or NAS. Our preference is a distributed file system for commodity hardware because that supports the most reuse across both bigdata and other cloud applications, but the distributed file system must support sync to disk.

Bigdata IO is already highly optimized for both write and read operations. When we open an index segment, we fully buffer the B+Tree nodes, which are conveniently arranged in a contiguous region in the file. As a result, when we read on an index segment, we can go directly to the leaf since the nodes are in memory. The leaves are also double-linked, and we have bloom filters in front of the B+Tree as well as record level caching.
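
For readers unfamiliar with the bloom filter trick, the sketch below shows why it saves IO: a negative answer is always correct, so a point lookup for an absent key never has to touch the leaf. This is an illustrative filter, not bigdata's; the class name, the sizing, and the double-hashing scheme are all assumptions.

```java
import java.util.Arrays;
import java.util.BitSet;

// Hypothetical sketch of a Bloom filter that could sit in front of an index.
final class BloomFilterSketch {
    private final BitSet bits;
    private final int nbits;
    private final int nhashes;

    BloomFilterSketch(int nbits, int nhashes) {
        this.bits = new BitSet(nbits);
        this.nbits = nbits;
        this.nhashes = nhashes;
    }

    // Derives the i-th bit position from two base hashes (double hashing).
    private int position(byte[] key, int i) {
        int h1 = Arrays.hashCode(key);
        int h2 = ((h1 >>> 16) ^ (h1 * 31)) | 1; // forced odd so it is never zero
        long combined = (long) h1 + (long) i * h2;
        return (int) (((combined % nbits) + nbits) % nbits);
    }

    void add(byte[] key) {
        for (int i = 0; i < nhashes; i++) {
            bits.set(position(key, i));
        }
    }

    // false means "definitely absent"; true means "possibly present".
    boolean mightContain(byte[] key) {
        for (int i = 0; i < nhashes; i++) {
            if (!bits.get(position(key, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BloomFilterSketch filter = new BloomFilterSketch(1024, 3);
        filter.add("some-key".getBytes());
        System.out.println(filter.mightContain("some-key".getBytes()));
    }
}
```

A caller would check `mightContain` first and only descend to the leaf when it returns true.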

Journal writes are append only, occur at group commit points, and are generally 1 or more megabytes in size. Still, it might make sense to use a hybrid design where we tie the journal to a data service and use a failover chain of master and secondary data services to absorb writes. Once we close out the journal, we can blast its 200MB extent directly onto the distributed file system. Likewise, when reading against a historical commit point, we could slurp the corresponding journal onto the machine paired with the shard, eliminating any remote IO for small data records on the journal. Cold shards can simply be dropped. They can be reassigned to data services on an as-needed basis and brought back online nearly instantly. This will allow the number of machines dedicated to bigdata to fluctuate with demand, which is exactly in keeping with the cloud paradigm.

This really opens up the possibilities that we see for bigdata tremendously. By tossing out the idea that the CPU/RAM of at most a single machine was available to absorb writes or compute closure, we can gain tremendous leverage from pooled computational resources.


Wednesday, October 28, 2009

Parallel materialization of the RDFS closure

There were two excellent presentations yesterday at ISWC 2009 [1, 2] on using parallel techniques to materialize the RDFS closure at extremely high rates (for example, the RDFS closure of U8000 in 15 minutes). This is something that we are going to try out as soon as possible. Unlike either of the systems described in these papers, bigdata uses automatic dynamic sharding based on key-ranges of the data. These techniques can be adapted by mapping the computation onto the POS index shards, bringing them to fixed point, and then reusing our high-throughput data loader to quickly relocate the entailments onto the distributed indices. There is clearly a surge in parallel and distributed algorithms for the semantic web, which is extremely exciting.

[1] Jesse Weaver, James A. Hendler. Parallel Materialization of the Finite RDFS Closure for Hundreds of Millions of Triples, In Proceedings of the 8th International Semantic Web Conference, pp. 682--697, 2009.

[2] Jacopo Urbani, Spyros Kotoulas, Eyal Oren, Frank van Harmelen. Scalable Distributed Reasoning using MapReduce, In Proceedings of the 8th International Semantic Web Conference, 2009.

Tuesday, October 27, 2009

Release 0.81b

We've just released a new version of bigdata. This release is capable of loading 1B triples in under one hour on a 15 node cluster and has been used to load up to 13B triples on the same cluster. JDK 1.6 is required. See [1] for instructions on installing bigdata(R), [2] for the javadoc and [3] and [4] for news, questions, and the latest developments.

Please note that we recommend checking out the code from SVN using the tag for this release. The code will build automatically under Eclipse. You can also build the code using the Ant script. The cluster installer requires the use of the Ant script. You can check out this release from the following URL:

https://bigdata.svn.sourceforge.net/svnroot/bigdata/tags/RELEASE_0.81b

New features:
  • Support for quads.
  • The B+Tree data record is now the same on the disk and in memory, eliminating de-serialization costs for immutable B+Tree records.
  • Shared LRU for all B+Tree instances in the same JVM. This provides competition across those B+Tree instances for RAM. The LRU is configured by default to use 10% of the JVM heap, but that value may be increased to 20%, even for very heavy workloads with large heaps.
  • Parallel iterator for distributed access paths.
  • 3x improvement in distributed query performance. We have only just begun to optimize distributed query. This performance improvement is mainly due to the parallel access path iterator, the shared LRU, and the selection of a better chunk size for distributed query. We expect substantial improvements in query performance over the next several months.
  • Some query hotspot elimination.
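
As an illustration of the shared LRU item above, the sketch below shows one cache instance that several indices write into, so a hot index naturally pushes out the cold records of the others. It is not the bigdata cache: it bounds the cache by entry count rather than by a fraction of the heap, and the class name and the "index:address" key convention are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: one LRU shared by every index in the JVM.
final class SharedLruSketch<K, V> {
    private final Map<K, V> map;

    SharedLruSketch(final int capacity) {
        // accessOrder=true makes iteration order least-recently-used first.
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > capacity;
            }
        };
    }

    synchronized V get(K key) {
        return map.get(key); // counts as a use of the entry
    }

    synchronized void put(K key, V value) {
        map.put(key, value); // may evict the least recently used entry
    }

    synchronized boolean contains(K key) {
        return map.containsKey(key); // does not count as a use
    }

    synchronized int size() {
        return map.size();
    }

    public static void main(String[] args) {
        SharedLruSketch<String, byte[]> cache = new SharedLruSketch<String, byte[]>(2);
        cache.put("spo:1", new byte[0]);
        cache.put("pos:7", new byte[0]);
        cache.get("spo:1");              // touch the SPO record
        cache.put("osp:3", new byte[0]); // evicts the untouched POS record
        System.out.println(cache.contains("pos:7"));
    }
}
```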

The roadmap for the next release includes:
  • Full transactions for the SAIL.
  • Record level compression.
  • Query optimizations.

For more information, please see the following links:

[1] http://bigdata.wiki.sourceforge.net/GettingStarted
[2] http://www.bigdata.com/bigdata/docs/api/
[3] http://sourceforge.net/projects/bigdata/
[4] http://www.bigdata.com/blog

About bigdata:

Bigdata® is a horizontally-scaled, general purpose storage and computing fabric for ordered data (B+Trees), designed to operate on either a single server or a cluster of commodity hardware. Bigdata® uses dynamically partitioned key-range shards in order to remove any realistic scaling limits - in principle, bigdata® may be deployed on 10s, 100s, or even thousands of machines and new capacity may be added incrementally without requiring the full reload of all data. The bigdata® RDF database supports RDFS and OWL Lite reasoning, high-level query (SPARQL), and datum level provenance.
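
A rough sketch of what "dynamically partitioned key-range shards" means in practice: each shard owns the keys from its separator key up to the next one, and a split just adds a separator. This is a simplified model with string keys and hypothetical names, not the bigdata metadata index.

```java
import java.util.TreeMap;

// Hypothetical sketch of locating a key-range shard and splitting one.
final class KeyRangeShardsSketch {
    // Maps the inclusive lower bound of a key range to the shard that owns it.
    private final TreeMap<String, String> shardsByFromKey = new TreeMap<String, String>();

    KeyRangeShardsSketch(String initialShard) {
        // The empty string sorts before every other key, so one shard covers everything.
        shardsByFromKey.put("", initialShard);
    }

    // Finds the shard whose lower bound is the greatest one <= key.
    String locate(String key) {
        return shardsByFromKey.floorEntry(key).getValue();
    }

    // Keys >= separatorKey (up to the next separator) move to newShard.
    void split(String separatorKey, String newShard) {
        shardsByFromKey.put(separatorKey, newShard);
    }

    public static void main(String[] args) {
        KeyRangeShardsSketch shards = new KeyRangeShardsSketch("shard-0");
        shards.split("m", "shard-1");
        System.out.println(shards.locate("apple") + " " + shards.locate("zebra"));
    }
}
```

Only the keys in the split range move, which is why capacity can be added without a full reload.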

Sunday, October 18, 2009

ISWC 2009 : The Semantic Web at Web Scale

We will be presenting on Thurs, 10/29, 11:20am - 11:50am at ISWC 2009 in Washington DC. The talk will cover the bigdata architecture, performance results on a 15 node cluster, and the bigdata roadmap. Come on out!

Thursday, October 1, 2009

Quad support for Sesame 2.x

We've recently added quad support to bigdata and integrated that support with Sesame 2.x. There are now three major database modes for RDF data: plain triples, plain triples with provenance (statement identifiers), and quads. Inference is not supported yet for quads. We plan to introduce query time inference support instead of eager closure.

The support for quads is in the development branch [1], which you can check out from SVN. This branch is fairly solid and we plan to create a release from it soon [2].

[2] https://sourceforge.net/apps/trac/bigdata/roadmap

Wednesday, August 19, 2009

B+Tree compression and buffering

We've been working on big performance improvements for the B+Tree. The goal is to drastically increase the amount of data that bigdata(R) can buffer in memory so we can really take advantage of those 64-bit JVMs while making search faster, reducing the in-memory footprint and GC churn, and reducing the network size of RMI payloads.



First, we have a new binary image for the B+Tree nodes and leaves with pluggable compression schemes that support search and value retrieval directly on the coded (compressed) representation. This will reduce the footprint for data on the disk, in memory, and on the wire and should boost search performance. We will roll out with prefix-compression (front-coding) for keys and with canonical Huffman encoding options for keys and values. These compression schemes are all based on fastutil[1] and dsiutils[2], which are fantastic libraries.
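
To show what front-coding buys for sorted keys, here is a small sketch that stores each key as the length of the prefix it shares with the previous key plus the remaining suffix. It does not use fastutil or dsiutils, it works on strings rather than unsigned byte[] keys, and unlike the real coders it does not support search directly on the coded form. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of front-coding (prefix compression) over sorted keys.
final class FrontCodingSketch {

    static final class Entry {
        final int shared;    // characters shared with the previous key
        final String suffix; // what is left after the shared prefix

        Entry(int shared, String suffix) {
            this.shared = shared;
            this.suffix = suffix;
        }
    }

    static List<Entry> encode(List<String> sortedKeys) {
        List<Entry> out = new ArrayList<Entry>();
        String prev = "";
        for (String key : sortedKeys) {
            int n = 0;
            int max = Math.min(prev.length(), key.length());
            while (n < max && prev.charAt(n) == key.charAt(n)) {
                n++;
            }
            out.add(new Entry(n, key.substring(n)));
            prev = key;
        }
        return out;
    }

    static List<String> decode(List<Entry> coded) {
        List<String> out = new ArrayList<String>();
        String prev = "";
        for (Entry e : coded) {
            String key = prev.substring(0, e.shared) + e.suffix;
            out.add(key);
            prev = key;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "http://ex.org/a", "http://ex.org/ab", "http://ex.org/b");
        for (Entry e : encode(keys)) {
            System.out.println(e.shared + " + \"" + e.suffix + "\"");
        }
    }
}
```

URIs in an RDF index share long prefixes, which is why this kind of coding tends to pay off for B+Tree keys.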



Second, we are moving away from a per-B+Tree buffer for persistent leaves and nodes to a shared buffer using a mixture of an LRU and a hash map with weak reference values. Combined with the smaller in-memory footprint for leaves and nodes, this will allow bigdata(R) to buffer vastly more data in RAM.


Finally, we are examining options for record-level compression to reduce the on-disk footprint even further. Our thinking here is to offer a pluggable scheme, with initial support for Deflate, zip, gzip and bmz[3]. We are already coding (compressing) the keys and values, but there is an additional win to be had from a fast compression pass over the entire B+Tree node or leaf. Most compression libraries are accessed via JNI and will generally do better on larger records, so we may wind up increasing the default branching factor for the index segments to compensate. You should be able to choose a record level compression provider for the Journal, another for the index segments, and have the option to specify the compression provider separately for all shards for a specific index. Different compression schemes make sense for the journal and the index segments because the journal is geared towards fast write absorption while the index segments are geared towards read-only data. Likewise, some indices may derive more benefit from specific compression schemes.
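
As a rough picture of what a Deflate-based record compressor could look like, the sketch below round-trips a single record through the JDK's `java.util.zip.Deflater` and `Inflater`. The class and method names are hypothetical and this is not the pluggable provider interface itself. `BEST_SPEED` is chosen here as an assumption that suits a write-oriented journal; a read-only index segment might prefer a higher level.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch of a record-level compressor built on Deflate.
final class RecordCompressionSketch {

    static byte[] compress(byte[] record) {
        Deflater deflater = new Deflater(Deflater.BEST_SPEED);
        try {
            deflater.setInput(record);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            while (!deflater.finished()) {
                int n = deflater.deflate(buf);
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end(); // release the native zlib resources
        }
    }

    static byte[] decompress(byte[] coded) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(coded);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported record");
                }
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("corrupt record", e);
        } finally {
            inflater.end();
        }
    }

    public static void main(String[] args) {
        byte[] record = new byte[8192]; // a highly compressible all-zero record
        byte[] coded = compress(record);
        System.out.println(record.length + " -> " + coded.length + " bytes");
    }
}
```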


We should have some new performance results based on these changes in a few weeks.

[1] http://fastutil.dsi.unimi.it/
[2] http://dsiutils.dsi.unimi.it/
[3] http://www.hypertable.org/doxygen/dir_636d9f0bc773a215f705e2de9f182c4e.html

Tuesday, August 4, 2009

"Mapped" RDF data loader

We've just introduced a new "mapped" data loader. The previous data loader assumed that the files were located on specific hosts in a known directory. This was designed with a map/reduce task in mind, where the RDF files were being dumped into the local directory by the reduce task.



With the mapped data loader you specify a scanner which is executed by the master. The default scanner knows how to identify files to be processed in the file system, but it would be easy enough to write a scanner that consumed an HDFS block structured file whose contents were RDF data. This is more like the "map" stage of a map/reduce job, which is why we call it the "mapped" data loader. Regardless of how the scanner identifies the resources to be processed, the master "maps" those resources across client tasks running on the cluster.
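
The division of labor can be sketched as follows. This is not the bigdata API: the `ResourceScanner` interface, the method names, and the round-robin assignment policy are all assumptions made for illustration. The point is only that the scanner decides what to load while the master decides where each resource goes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a master mapping scanned resources across clients.
final class MappedLoaderSketch {

    // Identifies the resources to load (files, HDFS blocks, ...).
    interface ResourceScanner {
        List<String> scan();
    }

    // Returns one work list per client task; assignment is round-robin here.
    static List<List<String>> mapAcrossClients(ResourceScanner scanner, int nclients) {
        if (nclients <= 0) {
            throw new IllegalArgumentException("nclients must be positive");
        }
        List<List<String>> assignments = new ArrayList<List<String>>();
        for (int i = 0; i < nclients; i++) {
            assignments.add(new ArrayList<String>());
        }
        int next = 0;
        for (String resource : scanner.scan()) {
            assignments.get(next).add(resource);
            next = (next + 1) % nclients;
        }
        return assignments;
    }

    public static void main(String[] args) {
        ResourceScanner scanner = new ResourceScanner() {
            public List<String> scan() {
                return Arrays.asList("a.rdf", "b.rdf", "c.rdf", "d.rdf", "e.rdf");
            }
        };
        System.out.println(mapAcrossClients(scanner, 2));
    }
}
```

Swapping in a different scanner changes where the data comes from without touching the assignment logic.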



Let us know if you are interested in loading data from HDFS or from Hadoop map/reduce jobs into a massive distributed semantic web graph and we can help you work through the integration glue.