
Apache Crunch: Laying the pipelines for the Hadoop refinery

Apache Crunch recently graduated from the Apache Incubator to become an ASF top-level project. Let’s take a closer look at this Java library, which aims to make writing, testing and running MapReduce pipelines easy.

Real-world problems require a chain of Map, Shuffle, Reduce and Combine phases, sequenced in parallel or one after another. Such a chain of processing phases can simply be termed a MapReduce pipeline. Apache Crunch provides Hadoop programmers with a library that coordinates this pipeline and abstracts both the data and the implementation. Hadoop has often been compared to a data refinery, so it is a natural extension of the analogy for Crunch to play the role of the pipelines in that refinery.

At the core of Apache Crunch is a data model built on classes such as PCollection<T> and PTable<K,V>, which represent distributed, immutable collections of objects. It also defines a set of operators that support primitive operations for parallel processing, grouping, sorting and so on.

Data Model: Pipeline, MRPipeline, MemPipeline, PCollection, PTable, PGroupedTable, Source, Target, Emitter, PType
Operators: DoFn, CombineFn, FilterFn, Joins, Cartesian, Sort, Secondary Sort, PObject, BloomFilters


For instance, to implement the common Word Count example, an Apache Crunch program would have the following steps:

  1. Create a Pipeline object
  2. Read input (e.g. text file) into a PCollection
  3. Execute various functions on input data
    e.g. with a DoFn instance (extractWord) defined elsewhere:

    PTable<String, Long> counts =
        lines.parallelDo(extractWord, Writables.strings())
             .count();

    Or, with the DoFn written inline:

    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      @Override
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings());

    PTable<String, Long> counts = words.count();

  4. Persist the output collection and execute the pipeline
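
To make the data flow of the Word Count snippet above concrete, here is a minimal plain-Java sketch of what its two operations compute on a small in-memory input. It deliberately uses no Crunch classes: WordCountSketch, extractWords and countWords are hypothetical names chosen for illustration, and the trimming and empty-string check are small additions of this sketch rather than part of the original snippet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the Crunch snippet; none of these names are Crunch APIs.
class WordCountSketch {

    // Plays the role of the DoFn: each input line may emit zero or more words.
    static List<String> extractWords(List<String> lines) {
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) { // a blank line would otherwise yield one empty word
                    words.add(word);
                }
            }
        }
        return words;
    }

    // Plays the role of count(): group identical words and tally their occurrences.
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String word : extractWords(lines)) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be or not to be", "that is the question");
        System.out.println(countWords(lines));
    }
}
```

In a real Crunch job the same two stages run as distributed operations over a PCollection instead of an in-memory list, and the pipeline decides how they map onto MapReduce phases.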


Beyond Pig and Hive, several competing tools have emerged in this space, including Cascading, Scalding, Cascalog, Scoobi and Spark. Apache Crunch, however, like its parent FlumeJava, uses a multiple serializable type (MST) model instead of a single serializable type (SST) model. The Crunch team claims that MST provides compile-time verification, makes user-defined functions easier to write, and supports jobs that use complex data types. Further, it is claimed to be a “better fit for data sets that do not naturally fit into the Tuple model, such as images, time series, audio files and seismograms”.
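
The compile-time verification claim can be illustrated with a small plain-Java sketch. This is only an analogy under stated assumptions: SerializationModelSketch and its methods are hypothetical, and the untyped Object[] record stands in loosely for a tuple-based (SST) model rather than reproducing the API of any framework named above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustration only: hypothetical classes, not part of Crunch or any tuple-based framework.
class SerializationModelSketch {

    // Single-type style: every record is an untyped tuple, so fields are cast at run time.
    static long sumTupleCounts(List<Object[]> tuples) {
        long total = 0;
        for (Object[] tuple : tuples) {
            total += (Long) tuple[1]; // throws ClassCastException if field 1 is not a Long
        }
        return total;
    }

    // Multiple-type style: the element type is part of the signature, so the compiler checks it.
    static long sumTypedCounts(List<Map.Entry<String, Long>> pairs) {
        long total = 0;
        for (Map.Entry<String, Long> pair : pairs) {
            total += pair.getValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> typed = new ArrayList<>();
        typed.add(new SimpleEntry<String, Long>("crunch", 2L));
        typed.add(new SimpleEntry<String, Long>("hadoop", 3L));
        System.out.println(sumTypedCounts(typed));

        List<Object[]> tuples = new ArrayList<>();
        tuples.add(new Object[] {"crunch", 2L});
        tuples.add(new Object[] {"hadoop", 3L});
        System.out.println(sumTupleCounts(tuples));
    }
}
```

With the typed version, adding an entry whose value is a String does not compile; with the tuple version the same mistake only surfaces as a ClassCastException when the job runs.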

Crunch's MST serialization model currently has two implementations, one based on Writables and the other on Avro records. It has been acknowledged, based on independent benchmarking, that the Avro implementation is much faster than Writables and should be the preferred implementation for performance. Now that Crunch has matured into a top-level project, we hope to see more committers and contributors beyond the currently active pool. The product still has some way to go on usability, such as better documentation, easy tutorials and manageable code. However, these should become routine release activities as it registers its presence in commercial distributions as well. Meanwhile, welcome the new graduate. Cheers.
