Enterprise IT Consultant Views on Technologies and Trends

Dec 1 2010   12:29AM GMT

Hadoop Database for Big Data – Part II

Sasirekha R Profile: Sasirekha R

Hadoop Database for Big Data – Part II


HBase uses a data model similar to that of Google’s Bigtable. Data is logically organized into tables, rows and columns. A data row has a sortable row key and an arbitrary number of columns. The tables are stored sparsely so that the rows in the same table can have widely varying number of columns. Any particular column may have multiple versions for the same row key.

A table enforces its set of column families and a column name has the form “<family>:<label>”. The items in a given column family is except to contain similar data and roughly the same read/write characteristics. HBase stores column families physically close on disk.

Conceptually a table may be thought of a collection of rows that are located by a row key (and optional timestamp) and where any column may not have a value for a particular row key (sparse). Physically they are stored on a per-column family basis. This is an important consideration for schema and application designers. An example that clearly shows the difference between the conceptual and physical views is available at http://wiki.apache.org/hadoop/Hbase/HbaseArchitecture.

Physically, tables are broken up into row ranges called regions (equivalent to Bigtable tablet). Each row range contains rows from start-key (inclusive) to end-key (exclusive). HBase identifies a row range by the table name and start-key (Bigtable uses the endkey). A set of regions, sorted appropriately, forms an entire table.

An iterator-like interface is available for scanning through a row range and the ability to retrieve a column value for a specific row key are provided.

By default, a single row at a time may be locked. Row writes are always atomic, but locking a row allows performing both read and write operations on that row atomically. Multi-row locking is allowed in the recent versions.

HBase includes:

  • Convenient base classes for backing Hadoop MapReduce jobs with HBase tables
  • Query predicate push down via server side scan and get filters
  • Optimizations for real time queries
  • A high performance Thrift gateway
  • A REST-ful Web service gateway that supports XML, Protobuf, and binary data encoding options
  • Cascading, hive, and pig source and sink modules
  • Extensible jruby-based (JIRB) shell
  • Support for exporting metrics via the Hadoop metrics subsystem to files or Ganglia; or via JMX

Hadoop Common

Hadoop Common includes FileSystem, RPC, and serialization libraries. All the Hadoop commands are invoked by the bin/Hadoop script.

Usage: hadoop [–config confdir] [COMMAND] [GENERIC_OPTIONS] [COMMAND_OPTIONS]

Hadoop has an option parsing framework that employs parsing generic options as well as running classes.

 Hadoop cluster can be started in one of the three supported modes:

  • Standalone mode – runs in a non-distributed mode, as a single Java process. Useful for debugging.
  • Pseudo-distributed mode – Runs on a single-node where each Hadoop daemon runs in a separate Java process
  • Fully-distributed mode – In a distributed setup, there is a master node and number of slave nodes

The master node is the node that is used to manage the cluster (using Hadoop utility and browser) and contains those daemons that provide management and coordination of the Hadoop cluster. Whereas the slave node(s) contain the deamons that implement the storage functions for the Hadoop file system (HDFS) and MapReduce functionality.

 Hadoop Configuration involves two types of configuration files:

1. Read-only default configuration – src/core/core-default.xml, src/hdfs/hdfs-default.xml, src/mapred/mapred-default.xml and conf/mapred-queues.xml.template.

2. Site-specific configuration – conf/core-site.xml, conf/hdfs-site.xml, conf/mapred-site.xml and conf/mapred-queues.xml.

For configuring Hadoop for distributed nodes refer: http://www.ibm.com/developerworks/linux/library/l-hadoop-2/index.html


Coordination services are notoriously hard to get right and are prone to errors such as race conditions and deadlock. The motivation behind ZooKeeper is to relieve distributed applications the responsibility of implementing coordination services from scratch.

ZooKeeper provides a set of guarantees:

  • Sequential Consistency – Updates from a client will be applied in the order that they were sent.
  • Atomicity – Updates either succeed or fail. No partial results.
  • Single System Image – A client will see the same view of the service regardless of the server that it connects to.
  • Reliability – Once an update has been applied, it will persist from that time forward until a client overwrites the update.
  • Timeliness – The client’s view of the system is guaranteed to be up-to-date within a certain time bound.

ZooKeeper exposes a simple set of primitives that distributed applications can build upon to implement higher level services for synchronization, configuration maintenance, and groups and naming. It uses a data model styled after the familiar directory tree structure of file systems. It runs in Java and has bindings for both Java and C.


Large distributed systems like Hadoop are very complex and can fail in complicated and subtle ways and extensively instrumented to handle this (A two-thousand node cluster configured for normal operation generates nearly half a terabyte of monitoring data per day, mostly application level log files). This data is invaluable for debugging, performance measurement and monitoring. Large deployments typically use local storage for logging data (deleted periodically) and to users must first aggregate this to centralized location for analysis.

Chukwa was developed to be the system that would process large volumes of data in minutes (and not seconds) with the intent of detecting subtle conditions and aiding failure diagnosis (and not meant for real-time failure detection). Chukwa used HDFS as its data store and MapReduce jobs to process the data. Thus it inherits Hadoop’s scalability and robustness. Understanding Chukwa gives an idea of the various uses to which Hadoop can be put to.

Chukwa can be used by four different sets of people with different functional (overlapping) requirements:

1. Hadoop users – Would want to know how far along their jobs are and what resources are available for future jobs. They need access to the logs and output from their jobs.

2. Cluster Operators – Want to be notified of hardware failures and performance anomalies. Also need to be warned about resource shortages such as storage exhaustion.

3. Cluster Managers – Need guidance in provisioning and in apportioning costs. They wants tools for analyzing past usage by users and groups and for projecting future demands. They also need SLA related information like average job waiting time.

4. Hadoop developers – Need information about the performance in operation, bottlenecks within Hadoop, common failure patterns etc.

Chukwa’s technical architecture involves:

1. Adaptors – The dynamically controllable data sources, called adaptors as they generally wrap some other data source (file or Unix tool). Chukwa includes adaptors to collect Hadoop logs, application metrics and system telemetry. More adaptors are getting added.

2. Storage – Uses HDFS as its data store

3. Collectors and Agents – Data is sent across the network to the collector process. Each collector receives data from several hundred hosts and writes all this to a single sink file. These sink files are closed periodically and made available for processing. Collectors, placed between adapters and HDFS, reduces the number of HDFS files generator to a handful per cluster and also hides the detail of the HDFS file system in use. Agents are long-running processes on each machine providing various services to adaptors (making adapter writing easier). The agent is responsible for starting and stopping adaptors, for forwarding chunks to the collectors, and for making regular checkpoints of adapter state and restarting adaptors at the appropriate position after a crash.

4. Demux and Archiving – A pair of MapReduce jobs runs every few minutes, taking all the available sink files as input. The first job archives all the collected data, without processing or interpreting it. The second job parses out structured data from some of the logs and loads them into a data store. Chukwa splits this target data stores depending on the content – as one file per cluster, per data type, and time period. These small data files in HDFS with structured format can be easily and quickly processed to get details like common failure modes, correlating events in the logs with slowdowns, discovering flakey machines etc.

To ease the analysis of collected data, a flexible, configurable, “portal-style” web interface termed Hadoop Infrastructure Care Center (HICC) is provided.


GNU/Linux is supported as a development and production platform whereas Win32 is supported as a development platform only. Required software includes:

1. Java 1.6x, preferably from Sun, must be installed

2. Ssh must be installed and sshd must be running to use the Hadoop scripts that manage remote Hadoop daemons.

3. For Windows, additionally Cygwin required for shell support.

 Comment on this Post

There was an error processing your information. Please try again later.
Thanks. We'll let you know when a new response is added.
Send me notifications when other members comment.

Forgot Password

No problem! Submit your e-mail address below. We'll send you an e-mail containing your password.

Your password has been sent to:

Share this item with your network: