Hadoop: Apache Pig

Continuing our Apache Hadoop discovery trip, the next thing we are going to check is Apache Pig. Apache Pig is built on top of Hadoop and MapReduce and allows us to use a scripting language called Pig Latin to create MapReduce jobs without the need to write MapReduce code.

The definition on the project page is:

Apache Pig is a platform for analysing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. […] 

At the present time, Pig’s infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs, […]. Pig’s language layer currently consists of a textual language called Pig Latin, which has the following key properties: 

  • Ease of programming. […]
  • Optimisation opportunities. […]
  • Extensibility. […]
Apache Pig product description

The purpose of this article is to execute a basic script on Apache Pig using the data from the previous article. Specifically, in this case, we are going to need two files: u.data and u.item.

As mentioned before, the first file has the following format:

user id | item id | rating | timestamp

While the second file has the following format (only the fields to be used are shown):

movie id | movie title | release date | video release date | ...
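
Before uploading anything, if you want to double-check the delimiters, a quick and entirely optional Python sketch like the following can be used to peek at the first line of each file. It assumes both files sit in the current directory and that u.item is latin-1 encoded (the original MovieLens files are not UTF-8):

# Peek at the first line of each MovieLens file to confirm the delimiters:
# u.data is tab-separated, u.item uses '|' as the field separator.
with open('u.data') as data_file:
    user_id, item_id, rating, timestamp = data_file.readline().rstrip('\n').split('\t')
    print(user_id, item_id, rating, timestamp)

# The encoding here is an assumption; adjust it if your copy of the file differs.
with open('u.item', encoding='latin-1') as item_file:
    fields = item_file.readline().rstrip('\n').split('|')
    print(fields[0], fields[1], fields[2], fields[3])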

The first thing we need to do is upload the data files to the Hadoop file system. Note that the first file will already be there if you followed the previous article.

$ hadoop fs -mkdir /ml-100k
$ hadoop fs -copyFromLocal u.data /ml-100k/u.data
$ hadoop fs -copyFromLocal u.item /ml-100k/u.item
$ hadoop fs -ls /ml-100k

The next thing we need to do is to write our first Pig Latin script. While the purpose of this article is not to teach you how to write scripts but how to use and run them, let's take a brief look at the scripting language's commands. If you are familiar with any programming language or SQL, you will not find it difficult to understand them. For more information, we can check the official documentation.

- LOAD, STORE, DUMP
- FILTER, DISTINCT, FOREACH/GENERATE, MAPREDUCE, STREAM, SAMPLE
- JOIN, COGROUP, GROUP, CROSS, CUBE
- ORDER, RANK, LIMIT
- UNION, SPLIT

We will understand them a bit more with an example.

Now, let’s build something. Based on the data we have, we can try to answer the question “What is the most-voted one-star movie?”. To do so, we can build the following script:

-- We load the ratings data from a file
-- It is worth mentioning that the full HDFS URI is not required;
-- a path on the default file system works too, for example LOAD '/ml-100k/u.data'
ratings = LOAD 'hdfs://localhost:9000/ml-100k/u.data'
    AS (userID:int, movieID:int, rating:int, ratingTime:int);

-- We load the movie metadata, which uses '|' as the field delimiter
metadata = LOAD 'hdfs://localhost:9000/ml-100k/u.item' USING PigStorage('|')
    AS (movieID:int, movieTitle:chararray, releaseDate:chararray, videoRelease:chararray, imdbLink:chararray);

-- For each loaded row we keep only the id and title association
nameLookup = FOREACH metadata GENERATE movieID, movieTitle;

-- We group the loaded ratings by movie
groupedRatings = GROUP ratings BY movieID;

-- We calculate the average rating and the number of ratings per movie
averageRatings = FOREACH groupedRatings GENERATE group AS movieID,
    AVG(ratings.rating) AS avgRating, COUNT(ratings.rating) AS numRatings;

-- We are just interested in the worst movies
badMovies = FILTER averageRatings BY avgRating < 2.0;

-- Once we have the worst movies, we resolve their names using the lookup built earlier
namedBadMovies = JOIN badMovies BY movieID, nameLookup BY movieID;

-- We generate the response rows
finalResults = FOREACH namedBadMovies GENERATE nameLookup::movieTitle AS movieName,
    badMovies::avgRating AS avgRating, badMovies::numRatings AS numRatings;

-- We sort the response rows
finalResultsSorted = ORDER finalResults BY numRatings DESC;

-- We print the response
DUMP finalResultsSorted;
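
If the Pig operators are still a bit abstract, the following Python sketch reproduces the same pipeline over the local files. It is only an illustration of the logic, assuming u.data and u.item sit in the current directory and that u.item is latin-1 encoded; Pig, instead, compiles the script above into MapReduce jobs.

from collections import defaultdict

# nameLookup: movieID -> movieTitle
name_lookup = {}
with open('u.item', encoding='latin-1') as item_file:
    for line in item_file:
        fields = line.rstrip('\n').split('|')
        name_lookup[int(fields[0])] = fields[1]

# groupedRatings: movieID -> list of ratings
grouped_ratings = defaultdict(list)
with open('u.data') as data_file:
    for line in data_file:
        user_id, movie_id, rating, rating_time = line.split('\t')
        grouped_ratings[int(movie_id)].append(int(rating))

# averageRatings + badMovies + namedBadMovies
bad_movies = []
for movie_id, ratings in grouped_ratings.items():
    avg_rating = sum(ratings) / len(ratings)
    if avg_rating < 2.0:
        bad_movies.append((name_lookup[movie_id], avg_rating, len(ratings)))

# finalResultsSorted: most-rated bad movies first
for movie_name, avg_rating, num_ratings in sorted(bad_movies, key=lambda row: row[2], reverse=True):
    print(movie_name, avg_rating, num_ratings)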

The only thing remaining is to check whether our script does its job properly. For this, we just need to execute it and see what happens.

Hadoop history server

Before we can execute our script, we need to start the Hadoop history server that offers REST APIs allowing users to get the status of finished applications.

Why does it not run when the ‘start-all.sh‘ command is run? I do not know why it is not the default option; I assume it is because it is not necessary in order to work with HDFS or MapReduce. We have a few options here:

  1. To run the deprecated command: mr-jobhistory-daemon.sh start|stop historyserver
  2. To run the new command: mapred --daemon start|stop historyserver
  3. Add it to the start-all.sh and stop-all.sh scripts:
# add to the start-all.sh script
# start jobhistory daemons if jobhistory is present
if [ -f "${hadoop}"/sbin/mr-jobhistory-daemon.sh ]; then
  "${hadoop}"/sbin/mr-jobhistory-daemon.sh start historyserver
fi

# add to the stop-all.sh script
# stop jobhistory daemons if jobhistory is present
if [ -f "${hadoop}"/sbin/mr-jobhistory-daemon.sh ]; then
  "${hadoop}"/sbin/mr-jobhistory-daemon.sh stop historyserver
fi

If we forget to run the history server, there will be a pretty clear error in the logs when executing the script: we will see multiple connection attempts to localhost:10020.

Apache Pig has different execution modes. We are going to see the Local Mode execution and the MapReduce Mode execution.

Local Mode

This execution allows us to execute the script using our local files. As we have written the script, we can just go and run it, but it would not technically be a local execution because we are using the files stored in our HDFS. To run with all-local files, we can use a path to the files on our local hard drive. I find it easier to just upload all my test data to HDFS and not worry about where my data lives when executing locally or remotely, but it is good to know we have the option to do a quick test with small files or files covering specific test cases. For my execution, I am going to use the script as it is, but feel free to find the way of working you are most comfortable with. To run the script we execute:

pig -x local most-rated-one-star-movie.pig

We can see the execution has been completed successfully and the results are dumped into the console.

MapReduce Mode

This execution will run with the files stored on HDFS. We already have the data there but we need to upload the script.

$ hadoop fs -mkdir /pig-scripts
$ hadoop fs -copyFromLocal most-rated-one-star-movie.pig /pig-scripts/most-rated-one-star-movie.pig

After that, we just need to execute our script:

pig -x mapreduce hdfs://localhost:9000/pig-scripts/most-rated-one-star-movie.pig

The results should be the same.

With this, we can finish the introduction to Apache Pig.


Hadoop: First MapReduce example

In the previous article, we installed Hadoop; now it is time to play with it. In this article, we are going to take some example data, store it in the Hadoop file system, and see how we can run a simple MapReduce task.

The data we are going to use is called MovieLens. It is a list of one hundred thousand movie ratings from one thousand users on one thousand seven hundred movies. The dataset is quite old, but the advantage is that it has been tested multiple times and is very stable, which will prevent any differences when following this article.

The dataset can be found here. We just need to download the zip file and extract its content, and we are interested in the file called u.data.

Using the file system

Let’s check what our file system looks like. We can do this using the UI or from the console. In this article, I am going to focus on working with the console, but I will try to point to the UI when something is interesting or can easily be checked there in addition to the console command.

As I have said, let’s check the file system. To use the UI, we can open the NameNode Web UI, probably at http://localhost:9870, and go to “Menu -> Utilities -> Browse the file system“. We should see something like the following screenshot because our file system is empty:

To achieve the same on the console, we can execute the following command:

hadoop fs -ls /

It is important to notice that we need to specify the path. If it is not specified, the command tries to list our user's home directory inside HDFS (/user/<username>), which does not exist yet, so it will throw an error. For more information about the error, we can read this link.

After executing the command, we will see that nothing is listed, but we know it is working. Let’s now upload our data. First, we will create a folder, and after that, we will upload the data file:

$ hadoop fs -mkdir /ml-100k
$ hadoop fs -copyFromLocal u.data /ml-100k/u.data
$ hadoop fs -ls /
$ hadoop fs -ls /ml-100k

Easy and simple. As we can see it works as we would expect from any file system. We can remove the file and folder we have just added:

$ hadoop fs -rm /ml-100k/u.data
$ hadoop fs -rmdir /ml-100k

Using MapReduce

The file we are using has the format:

user id | item id | rating | timestamp. 

To process it, we are going to be using Python and a library called MRJob. More information can be found here.

We need to install it to be able to use it:

pip3 install mrjob

Now, we need to build a very basic MapReduce job, for example, one that counts how many times each rating value appears in the data.

The code should look like this:

from mrjob.job import MRJob
from mrjob.step import MRStep

class RatingsBreakdown(MRJob):
 
  def mapper(self, _, line):
    (userID, movieID, rating, timestamp) = line.split('\t')
    yield rating, 1

  def reducer(self, key, values):
    yield key, sum(values)

if __name__ == '__main__':
  RatingsBreakdown.run()

The code is pretty simple and easy to understand. A job is defined by a class that inherits from MRJob. This class contains methods that define the steps of the job. A “step” consists of a mapper, a combiner, and a reducer; all of them are optional, though we must have at least one. The mapper() method takes a key and a value as arguments (in this case, the key is ignored and a single line of text input is the value) and yields as many key-value pairs as it likes. The reducer() method takes a key and an iterator of values and also yields as many key-value pairs as it likes (in this case, it sums the values for each key). The final required component of a job file is the invocation of the run() method when the file is executed as a script.
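The MRStep import is not strictly needed for this single-step job, but it becomes relevant as soon as a job has more than one step: in that case, we override the steps() method and declare each step explicitly. The following is only a sketch of mine, not part of the article's job; the class and method names, and the second step that sorts the counts, are hypothetical additions to illustrate the idea.

from mrjob.job import MRJob
from mrjob.step import MRStep

class RatingsBreakdownSorted(MRJob):

  def steps(self):
    # Two chained steps: count the ratings, then sort the counts.
    return [
      MRStep(mapper=self.mapper_get_ratings, reducer=self.reducer_count_ratings),
      MRStep(reducer=self.reducer_sort_counts)
    ]

  def mapper_get_ratings(self, _, line):
    (userID, movieID, rating, timestamp) = line.split('\t')
    yield rating, 1

  def reducer_count_ratings(self, rating, counts):
    # Send everything to a single key so one reducer can sort all the counts.
    yield None, (sum(counts), rating)

  def reducer_sort_counts(self, _, count_rating_pairs):
    for count, rating in sorted(count_rating_pairs):
      yield rating, count

if __name__ == '__main__':
  RatingsBreakdownSorted.run()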

We can run our RatingsBreakdown job in different ways, which are very useful depending on the goal we have. For example, if we have just a small file with a small sample of the data that we are only using to test the code, we can run the Python script with a local file. If we want to run with similar sample data but see how it behaves in Hadoop, we can upload a local file on demand while executing the script. Finally, if we want to execute the whole shebang, we can run the script against the big bulky file we have on the file system. For the different options, we have the following commands. Considering that the whole file, in this case, is small, we can run the three types of execution using the same file.

$ python3 ratings-breakdown.py u.data
$ python3 ratings-breakdown.py -r hadoop u.data
$ python3 ratings-breakdown.py -r hadoop hdfs:///ml-100k/u.data

The result, no matter which execution we have decided to use, would be:

"1"	6110
"2"	11370
"3"	27145
"4"	34174
"5"	21201

As a curiosity, if we now take a look at the ResourceManager Web UI, probably at http://localhost:8088, we will see the jobs executed by the last two commands:

This is all for today. We have stored something in our Hadoop file system and we have executed our first MapReduce job.


Hadoop: Installation on macOS

In previous articles, we have seen a quick introduction to Big Data and to Hadoop and its ecosystem. Now it is time to install Hadoop on our local machines to be able to start playing with it and with some of the tools in its ecosystem.

In my case, I am going to install it on a macOS system. If any of you are running a GNU/Linux system, you can skip the initial step (the Homebrew installation) and take a look at the configuration steps because they should be similar.

Installing Hadoop

As I have said before, we are going to use Homebrew to install Hadoop; specifically, version 3.3.3 is going to be installed. Because we are using the package manager, the installation is going to be pretty simple, but if you prefer to do it manually, you just need to go to the Hadoop download page and download and extract the desired version. The configuration steps after that should be the same.

To install Hadoop using homebrew we just need to execute:

brew install hadoop

A very important consideration is that Hadoop does not work with the latest Java versions, neither 17 nor 18. It is necessary to have Java 11 installed. We can still use whatever version we want on our system, but to execute the Hadoop commands we should point JAVA_HOME at the Java 11 installation in our terminal. As a side note, as far as I know, it should work with Java 8 too, but I have not tested it.

Allowing SSH connections to localhost

One previous consideration is that, for Hadoop to start all its nodes properly, it needs to be able to connect over SSH to our machine, in this case, localhost. By default, this setting is disabled in macOS; to enable it, we just need to go to “System Preferences -> Sharing -> Remote Login“.

Setting environment variables

Reading about this, I have only found confusion: some tutorials do not even mention the addition of environment variables, others only declare the HADOOP_HOME variable, and others declare a bunch of stuff. After a few tests, I have collected a list of environment variables we can (and should) define; they allow us to start Hadoop without problems. Is it possible my list has more than required? Yes, absolutely. But, for now, I think the list is comprehensive and short enough; my plan, as I have said before, is to play with Hadoop and the tools around it, not to become a Hadoop administrator (yet). Some of them I keep just as a reminder that they exist, for example, the two related to the native libraries, HADOOP_COMMON_LIB_NATIVE_DIR and HADOOP_OPTS; they can be skipped if we want, there will just be a warning when running Hadoop but nothing important (see the comment below about native libraries). We can add the variables to our “~/.zshrc” file.

The list contains the following variables:

# HADOOP env variables
export HADOOP_HOME="/usr/local/Cellar/hadoop/3.3.3/libexec"
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

Configuring Hadoop

File $HADOOP_HOME/etc/hadoop/hadoop-env.sh

In this file, we need to find the JAVA_HOME entry and fill it with the appropriate value, even though, if we read the file carefully, one of the comments says the “variable is REQUIRED on ALL platforms except OS X!“. Despite the comment, we are going to set it. If we do not set it, we will have problems running the NodeManager: it will not start, and the logs will show an error related to the Java version in use.

File $HADOOP_HOME/etc/hadoop/core-site.xml

Here, we need to configure the HDFS port and a folder for temporary directories. This folder can be wherever you want, in my case, it will reside in a sub-folder of my $HOME directory.

<configuration>                                                                                                          
  <property>                                                                                                             
    <name>fs.defaultFS</name>                                                                                            
    <value>hdfs://localhost:9000</value>                                                                                 
  </property>                                                                                                            
  <property>                                                                                                             
    <name>hadoop.tmp.dir</name>                                                                                          
    <value>/Users/user/.hadoop/hdfs/tmp</value>                                                                      
    <description>A base for other temporary directories</description>                                                    
  </property>                                                                                                            
</configuration> 

File $HADOOP_HOME/etc/hadoop/mapred-site.xml

We need to add a couple of extra properties:

<configuration>                                                                                                          
  <property>                                                                                                             
    <name>mapreduce.framework.name</name>                                                                                
    <value>yarn</value>                                                                                                  
  </property>                                                                                                            
  <property>                                                                                                             
    <name>mapreduce.application.classpath</name>                                                                         
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>         
  </property>                                                                                                            
</configuration>

File $HADOOP_HOME/etc/hadoop/yarn-site.xml

In a similar way, we need to add a couple of extra properties:

<configuration>                                                                                                          
  <property>                                                                                                             
    <name>yarn.nodemanager.aux-services</name>                                                                           
    <value>mapreduce_shuffle</value>                                                                                     
  </property>                                                                                                            
  <property>                                                                                                             
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  </property>                                                                                                            
</configuration>

File $HADOOP_HOME/etc/hadoop/hdfs-site.xml

And, again, one more property:

<configuration>                                                                                                          
  <property>                                                                                                             
    <name>dfs.replication</name>                                                                                         
    <value>1</value>                                                                                                     
  </property>                                                                                                            
</configuration>

Formatting the HDFS Filesystem

Before we can start Hadoop and use HDFS, we need to format the file system. We can do this by running the following command:

hdfs namenode -format

Starting Hadoop

Now, finally, we can start Hadoop. If we have added $HADOOP_HOME/bin and $HADOOP_HOME/sbin properly to our $PATH, we should be able to execute the command start-all.sh. If the command is not available, have you restarted the terminal after editing your .zshrc file or executed the command source .zshrc?

If everything works, we should be able to see something like:

And, to verify that everything is running accordingly, we can execute the jps command:

Some useful URLs are:

NameNode Web UI: http://localhost:9870
ResourceManager Web UI: http://localhost:8088
NodeManager Web UI: http://localhost:8042

What are the native libraries?

If someone has paid close attention to the screenshot showing Hadoop starting, I am sure they have observed a warning line like:

WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Despite us setting the HADOOP_COMMON_LIB_NATIVE_DIR and HADOOP_OPTS variables in the environment, the truth is that the native lib folder is not present in our installation.

Hadoop has native implementations of certain components for performance reasons and because of the non-availability of Java implementations. These components are available in a single, dynamically-linked native library called the native Hadoop library. On the *nix platforms, the library is named libhadoop.so; on mac systems, it is named libhadoop.a. Hadoop runs fine using the built-in Java classes, but you will gain speed with the native ones. More importantly, some compression codecs are only supported natively; if any other application depends on Hadoop, or your jars depend on these codecs, then jobs will fail unless the native libraries are available.

The installation of the native libraries is beyond the scope of this article.

The last thing is to stop Hadoop; to do this, we just need to run the command stop-all.sh:


Hadoop and its ecosystem

In the previous article, we had a brief introduction to Big Data. To be able to take advantage of the emerging opportunities it creates for organisations, some frameworks and tools have appeared over the last few years. One of these frameworks is Hadoop, together with the whole ecosystem of tools created around it.

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.

Apache Hadoop website

The project includes these modules:

  • Hadoop Common: The common utilities that support the other Hadoop modules.
  • Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.
  • Hadoop YARN: A framework for job scheduling and cluster resource management.
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.

Hadoop Distributed File System (HDFS™)

HDFS creates an abstraction that allows users to see HDFS logically as a single storage unit, but the data is stored across multiple nodes in a distributed fashion. HDFS follows a master-slave architecture where the master is the NameNode and the slaves – there can be multiple slaves – are known as DataNodes.

The NameNode has the following characteristics or functions:

  • It is the master daemon that maintains and manages the DataNodes.
  • It records the metadata of all the blocks stored in the cluster, e.g. location of blocks stored, size of the files, permissions, hierarchy, etc.
  • It records every change that takes place to the file system metadata.
  • If a file is deleted in HDFS, the NameNode will immediately record this in the EditLog.
  • It regularly receives a heartbeat and a block report from all the DataNodes in the cluster to ensure that the DataNodes are alive.
  • It keeps a record of all the blocks in the HDFS and DataNode in which they are stored.
  • It has high availability and federation features.

DataNodes have the following characteristics or functions:

  • It is the slave daemon which runs on each slave machine.
  • The actual data is stored on DataNodes.
  • It is responsible for serving read and write requests from the clients.
  • It is also responsible for creating blocks, deleting blocks and replicating the same based on the decisions taken by the NameNode.
  • It sends heartbeats to the NameNode periodically to report the overall health of HDFS, by default, this frequency is set to 3 seconds.

Some of the problems that it fixes are:

  • Storage space problems: HDFS provides a distributed way of storing Big Data. The data is stored in blocks across the available DataNodes, and the data blocks are also replicated on different DataNodes. This architecture allows for horizontal scaling: extra DataNodes can be added to the HDFS cluster when required, increasing the available storage space.
  • Variety of data: HDFS can store all kinds of data whether it is structured, semi-structured or unstructured.
  • Access and process velocity: To increase the access and process velocity HDFS moves processing to data and not data to processing. Instead of moving data to the master nodes and then processing the data, the processing logic is sent to the various data nodes where the data is processed in parallel across different nodes. Once they are done, the processed results are sent to the master node where the results are merged and the response is sent back to the client.

Hadoop YARN

YARN performs all the processing activities by allocating resources and scheduling tasks. YARN has a ResourceManager and one or more NodeManagers.

The ResourceManager has the following characteristics or functions:

  • It is a cluster-level (one for each cluster) component and runs on the master machine.
  • It manages resources and schedules applications running on top of YARN.
  • It has two components: Scheduler & ApplicationManager.
  • The Scheduler is responsible for allocating resources to the various running applications.
  • The ApplicationManager is responsible for accepting job submissions and negotiating the first container for executing the application.
  • It keeps track of the heartbeats from the NodeManagers.

The NodeManagers have the following characteristics or functions:

  • It is a node-level component (one on each node) and runs on each slave machine.
  • It is responsible for managing containers and monitoring resource utilisation in each container.
  • It also keeps track of node health and log management.
  • It continuously communicates with ResourceManager to remain up-to-date.

Despite all the benefits Hadoop offers, it is not a silver bullet and its use needs to be carefully considered. Some of the use cases where it is not recommended are:

  • Low-latency data access: Queries that require a short, bounded response time.
  • Constant data modifications: Hadoop is a better fit when the primary concern is reading data.
  • Lots of small files: While Hadoop can store multiple small datasets, it is much more suitable for scenarios with a few large files.

Hadoop MapReduce

Hadoop MapReduce is the core Hadoop ecosystem component that provides data processing. MapReduce is a software framework for easily writing applications that process the vast amounts of structured and unstructured data stored in the Hadoop Distributed File System.
MapReduce programs are parallel in nature, and thus are very useful for performing large-scale data analysis using multiple machines in the cluster. This parallel processing improves the speed and reliability of the cluster.

Hadoop ecosystem

Hadoop was designed as both a computing (MapReduce) and storage (HDFS) platform from the very beginning. With the increasing need for big data analysis, Hadoop has attracted lots of other software that resolves big data questions and merges into a Hadoop-centric big data ecosystem. The following diagram gives a brief overview of the Hadoop big data ecosystem:

O’Reilly – Overview of the Hadoop ecosystem

We will go into more detail on some of the elements in future articles.
