
Situational Aware Mappers with Jaql

Adapting MapReduce for higher performance has been a popular discussion topic. Let’s continue our series on Adaptive MapReduce and explore the feature available via Jaql in IBM’s commercial BigInsights offering. This implementation also points to a more important corollary: enterprise offerings of Apache Hadoop are not mere repackaging and resale, but have a larger research initiative going on beneath the covers.

First, a snapshot from an IBM InfoSphere BigInsights presentation on what Adaptive MapReduce means in the product context.

To implement this, the guidance offered to IBM InfoSphere BigInsights v1.3 developers is:

Normal Hadoop map tasks take exactly one split of data each.
Adaptive mappers start fewer map tasks and each mapper decides at run time
how many splits to take to process all splits. This process is beneficial if the task
startup is very expensive (for example, the task has to load some reference data),
or if a job has many small splits (for example, the input directory has many small files).
In both cases, adaptive mappers minimize the task startup cost, yet they balance the
workload across the cluster.
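
To put rough numbers on the startup-cost argument, here is a minimal sketch. The split count, startup time and mapper count are made-up illustrative values, not measurements from BigInsights, and the function name is hypothetical.

```python
def total_startup_cost(num_tasks: int, startup_seconds: float) -> float:
    """Total time spent on task startup, summed over every map task in the job."""
    return num_tasks * startup_seconds


# All three values below are assumptions chosen only for illustration.
num_splits = 1000        # e.g. an input directory with many small files
startup_seconds = 5.0    # e.g. each task loads some reference data first
adaptive_tasks = 40      # adaptive mode starts far fewer map tasks

classic = total_startup_cost(num_splits, startup_seconds)       # one task per split
adaptive = total_startup_cost(adaptive_tasks, startup_seconds)  # splits shared at run time

print(f"classic: {classic:.0f}s of startup, adaptive: {adaptive:.0f}s of startup")
```

The work per split is the same in both cases; only the number of times the startup cost is paid changes.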

Adaptive MapReduce is off by default. 
You can turn it on in your Jaql query by using the setOptions function 
to set = true, as shown in the following example:
conf = {
  "": true

Now, let us dig deeper and explore two papers that demonstrate how Adaptive MapReduce can be implemented using Situation-Aware Mappers (SAMs) and the physical transparency feature of Jaql.
[1] Rares Vernica, Andrey Balmin, Kevin S. Beyer, Vuk Ercegovac: Adaptive MapReduce using situation-aware mappers. EDBT 2012: 420-431
[2] Andrey Balmin, Vuk Ercegovac, Rares Vernica, Kevin S. Beyer: Adaptive Processing of User-Defined Aggregates in Jaql. IEEE Data Eng. Bull. 34(4): 36-43 (2011)

The papers describe Adaptive Mappers, Adaptive Combiners, Adaptive Sampling and Adaptive Partitioning.

Adaptive Mappers (AMs) dynamically stitch together multiple splits to be processed by a single mapper, changing the checkpoint interval at run time (discussed in more detail below).

Adaptive Combiners (ACs) perform best-effort, hash-based aggregation of map outputs in a fixed-size hash table kept in the mapper, similar to what Hive does. Adaptive Combiners are claimed to speed up user-defined aggregates, such as determining the top million pages with the most out-links. Leveraging Jaql’s physical transparency, global thresholds are updated and coordinated between mappers, so records below the threshold’s lower bound are discarded by the mapper itself and never sent to the reducer. This means a quicker sort, shuffle and merge, with less data transferred.
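
The best-effort hash aggregation idea can be sketched in a few lines. This is an illustration only, not the code from the papers or from Jaql: the function name is hypothetical, and the policy of passing records through uncombined once the table is full is an assumption (other policies, such as evicting an entry, are equally plausible).

```python
from typing import Callable, Dict, Iterable, Iterator, Tuple


def best_effort_combine(
    records: Iterable[Tuple[str, int]],
    capacity: int,
    combine: Callable[[int, int], int] = lambda a, b: a + b,
) -> Iterator[Tuple[str, int]]:
    """Aggregate map outputs in a hash table that never exceeds `capacity` keys."""
    table: Dict[str, int] = {}
    for key, value in records:
        if key in table:
            # Key already tracked: fold the new value into the running aggregate.
            table[key] = combine(table[key], value)
        elif len(table) < capacity:
            # Room left in the table: start tracking this key.
            table[key] = value
        else:
            # Table is full (assumed policy): emit the record uncombined.
            # The result stays correct because the reducer aggregates again.
            yield key, value
    # Flush the partial aggregates at the end of the mapper's input.
    yield from table.items()


records = [("a", 1), ("b", 1), ("a", 1), ("c", 1), ("c", 1)]
combined = list(best_effort_combine(records, capacity=2))
print(combined)
```

With a capacity of 2, the keys `a` and `b` are combined in the mapper while `c` overflows and is sent on as-is, so the mapper emits four records instead of five and the reducer still computes the same totals.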

Adaptive Sampling (AS) uses some early map outputs to produce a global sample of their keys, and decides at run time when to stop sampling. Adaptive Partitioning (AP) dynamically partitions map outputs based on the sample. Mappers coordinate in parallel, and one of them decides the partitioning function based on the sample. As soon as the partitioning function is decided, the mappers can start outputting data, which triggers the start of their partitioners.
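
As an illustration of deriving a partitioning function from a key sample, the sketch below assumes range partitioning on sample quantiles. That choice, and the names `split_points` and `partition_of`, are assumptions made for the example; the papers should be consulted for the actual partitioning strategy.

```python
from bisect import bisect_right
from typing import List, Sequence


def split_points(sample: Sequence[int], num_partitions: int) -> List[int]:
    """Pick num_partitions - 1 boundary keys at evenly spaced ranks of the sample."""
    if not sample or num_partitions < 1:
        raise ValueError("need a non-empty sample and at least one partition")
    ordered = sorted(sample)
    return [
        ordered[(i * len(ordered)) // num_partitions]
        for i in range(1, num_partitions)
    ]


def partition_of(key: int, points: Sequence[int]) -> int:
    """Return the index of the partition whose key range contains `key`."""
    return bisect_right(points, key)


# Hypothetical sample of keys gathered from early map outputs.
sample = [5, 1, 9, 3, 7, 2, 8, 4, 6, 10]
points = split_points(sample, num_partitions=4)
print(points, [partition_of(k, points) for k in (1, 3, 7, 10)])
```

Once one mapper has computed and published `points`, every mapper can apply `partition_of` locally, which is why output can begin as soon as the function is decided.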

One of the main components of the Situation-Aware Mapper (SAM) technique is a distributed metadata store (DMDS), implemented using Apache ZooKeeper. The DMDS is an asynchronous communication channel between mappers: each mapper can post metadata about its own state and see the state of all other mappers. This makes every mapper aware of the other mappers and of the job state, enabling global coordination and decision making.

Unlike the usual MapReduce technique, where there is a one-to-one correspondence between map tasks and splits, an Adaptive Mapper (AM) decides after every split whether to checkpoint or to take another split and “stitch” it to the ones already processed. The split location information is stored in ZooKeeper. Every time an AM finishes processing a split, it decides either to stop or to take a new split from the DMDS and concatenate it to the existing one, transparently to the map function.

The key steps in an Adaptive Mapper’s local split processing include:
1: Create a locations node and an assigned node for each job
(locations stores the split metadata, while assigned maps each split to a mapper).
2: Use virtual splits to start mappers.
3: Connect to ZooKeeper and retrieve the list of real splits that are local to the current host.
4: The AM picks a random split from the list and locks it.
5: Process the split.
When mappers finish processing local splits, they process the remaining remote splits (as in steps 4 and 5), which are determined by subtracting the list of assigned splits from the list of available splits at that host.
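
The steps above can be sketched as a small simulation. Everything here is a hypothetical stand-in: `InMemoryDMDS` replaces the ZooKeeper-backed store with a dictionary guarded by a lock, the `locations` and `assigned` attributes only mirror the node names from step 1, and the mapper keeps taking splits until none remain, whereas a real AM also decides after each split whether to stop and checkpoint.

```python
import random
import threading
from typing import Dict, List, Optional


class InMemoryDMDS:
    """Hypothetical in-process stand-in for the ZooKeeper-backed metadata store."""

    def __init__(self, locations: Dict[str, List[str]]) -> None:
        self.locations = locations           # host -> splits stored on that host
        self.assigned: Dict[str, str] = {}   # split -> mapper that locked it
        self._mutex = threading.Lock()

    def try_lock(self, split: str, mapper: str) -> bool:
        """Atomically claim a split; returns False if another mapper holds it."""
        with self._mutex:
            if split in self.assigned:
                return False
            self.assigned[split] = mapper
            return True

    def unassigned(self, candidates: List[str]) -> List[str]:
        """Candidate splits minus the ones already assigned."""
        with self._mutex:
            return [s for s in candidates if s not in self.assigned]


def next_split(dmds: InMemoryDMDS, host: str, mapper: str,
               rng: random.Random) -> Optional[str]:
    """Lock a random local split if one is left, otherwise a random remote one."""
    local = dmds.locations.get(host, [])
    everything = sorted({s for splits in dmds.locations.values() for s in splits})
    for candidates in (local, everything):
        remaining = dmds.unassigned(candidates)
        while remaining:
            choice = rng.choice(remaining)
            if dmds.try_lock(choice, mapper):
                return choice
            remaining = dmds.unassigned(candidates)  # lost a race, look again
    return None


def run_mapper(dmds: InMemoryDMDS, host: str, mapper: str, seed: int = 0) -> List[str]:
    """Stitch splits together until the store has nothing left to hand out."""
    rng = random.Random(seed)
    stitched: List[str] = []
    while True:
        split = next_split(dmds, host, mapper, rng)
        if split is None:
            return stitched
        stitched.append(split)  # a real mapper would feed the split to map() here


dmds = InMemoryDMDS({"host1": ["s1", "s2"], "host2": ["s3"]})
taken = run_mapper(dmds, "host1", "mapper-1")
print(taken)
```

In this run the mapper on `host1` takes its two local splits first (in random order) and only then the remote split `s3`, which matches the local-first, remote-later order described above.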

In a usual MapReduce scenario, having more mappers increases task scheduling and startup overhead, such as running user code for job-specific setup. Smaller splits also tend to reduce the benefit of applying a combiner. AMs decouple the number of splits from the number of mappers, and so aim for load balancing, reduced scheduling and startup overhead, and combiner benefit at the same time. AMs do not, however, use speculative execution; they rely on a relatively small split size instead. AM failure resolution involves automatically restarting failed mappers. While the merits may vary by use case, one attraction of this approach is that it does not depend on prior learning or modeling of job execution. And with the code already built into a commercial Hadoop offering, this approach stands out among the various adaptive MapReduce approaches.


