Analyzing Hadoop’s internals with Analytics

As part of our Big Data efforts, we have a team focused on Hadoop that is working hard to ensure Hadoop runs well on vSphere. We published a paper last year on Hadoop performance, and have a lot more in the pipeline.

More recently, I took up a challenge to see how much we could learn about Hadoop I/O in a very short time, using our dynamic tracing framework. The results were quite interesting.

To position this work correctly: it's a work-in-progress study that I'd like to open up for discussion. It captures what we've learned by digging into the architecture, to help us make engineering decisions. I hope it's helpful, and I really want your feedback so that we can steer future iterations of these investigations.

The data in this post is a lead up to some simple modeling of Hadoop in a cluster, so we can rationalize how Hadoop will scale in different topologies.

Why Analyze Hadoop?

To design our Hadoop architecture, we have conducted many studies of the Hadoop stack. In this post, I want to share an initial analysis that builds a better understanding of the resource and topology requirements of a distributed Hadoop system. The more we talked with Hadoop committers, the clearer it became that we needed a shared view of what Hadoop I/O actually is. For example, there is much discussion about the need to bring compute and data together, but when you dig in, you find that there are several types of I/O in the map-reduce and Hadoop model, with very different needs for locality, performance and underlying storage.

To learn more, I decided to apply some of my dynamic tracing lessons learned to a distributed Hadoop stack – using the vProbes introspection framework that is built into the VMware hypervisor.

Before we get into the details, let's review some of the questions we are trying to answer. At the architecture level, we are evaluating some key options – for example, what happens if Map-Reduce and the Datanode are running in different VMs or on different hosts? In that case, some of the data is accessed remotely, but as this analysis shows, much of the access is to ephemeral data, which can always be local. More on this topic later.

To fully understand these dynamics, I’m seeking to categorize the major types of I/O into groups, and then be able to describe the types of I/O for each.

Some of the questions we are looking to answer are:

  • What are the major categories of Hadoop I/O?
  • How much I/O is generated per unit of compute?
  • How much I/O should we design for on each host?
  • What is the nature of that I/O — is it sequential, random, what block size?
  • For Hadoop HDFS, what is the impact of I/O being remote from the host?
  • What categories of I/O are remote vs. local if the task is run on a remote node?
  • Would 10Gb Ethernet help?

This resulted in a mini-study of Hadoop's I/O, which I'd like to open as a starting point for discussion. Again, these are early observations, and I'd like to solicit opinions from the community and progress the analysis with feedback.

Keeping it Simple

To get visibility into the I/O categories, I've deliberately simplified the workload down to a very basic configuration. In fact, I've used a ridiculously small example of Terasort. This allows us to track and visualize the I/O across all tasks, and zoom in on a task to analyze and categorize the detailed I/O.

The Hadoop example I'm using for analysis is Terasort with one billion bytes (1 GB) of input data, using two map slots and one reduce slot.

Map-Reduce Recap

As background to this analysis, let's recap the architecture of Hadoop Map-Reduce and the phases we expect the workload to go through. For our Terasort example, Hadoop Map-Reduce will implement several phases, as shown in the following diagram (and sketched in code after the list):

  • The input file will be split into 16 chunks of 64 MBytes each (the default HDFS block size).
  • Each chunk will be processed by a Map task, which sorts the key-values within the 64 MByte chunk of data assigned to it. These sorted key-values are written to intermediate map-output files.
  • The shuffle phase will then sort the groups of key-values across the maps into intermediate files on the reducer, and then combine those into the output file.
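To make that data flow concrete, here is a deliberately simplified sketch of those phases – not Hadoop's actual code, just the shape of it: each map emits key-values from its split, sorted runs are spilled when the in-memory buffer fills, and the reducer merges the already-sorted runs it fetches during the shuffle.

import heapq

def map_task(split_records, spill_limit=100_000):
    """Emit sorted runs for one input split, spilling whenever the buffer fills."""
    buffer = []
    for key, value in split_records:      # terasort's map function is essentially identity
        buffer.append((key, value))
        if len(buffer) >= spill_limit:    # in-memory sort buffer full: sort and spill
            yield sorted(buffer)
            buffer = []
    if buffer:
        yield sorted(buffer)              # final spill / map output

def reduce_task(fetched_runs):
    """Merge the already-sorted runs shuffled from all the maps into sorted output."""
    return heapq.merge(*fetched_runs)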

 

Terasort in Action

Running Terasort in our small, controlled study takes a little over 13 minutes end to end (807 seconds in the job log below), with a one-billion-byte input. Nothing stellar – this configuration isn't optimized for massive throughput; rather, we're hoping to introspect each step.

I have just two map slots configured, which means that at most two map tasks will run in parallel. Since each task processes roughly 64 MB (the default HDFS block size), we should expect 16 map tasks.
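A quick check on that arithmetic: a single 1 GB file at the default 64 MB block size would actually split into 15 chunks, so the 16 map tasks we observe are consistent with the input being stored as two ~500 MB files – which is what teragen produces with its default of two map tasks (an assumption on my part, not something verified against the namenode).

import math

INPUT_BYTES = 1_000_000_000        # teragen 10000000 -> 10 million records x 100 bytes
BLOCK_BYTES = 64 * 1024 * 1024     # default dfs.block.size

print(math.ceil(INPUT_BYTES / BLOCK_BYTES))            # 15 splits for a single 1 GB file
print(2 * math.ceil(INPUT_BYTES / 2 / BLOCK_BYTES))    # 16 splits for two ~500 MB files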

Parsing the logs of the job output, we can see the detail of what tasks ran. To make it simple to digest, I ran the job history log through a simple Perl script that reformats it into per-step stages:

$ log.pl job_201201261301_0005_1327649126255_rmc_TeraSort 
           Item    Time  Jobname           Taskname Phase Start-Time End-Time Elapsed
            Job    0.000 201201261301_0005
            Job          201201261301_0005
            Job    0.475 201201261301_0005 PREP
           Task    1.932 201201261301_0005 m_000017 SETUP
     MapAttempt    3.066 201201261301_0005 m_000017 SETUP
     MapAttempt   10.409 201201261301_0005 m_000017 SETUP SUCCESS 1.932 10.409 8.477 "setup"
           Task   10.966 201201261301_0005 m_000017 SETUP SUCCESS 1.932 10.966 9.034
            Job          201201261301_0005 RUNNING
           Task   10.970 201201261301_0005 m_000000 MAP
           Task   10.972 201201261301_0005 m_000001 MAP
     MapAttempt   10.981 201201261301_0005 m_000000 MAP
     MapAttempt   65.819 201201261301_0005 m_000000 MAP SUCCESS 10.970 65.819 54.849 ""
           Task   68.063 201201261301_0005 m_000000 MAP SUCCESS 10.970 68.063 57.093
     MapAttempt   10.998 201201261301_0005 m_000001 MAP
     MapAttempt   65.363 201201261301_0005 m_000001 MAP SUCCESS 10.972 65.363 54.391 ""
           Task   68.065 201201261301_0005 m_000001 MAP SUCCESS 10.972 68.065 57.093
           Task   68.066 201201261301_0005 m_000002 MAP
           Task   68.067 201201261301_0005 m_000003 MAP
           Task   68.068 201201261301_0005 r_000000 REDUCE
     MapAttempt   68.075 201201261301_0005 m_000002 MAP
     MapAttempt  139.789 201201261301_0005 m_000002 MAP SUCCESS 68.066 139.789 71.723 ""
           Task  140.193 201201261301_0005 m_000002 MAP SUCCESS 68.066 140.193 72.127
     MapAttempt   68.076 201201261301_0005 m_000003 MAP
     MapAttempt  139.927 201201261301_0005 m_000003 MAP SUCCESS 68.067 139.927 71.860 ""
           Task  140.198 201201261301_0005 m_000003 MAP SUCCESS 68.067 140.198 72.131
[snip…]
  ReduceAttempt   68.112 201201261301_0005 r_000000 REDUCE
  ReduceAttempt  795.299 201201261301_0005 r_000000 REDUCE SUCCESS 68.068 795.299 727.231 "reduce > reduce"
           Task  798.223 201201261301_0005 r_000000 REDUCE SUCCESS 68.068 798.223 730.155
           Task  798.226 201201261301_0005 m_000016 CLEANUP
     MapAttempt  798.241 201201261301_0005 m_000016 CLEANUP
     MapAttempt  806.113 201201261301_0005 m_000016 CLEANUP SUCCESS 798.226 806.113 7.887 "cleanup"
           Task  807.252 201201261301_0005 m_000016 CLEANUP SUCCESS 798.226 807.252 9.026
            Job  807.253 201201261301_0005 SUCCESS 0.000 807.253 807.253

From the log, we can see that we actually run nineteen tasks: 16 map tasks and one reduce task, plus two extra admin tasks (m_000017 and m_000016), which are launched to prep for the job and to clean up after it.
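For anyone who wants to reproduce this, a simplified Python equivalent of that reformatting script might look like the one below. It assumes the 0.20-era job-history format of lines of KEY="VALUE" pairs; the exact field names differ across Hadoop versions, so treat this as a sketch rather than a drop-in tool (my original script was Perl).

import re
import sys

PAIR = re.compile(r'(\w+)="([^"]*)"')

def reformat(history_file):
    t0 = None
    for line in open(history_file):
        record = line.split(" ", 1)[0]
        fields = dict(PAIR.findall(line))
        start = fields.get("START_TIME") or fields.get("SUBMIT_TIME")
        if start:
            t0 = t0 or int(start)
            rel = "%.3f" % ((int(start) - t0) / 1000.0)   # seconds since job submit
        else:
            rel = ""
        print(record, rel, fields.get("TASKID", ""), fields.get("TASK_TYPE", ""),
              fields.get("TASK_STATUS", ""), sep="\t")

if __name__ == "__main__":
    reformat(sys.argv[1])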

 

The same information is better represented by a swim-lanes diagram, which shows a time graph of the tasks as they are launched and as they complete. We can see that at most three tasks are running at once: the two map tasks, plus a reducer that starts after the completion of the first map task.

We’re going to dig into what the individual map and reduce tasks do.

Enter vProbes

vProbes was first discussed publicly in 2008. It is a dynamic tracing framework that allows instrumentation of a running virtualized system without code modification: probe points can be inserted into the virtualization kernel, the guest operating system kernel, and guest application code.

For the purposes of this discussion, it's a unique way of probing multiple guest operating systems at once. For this experiment, we're looking at the system calls of the task trackers and datanodes in the mini Hadoop cluster. This lets us see the I/O issued through OS calls by each service, giving us the timing, size distributions and pathnames of each operation.

The vProbes framework is implemented in several VMware products, including VMware Fusion – this data is from an experiment running vProbes on Fusion 4.1.1. It was enabled by setting vprobes.allow in the Fusion configuration file /Library/Application Support/VMware Fusion/config. I built the vprobes binary from the community source repository.

Once installed, we can probe the guest operating system.

Probing our Hadoop Nodes

Using probes around the system call entry and return functions, we can gather information about I/O, and reach into the arguments of each system call to get further detail about each operation.

GUEST:ENTER:system_call {
    string path;
    comm = curprocname();
    tid = curtid();
    pid = curpid();
    ppid = curppid();
    syscall_num = sysnum;
    if (syscall_num == NR_open) {
        path = guestloadstr(sys_arg0);
        syscall_name = "open";
        sprintf(syscall_args, "\"%s\", %x, %x", path, sys_arg1, sys_arg2);
    }
    …
}
GUEST:OFFSET:ret_from_sys_call:0 {
    printf("%s/%d/%d/%d %s(%s) = %d\n", comm, pid, tid, ppid, syscall_name,
                                        syscall_args, getgpr(REG_RAX));
}

vProbes supports a powerful set of aggregations and reductions at the probe site, but in this case I'm brute-forcing the system into giving me a trace of all I/O, which we can analyze later. This yields the following output from each Hadoop node:

java/14774/15467/1 open("/host/hadoop/hdfs/data/current/subdir0/blk_1719908349220085071_1649.meta", 0, 1b6) = 144 <0>
java/14774/15467/1 stat("/host/hadoop/hdfs/data/current/subdir0/blk_1719908349220085071_1649.meta", 7f0b80a4e590) = 0 <0>
java/14774/15467/1 read(144, 7f0b80a4c470, 4096) = 167 <0>

Each line gives us the process name, pid, thread id, parent pid, system call name, arguments and return code for one system call on that node. Tracing with vProbes made this much simpler than regular system call tracing, since we could gather all the information from one place and didn't need to worry about attaching to the child processes spawned by the multiple layers inside Hadoop that create JVMs (Hadoop generates an on-the-fly script named taskjvm.sh to launch new task JVMs).
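With a trace like this in hand, the post-processing is straightforward. For example, a small script can map open() return values to pathnames and then total the read()/write() bytes per file – a simplified sketch of the kind of reduction behind the breakdowns below, assuming the line format printed by the probe script above.

import re
from collections import defaultdict

# comm/pid/tid/ppid syscall(args) = ret ...
LINE = re.compile(r'^(\S+)/(\d+)/(\d+)/(\d+) (\w+)\((.*)\) = (-?\d+)')

def bytes_by_path(trace_file):
    """Sum read()/write() bytes per pathname, using open() returns to map fds to paths."""
    fd_to_path = {}                   # (pid, fd) -> pathname
    totals = defaultdict(int)         # pathname -> bytes moved
    with open(trace_file) as f:
        for line in f:
            m = LINE.match(line)
            if not m:
                continue
            comm, pid, tid, ppid, call, args, ret = m.groups()
            ret = int(ret)
            if call == "open" and ret >= 0 and '"' in args:
                fd_to_path[(pid, str(ret))] = args.split('"')[1]
            elif call in ("read", "write") and ret > 0:
                fd = args.split(",")[0].strip()
                totals[fd_to_path.get((pid, fd), "fd:" + fd)] += ret
    return totals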

High Level Summary of Hadoop’s I/O

As mentioned, we’re running terasort on a billion bytes of input. For completeness, the command lines used for the study are:

$ hadoop jar hadoop-examples-0.20.204.0.jar teragen 10000000 teradata

<begin trace>

$ hadoop jar hadoop-examples-0.20.204.0.jar terasort teradata teraout

We are just studying the output of the second command – the terasort.

The Hadoop job output shows us a summary of the Job:

Job Counters
     Launched reduce tasks=1
     SLOTS_MILLIS_MAPS=1146887
     Launched map tasks=16
     Data-local map tasks=16
     SLOTS_MILLIS_REDUCES=766823
   File Input Format Counters
     Bytes Read=1000057358
   File Output Format Counters
     Bytes Written=1000000000
   FileSystemCounters
     FILE_BYTES_READ=2382257412
     HDFS_BYTES_READ=1000059070
     FILE_BYTES_WRITTEN=3402627838
     HDFS_BYTES_WRITTEN=1000000000
   Map-Reduce Framework
     Map output materialized bytes=1020000096
     Map input records=10000000
     Reduce shuffle bytes=1020000096
     Spilled Records=33355441
     Map output bytes=1000000000
     Map input bytes=1000000000
     Combine input records=0
     SPLIT_RAW_BYTES=1712
     Reduce input records=10000000
     Reduce input groups=10000000
     Combine output records=0
     Reduce output records=10000000
     Map output records=10000000

The key items for our I/O study are the byte counters. Recall that we're sorting one billion bytes of input data, so we would expect at least double that amount of I/O – the read plus the output. This is faithfully represented by the HDFS_BYTES counters, showing 1B bytes read and 1B bytes written. However, we also see another 5,784,885,250 bytes read from and written to local files. So our 1B-byte sort resulted in 7,784,885,250 bytes of actual I/O. The reason is the extra ephemeral I/O that Hadoop does while sorting, staging and copying data between phases. Keep in mind this is not a tuned Hadoop instance, so arguably we may be spilling during the sort more often than we should, but there is still a significant amount of temporary I/O going on in even a well-tuned system.
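As a quick sanity check on those counters (using the rounded 1 GB figures for the HDFS read and write, as above):

FILE_BYTES_READ    = 2_382_257_412
FILE_BYTES_WRITTEN = 3_402_627_838

ephemeral = FILE_BYTES_READ + FILE_BYTES_WRITTEN   # 5,784,885,250 bytes of local file I/O
total     = ephemeral + 2 * 1_000_000_000          # 7,784,885,250 bytes moved for a 1 GB sort
print(f"{ephemeral:,}  {total:,}")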

Digging into the breakdown between persistent data (HDFS) and ephemeral I/O teaches us a lot about the overall Hadoop I/O model, and helps us understand the storage bandwidth implications as we scale the workload out across multiple hosts and racks in a distributed system.

Making Sense of the Ephemeral Data

Categorizing the ephemeral data helps us understand the nature of the I/O, and rationalize how it might change if scaled up, or if the task was run on a remote node. It also brings to light some future optimizations that may be possible, and possibly raises some questions about why some of this data isn’t stored in HDFS.

I circulated the I/O observations among several Hadoop committers, and we came to the following initial grouping of I/O categories:

  • DFS Input data: Map input data that is read from the distributed file system (HDFS)
  • DFS Output data: The output written by the reducers as the output of the job
  • Sort Spills: Data that is written and read from disk as a result of overflow from a sort operation that didn’t fit in memory. Spills may occur on the map and reduce tasks.
  • Logs: Output of the jobs, stdout, etc.
  • Job staging: The environment for the job. Copies of shared libraries, jar files, config files etc.
  • Map Output: Output of the map written to an intermediate file on the map task, which is later read during the shuffle phase
  • Shuffled map output: The shuffled map output on the reducer, read from multiple map tasks. The map output is held temporarily in the intermediate file on the map task and made available by the task-tracker that hosted the task through an HTTP service backed by a private Jetty server.
  • Combined, reduced output: the output after the reducer combines the shuffled key-values.

  Category                                  Size (MB)
  Hadoop Distro                                   236
  Hadoop Logs                                     132
  Hadoop clienttmp unjar                            1
  Mappers files jobcache – spills                1753
  Mappers files jobcache – output                1777
  Reducer Intermediate                            764
  Reducers Shuffle and Intermediate              1744
  Jobcache class files and shell scripts            1
  Hadoop Datanode                                1690
  JVM – /usr/lib/jvm…                              98
  Total                                          7987

 

The Map Task

Each Map task reads its split of the input data from HDFS and applies its map function to the data. In the Terasort case, the map stage simply reads the input into key-values and sorts its split of the key-values. This results in reads across the network from HDFS, I/O to temporary spill files, and writes of the output into the mapper's intermediate output file (the one that the task-tracker will expose over HTTP to the reducer for the shuffle phase).

By aggregating and sorting the I/Os by pathname, we can see that most of the I/O goes to these well-known destinations, with a longer tail of I/O to the task staging, logs and so on. The path file.out is our map output, which we would expect to be 64 MBytes. The spillx.out files are our sort spills.

By looking at the I/O attributes, we can learn a little about the I/O patterns of each component. In this case, some aggregation of I/O sizes reveals the logical I/O sizes before they are serviced by the Linux ext4 file system.

Since this is a bandwidth-intensive application, we'd expect large I/O sizes. Interestingly, we're seeing most of the I/O performed in small sizes. The data in the I/O size graph is bucketed, and per the traces, most of the I/O is happening at an odd size – 4090 bytes – most likely a multiple of the key-value size?

Since there is a large number of I/Os at the smaller sizes, it makes more sense to view the total bytes by I/O size, so that we can see how much data was transferred in each I/O bucket. This way, if a few very large I/Os were performed, we'd be able to see whether they represent any reasonable fraction of the total. However, we see that the majority of the bytes transferred come from the smaller I/O sizes.
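For reference, this is roughly how those histograms were built from the trace – bucketing each I/O by size and keeping both a count and a byte total per power-of-two bucket (a simplified sketch, not the exact analysis script):

from collections import Counter

def bucket(size):
    """Round an I/O size up to its power-of-two bucket (e.g. 4090 -> 4096)."""
    b = 1
    while b < size:
        b <<= 1
    return b

def size_histograms(io_sizes):
    """Return (I/O count per bucket, total bytes per bucket) for a list of I/O sizes."""
    counts, total_bytes = Counter(), Counter()
    for size in io_sizes:
        counts[bucket(size)] += 1
        total_bytes[bucket(size)] += size
    return counts, total_bytes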

Since it wasn’t really evident that Hadoop’s I/O is sequential from this data, we went a step further and sliced the data another way. By looking at the offsets per file descriptor, you can aggregate adjacent I/Os, allowing a similar graph to be drawn of logical I/Os – those which are identified by any connected sequential I/O. This time the data looks quite different.

The logical I/O graph shows what was hidden: the actual I/O is very sequential. In fact, the entire output is written sequentially, and much of the sort-spill I/O comes in sizes between 128 KB and 4 MByte.

The Reducer

The reducer starts its work by copying key-values from multiple map nodes, in a phase known as the shuffle. The map output data is kept around and exported by the task-tracker, and read by the reducer over the network. It's interesting that this data doesn't just go into HDFS.

 

In this Terasort, there are no reducer spills. Data is read from the mappers into the mapx.out files, and then combined into the intermediate.1 file, sorted and then written back into HDFS.

The Data Node

The HDFS data-node is where the majority of the distributed I/O occurs. A client reading from the distributed file system first consults the name-node to locate the relevant data-node for the block in question, then connects directly to that data-node to read the block.

The datanode stores each block in two key files – a Linux file containing up to 64 MBytes of the block's data, and a smaller file used to store the checksums for that data. For Terasort, we see two clusters of I/O sizes, corresponding to the I/O to/from the block file and the reads/writes of the checksum file.


Most of the I/O is performed in 64 KByte chunks. This is because the I/O is initiated to/from the client using sendfile, with a buffer size of 64 KB. As a result, the datanode relies on the underlying file system to aggregate these requests into bigger I/Os.
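To illustrate why a 64 KB transfer buffer shows up as 64 KB I/Os, here is a sketch of a datanode-style block send loop using sendfile. The real DataNode is Java and uses NIO transferTo(), which maps to sendfile(2) on Linux, so the resulting I/O sizes look the same; the 64 KiB chunk size here is an assumption matching the observation above.

import os
import socket

CHUNK = 64 * 1024   # 64 KiB per transfer

def send_block(sock: socket.socket, block_path: str) -> None:
    """Stream a block file to a client in 64 KiB sendfile() calls."""
    size = os.path.getsize(block_path)
    with open(block_path, "rb") as f:
        offset = 0
        while offset < size:
            sent = os.sendfile(sock.fileno(), f.fileno(), offset, CHUNK)
            if sent == 0:       # peer closed the connection
                break
            offset += sent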

When we perform the same logical-I/O coalescing analysis, we can see that the majority of the I/O is in fact sequential.


We haven’t done any experiments with Direct I/O yet, but it is something that has been asked about. One would speculate that we would not realize the full bandwidth of the I/O subsystem if we used Direct I/O, since 64 KB is typically not large enough to get the best sequential bandwidth from a modern disk.

Summary

Tying all of the data together, we begin to get a model view of a typical Map-Reduce job and its I/O requirements. Our job goes through these phases:

  • Reads its input data from HDFS (ideally from a local datanode; this is only 12% of overall I/O)
  • Spills at high bandwidth during initial sort (guaranteed to be local, since it’s using local temporary storage on local disks)
  • Writes map output (also guaranteed to be local, on temporary disks)
  • The reducer reads that across the network (almost 100% probability it’s from a remote host)
  • The reducer spills (guaranteed to be local disk)
  • The reducer creates an intermediate file (also local)
  • The reducer writes back to HDFS (one copy should be local through the replica placement engine, two more remote copies)

 

 

Interestingly, we tend to think of Hadoop as always needing its data local – literally meaning its task must run where the block’s datanode is – but the consequence of running the task on a remote datanode is much more diluted than we might think. Running remote in this example results in only about 25% of the total storage being accessed across the network.
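As a rough reconstruction of those fractions from the numbers above (treating the total as roughly 8 GB, and both the HDFS input and the shuffle as roughly 1 GB each – an approximation rather than an exact accounting):

TOTAL_IO_GB   = 8.0   # total bytes moved by the job (see the category table)
HDFS_INPUT_GB = 1.0   # map input read from HDFS
SHUFFLE_GB    = 1.0   # map output fetched by the reducer over HTTP

print(f"HDFS input as a share of all I/O:       {HDFS_INPUT_GB / TOTAL_IO_GB:.0%}")
# If the task runs away from its block, the input read joins the shuffle on the wire:
print(f"remote share when the task runs remote: {(HDFS_INPUT_GB + SHUFFLE_GB) / TOTAL_IO_GB:.0%}")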

This leaves a few interesting questions open:

  • Is there really such a big impact from running jobs remotely? Perhaps it’s only an issue with 1 Gb networks; with 10 Gb networks and fast top-of-rack switches, does locality matter any longer?
  • We’re making extra copies of data and using a different transfer framework for map output. Why doesn’t map data simply go into HDFS, removing the need for the additional web proxy and making it easier to optimize locating the reducer?
  • What are the actual bandwidth requirements of Hadoop on local storage and the network, both at the host level and when we aggregate into racks and the whole system? From this data, it’s possible to create some back-of-the-envelope sizing models that allow us to rationalize how much actual bandwidth is required for local disks, the host network, the top-of-rack switch and between racks.

Let me know your thoughts. I’ll follow up next with some implications for storage and the network, and a simple model that points towards answers to the questions above.

 
