Continuing our Apache Hadoop discovery trip, the next thing we are going to check is Apache Pig. Apache Pig is built on top of Hadoop and MapReduce and allows us to use a scripting language called Pig Latin to create MapReduce jobs without the need to write MapReduce code.
The definition on the project page is:
Apache Pig is a platform for analysing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. […]
At the present time, Pig’s infrastructure layer consists of a compiler that produces sequences of Map-Reduce programs, […]. Pig’s language layer currently consists of a textual language called Pig Latin, which has the following key properties:
Apache Pig product description
- Ease of programming. […]
- Optimisation opportunities. […]
- Extensibility. […]
The purpose of this article is to execute a basic script on Apache Pig using the data from the previous article. Concretely, in this case, we are going to need two files: u.data and u.item.
As mentioned before, the first file has the following format:
user id | item id | rating | timestamp
While the second file has the following format (only the fields we are going to use are shown):
movie id | movie title | release date | video release date | ...
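For reference, a single row of each file looks roughly like this (u.data is tab-separated, u.item is pipe-separated; the values shown are only illustrative):
196	242	3	881250949
1|Toy Story (1995)|01-Jan-1995||...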
The first thing we need to do is upload the data files to the Hadoop file system. Note that the first file should already be there if you followed the previous article.
$ hadoop fs -mkdir /ml-100k
$ hadoop fs -copyFromLocal u.data /ml-100k/u.data
$ hadoop fs -copyFromLocal u.item /ml-100k/u.item
$ hadoop fs -ls /ml-100k
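If we want to double-check that the files were copied with the expected content, we can peek at the first few rows of one of them:
$ hadoop fs -cat /ml-100k/u.data | head -5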
The next thing we need to do is write our first Pig Latin script. While the goal of this article is not to teach you how to write scripts but how to use and run them, let’s take a brief look at the Pig Latin commands. If you are familiar with any programming language or SQL, you will have no difficulty understanding them. For more information, we can check the official documentation.
- LOAD, STORE, DUMP
- FILTER, DISTINCT, FOREACH/GENERATE, MAPREDUCE, STREAM, SAMPLE
- JOIN, COGROUP, GROUP, CROSS, CUBE
- ORDER, RANK, LIMIT
- UNION, SPLIT
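As a quick taste, here is a small illustrative snippet (not part of the final script) that combines LOAD, FILTER and DUMP to print only the five-star ratings from u.data:
-- load the tab-separated ratings and keep only the five-star ones
ratings = LOAD '/ml-100k/u.data' AS (userID:int, movieID:int, rating:int, ratingTime:int);
fiveStars = FILTER ratings BY rating == 5;
DUMP fiveStars;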
We will understand them a bit more with an example.
Now, let’s build something. Based on the data we have, we can try to answer the question “What is the most-voted one-star movie?”. To answer it, we can build the following script:
-- We load the ratings data from a file
-- It is worth mentioning that a path without the 'hdfs://localhost:9000' prefix will work too
-- For example, LOAD '/ml-100k/u.data'
ratings = LOAD 'hdfs://localhost:9000/ml-100k/u.data'
AS (userID:int, movieID:int, rating:int, ratingTime:int);
-- We load the movie metadata from a file
metadata = LOAD 'hdfs://localhost:9000/ml-100k/u.item' USING PigStorage('|')
AS (movieID:int, movieTitle:chararray, releaseDate:chararray, videoRelease:chararray, imdblink:chararray);
-- For each metadata row we keep the id and title association
nameLookup = FOREACH metadata GENERATE movieID, movieTitle;
-- We group the loaded ratings by movie
groupedRatings = GROUP ratings BY movieID;
-- We calculate the average rating and the number of ratings per movie
averageRatings = FOREACH groupedRatings GENERATE group AS movieID,
AVG(ratings.rating) AS avgRating, COUNT(ratings.rating) AS numRatings;
-- We are only interested in the worst movies
badMovies = FILTER averageRatings BY avgRating < 2.0;
-- Once we have the worst movies, we resolve their names using the lookup we built earlier
namedBadMovies = JOIN badMovies BY movieID, nameLookup BY movieID;
-- We generate the result rows
finalResults = FOREACH namedBadMovies GENERATE nameLookup::movieTitle AS movieName,
badMovies::avgRating AS avgRating, badMovies::numRatings AS numRatings;
-- We sort the result rows
finalResultsSorted = ORDER finalResults BY numRatings DESC;
-- We print the result
DUMP finalResultsSorted;
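As a side note, while writing a script like this it can help to check what Pig thinks the intermediate relations look like. The DESCRIBE operator does exactly that; for instance, from the Grunt shell, once the statements above have been entered:
DESCRIBE averageRatings;
-- prints the inferred schema, something along the lines of:
-- averageRatings: {movieID: int, avgRating: double, numRatings: long}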
The only thing remaining is to check our results and see whether our script does its job properly. For this, we just need to execute it and see what happens.
Hadoop history server
Before we can execute our script, we need to start the Hadoop history server, which offers REST APIs that allow users to get the status of finished applications.
Why is it not started when the ‘start-all.sh‘ command is run? I do not know why that is not the default behaviour; I assume it is because it is not strictly necessary to work with HDFS or MapReduce. We have a few options here:
- To run the deprecated command: mr-jobhistory-daemon.sh start|stop historyserver
- To run the new command: mapred --daemon start|stop historyserver
- To add it to the start-all.sh and stop-all.sh scripts:
# add to the start-all.sh script
# start jobhistory daemons if jobhistory is present
if [ -f "${hadoop}"/sbin/mr-jobhistory-daemon.sh ]; then
"${hadoop}"/sbin/mr-jobhistory-daemon.sh start historyserver
fi
# add to the stop-all.sh script
# stop jobhistory daemons if jobhistory is present
if [ -f "${hadoop}"/sbin/mr-jobhistory-daemon.sh ]; then
"${hadoop}"/sbin/mr-jobhistory-daemon.sh stop historyserver
fi
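Whichever option we pick, we can verify that the history server is actually running with jps (the process appears as JobHistoryServer) or by opening its web UI, which listens on port 19888 by default:
$ jps | grep JobHistoryServer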
If we forget to run the history server, there will be a pretty clear error in the logs when executing the script: we will see multiple connection attempts to localhost:10020.
Apache Pig has different execution modes. We are going to see the Local Mode execution and the MapReduce Mode execution.
Local Mode
This mode allows us to execute the script using our local files. As we have written the script, we can just go and run it, but it would not technically be a local execution because we are using the files stored in our HDFS. To run with only local files, we can use a relative path to the files on our local hard drive. I find it easier to just upload all my test data to HDFS and not worry about where my data lives when executing locally or remotely, but it is good to know we have the option to do a quick test with small files or files covering specific test cases. For my execution, I am going to use the script as it is, but feel free to find the way of working you are most comfortable with. To run the script, we execute:
pig -x local most-rated-one-star-movie.pig

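For reference, a truly local run would only require pointing the LOAD statements at local files instead of HDFS; a hypothetical variant, assuming u.data sits in the directory we launch pig from (and the equivalent change for u.item):
-- read the ratings from the local working directory instead of HDFS
ratings = LOAD 'u.data' AS (userID:int, movieID:int, rating:int, ratingTime:int);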
MapReduce Mode
This execution will run with the files stored on HDFS. We already have the data there, but we still need to upload the script.
$ hadoop fs -mkdir /pig-scripts
$ hadoop fs -copyFromLocal most-rated-one-star-movie.pig /pig-scripts/most-rated-one-star-movie.pig
After that, we just need to execute our script:
pig -x mapreduce hdfs://localhost:9000/pig-scripts/most-rated-one-star-movie.pig

The results should be the same.
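As a final note, if we wanted to persist the results instead of printing them to the console, we could swap the final DUMP for a STORE statement (one of the operators listed earlier); a small sketch, writing to a hypothetical /pig-output directory on HDFS:
-- write the sorted results to HDFS as pipe-separated files
STORE finalResultsSorted INTO '/pig-output/most-rated-one-star-movie' USING PigStorage('|');
The generated part files could then be inspected with ‘hadoop fs -cat‘.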
With this, we can finish the introduction to Apache Pig.