Skip to main content

Situational Aware Mappers with Jaql

Adapting MapReduce for a higher performance has been one of the popular discussion topics. Let’s continue with our series on Adaptive MapReduce and explore the feature available via Jaql in IBM BigInsights commercial offering. This implementation also points to a much more vital corollary that enterprise offerings of Apache Hadoop are not just mere packaging and re-sell but have a bigger research initiative going on beneath the covers.

First a snapshot from a IBM Infosphere BigInsights presentation on what Adaptive MapReduce means in the product context.

To implement this, the guidance offered to IBM InfoSphere BigInsights v 1.3 developers is :

Normal Hadoop map tasks take exactly one split of data each.
Adaptive mappers start fewer map tasks and each mapper decides at run time
how many splits to take to process all splits. This process is beneficial if the task
startup is very expensive (for example, the task has to load some reference data),
or if a job has many small splits (for example, the input directory has many small files).
In both cases, adaptive mappers minimize the task startup cost, yet they balance the
workload across the cluster.

Adaptive MapReduce is off by default. 
You can turn it on in your Jaql query by using the setOptions function 
to set = true, as shown in the following example:
conf = {
  "": true

Now, let us dig deeper and explore 2 papers which demonstrate how Adaptive MapReduce can be leveraged using Situation Aware Mappers (SAM) and physical transparency feature of Jaql.
[1] Rares Vernica, Andrey Balmin, Kevin S. Beyer, Vuk Ercegovac: Adaptive MapReduce using situation-aware mappers. EDBT 2012: 420-431
[2] Andrey Balmin, Vuk Ercegovac, Rares Vernica, Kevin S. Beyer: Adaptive Processing of User-Defined Aggregates in Jaql. IEEE Data Eng. Bull. 34(4): 36-43 (2011)

The papers describe Adaptive Mapper, Adatpive Combiner, Adadptive Sampling and Adaptive  Partitioning.

Adaptive Mappers (AMs) dynamically stitch together multiple splits to be processed by a single mapper while changing the checkpoint interval at run time. (discussed in more detail below).

Adaptive Combiners (ACs) use best-effort hash-based aggregation of map outputs in
a fixed-size hash table kept in a mapper like Hive. Adaptive Combiner has been claimed to speed up user defined aggregates like determining top million pages with most number of out links. Leveraging Jaql’s physical transparency, the global thresholds are updated and coordinated between mappers which ensure the records below the threshold lower bound are discarded by mapper itself and not sent to reducer. This implies quicker sort, shuffle and merge along with lesser data travel.

Adaptive Sampling (AS) uses some early map outputs to produce a global sample of their keys which helps to determine when to stop sampling at runtime. Adaptive Partitioning (AP) dynamically partitions map outputs based on the sample. Mappers co-ordinate in parallel and the partitioning function is decided by one of them based on the sample. As soon as the partitioning function is decided, the mappers can start outputting data, which triggers the start of their partitioners.

One of the main components of this Situational Aware Mapper(SAM) technique is a distributed meta-data store (DMDS) which is implemented using Apache ZooKeeper. DMDS is a asynchronous communication channel between mappers and enables the mappers to post some metadata about their state and see state of all other mappers. This enables all mappers to be situational aware about other mappers and the job states thus enabling a global coordination and decision structure.

Unlike the usual MapReduce technique, where there is a one-to-one correspondence of map tasks and splits, an Adaptive Mappers (AM) makes a decision after every split to either checkpoint or take another split and “stitch” it to already processed one. The split location information is stored in ZooKeeper. Every time an AM finishes processing a split, it makes a decision to stop or to take a new split from DMDS and concatenate it to the existing one, transparently to the map function.

The key steps in an Adaptive Mapper local split processing include:
1: Create a locations and an assigned node for each job.
(where locations stores the metadata while assigned maps the split to mapper)
2: Use virtual splits to start mappers.
3: Connect to ZooKeeper and retrieve a list of the real splits, which are local to the current host
4: AM picks a random split from the list and locks it
5: Process
When mappers finish processing local splits, they will process unprocessed remote splits (as in steps 4, 5) which are determined by subtracting the list of assigned splits from the list of available splits at that host.

In a usual MapReduce scenario, having more mappers increases task scheduling and starting overhead, like running user code to perform job-specific setup tasks. However, having smaller splits tends to reduce the benefit from applying a combiner. The AM tends to decouple the number of splits from the number of mappers thus tries to achieve load balancing, reduced scheduling and starting overhead, and combiner benefit. AMs, however, do not use speculative execution and instead rely on relatively small split size. AM failure resolution involves automatically restarting failed mappers. While the merits may vary by use-case, one of the attractions of this approach seems to be no dependence on prior learning or modeling of job execution. Plus with code already imbibed in the Hadoop offerings, this approach is definitely a pick among various adaptive MapReduce approach.

Adaptive MapReduce using Situation-Aware Mappers

Top Image theme: Situational Aware Jaql ; courtesy:


Popular posts from this blog

In-memory data model with Apache Gora

Open source in-memory data model and persistence for big data framework Apache Gora™ version 0.3, was released in May 2013. The 0.3 release offers significant improvements and changes to a number of modules including a number of bug fixes. However, what may be of significant interest to the DynamoDB community will be the addition of a gora-dynamodb datastore for mapping and persisting objects to Amazon's DynamoDB. Additionally the release includes various improvements to the gora-core and gora-cassandra modules as well as a new Web Services API implementation which enables users to extend Gora to any cloud storage platform of their choice. This 2-part post provides commentary on all of the above and a whole lot more, expanding to cover where Gora fits in within the NoSQL and Big Data space, the development challenges and features which have been baked into Gora 0.3 and finally what we have on the road map for the 0.4 development drive.
Introducing Apache Gora Although there are var…

Data deduplication tactics with HDFS and MapReduce

As the amount of data continues to grow exponentially, there has been increased focus on stored data reduction methods. Data compression, single instance store and data deduplication are among the common techniques employed for stored data reduction.
Deduplication often refers to elimination of redundant subfiles (also known as chunks, blocks, or extents). Unlike compression, data is not changed and eliminates storage capacity for identical data. Data deduplication offers significant advantage in terms of reduction in storage, network bandwidth and promises increased scalability.
From a simplistic use case perspective, we can see application in removing duplicates in Call Detail Record (CDR) for a Telecom carrier. Similarly, we may apply the technique to optimize on network traffic carrying the same data packets.
Some of the common methods for data deduplication in storage architecture include hashing, binary comparison and delta differencing. In this post, we focus on how MapReduce and…

Amazon DynamoDB datastore for Gora

What was initially suggested during causal conversation at ApacheCon2011 in November 2011 as a “neat idea”, would soon become prime ground for Gora's first taste of participation within Google's Summer of Code program. Initially, the project, titled Amazon DynamoDB datastore for Gora, merely aimed to extend the Gora framework to Amazon DynamoDB. However, it seem became obvious that the issue would include much more than that simple vision.

The Gora 0.3 Toolbox We briefly digress to discuss some other noticeable additions to Gora in 0.3, namely: Modification of the Query interface: The Query interface was amended from Query<K, T> to Query<K, T extends Persistent> to be more precise and explicit for developers. Consequently all implementors and users of the Query interface can only pass object's of Persistent type. Logging improvements for data store mappings: A key aspect of using Gora well is the establishment and accurate definitio…