Hadoop: First MapReduce example

In the previous article, we installed Hadoop; now it is time to play with it. In this article, we are going to take some example data, store it in the Hadoop file system, and see how we can run a simple MapReduce task.

The data we are going to use is called MovieLens. It is a list of one hundred thousand movie ratings from one thousand users on one thousand seven hundred movies. The dataset is quite old, but the advantage is that it has been tested many times and is very stable, which prevents discrepancies when following this article.

The dataset can be found here. We just need to download the zip file and extract its contents; we are interested in the file called u.data.

Using the file system

Let’s check what our file system looks like. We can do this from the UI or from the console. In this article, I am going to focus on working with the console, but I will point to the UI whenever something is interesting or can be easily checked there in addition to the console command.

As I have said, let’s check the file system. To use the UI, we can open the NameNode Web UI, probably at http://localhost:9870, and go to “Menu -> Utilities -> Browse the file system”. We should see something like the following screenshot, because our file system is empty:

To achieve the same on the console, we can execute the next command:

hadoop fs -ls /

It is important to notice that we need to specify the path; if it is not specified, the command will throw an error. For more information about the error, we can read this link.

After executing the command, we will see that nothing is listed, but we know it is working. Let’s now upload our data. First, we will create a folder, and after that, we will upload the data file:

$ hadoop fs -mkdir /ml-100k
$ hadoop fs -copyFromLocal u.data /ml-100k/u.data
$ hadoop fs -ls /
$ hadoop fs -ls /ml-100k

Easy and simple. As we can see, it works as we would expect from any file system. We can remove the file and folder we have just added:

$ hadoop fs -rm /ml-100k/u.data
$ hadoop fs -rmdir /ml-100k
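
These shell commands are easy to script as well. As an illustration, here is a minimal Python sketch (the `hdfs_cmd` helper is hypothetical; it only assumes the `hadoop` binary is on the `PATH`):

```python
import subprocess

def hdfs_cmd(action, *paths):
    """Build an 'hadoop fs' command line for a given action and paths."""
    return ["hadoop", "fs", f"-{action}", *paths]

# These mirror the shell commands used above
mkdir = hdfs_cmd("mkdir", "/ml-100k")
upload = hdfs_cmd("copyFromLocal", "u.data", "/ml-100k/u.data")

# To actually execute one of them, uncomment:
# subprocess.run(upload, check=True)
print(mkdir)  # ['hadoop', 'fs', '-mkdir', '/ml-100k']
```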

Using MapReduce

The file we are using has the following format:

user id | item id | rating | timestamp
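
Since the fields are tab-separated, a single `split('\t')` is enough to recover them. A quick sketch with one sample line (values are illustrative):

```python
# A sample line in the u.data format (tab-separated)
line = "196\t242\t3\t881250949"

user_id, item_id, rating, timestamp = line.split("\t")
print(rating)  # "3"
```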

To process it, we are going to use Python and a library called mrjob. More information can be found here.

We need to install it to be able to use it:

pip3 install mrjob

And we need to build a very basic MapReduce job, for example, one that counts how many times each rating value appears in the data.

The code should look like this:

from mrjob.job import MRJob
from mrjob.step import MRStep

class RatingsBreakdown(MRJob):
 
  def mapper(self, _, line):
    (userID, movieID, rating, timestamp) = line.split('\t')
    yield rating, 1

  def reducer(self, key, values):
    yield key, sum(values)

if __name__ == '__main__':
  RatingsBreakdown.run()

The code is pretty simple and easy to understand. A job is defined by a class that inherits from MRJob. This class contains methods that define the steps of the job. A “step” consists of a mapper, a combiner, and a reducer; all of them are optional, though we must have at least one. The mapper() method takes a key and a value as arguments (in this case, the key is ignored and a single line of text input is the value) and yields as many key-value pairs as it likes. The reducer() method takes a key and an iterator of values and also yields as many key-value pairs as it likes (in this case, it sums the values for each key). The final required component of a job file is to invoke the run() method.
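
To see what each phase contributes, here is a plain-Python simulation of the mapper, combiner and reducer phases; this is not mrjob code, just a sketch of the data flow with a few made-up input lines:

```python
from collections import Counter
from itertools import chain

lines = ["1\t10\t5\t100", "2\t11\t5\t101", "3\t12\t4\t102"]

def mapper(line):
    user, item, rating, ts = line.split("\t")
    yield rating, 1

def combiner(pairs):
    # Runs per mapper: pre-aggregates before the shuffle
    return Counter(k for k, _ in pairs).items()

def reducer(pairs):
    # Runs after the shuffle: final sums per key
    totals = Counter()
    for k, v in pairs:
        totals[k] += v
    return dict(totals)

# Pretend each line was handled by its own mapper, then combined and reduced
combined = [combiner(mapper(line)) for line in lines]
result = reducer(chain.from_iterable(combined))
print(result)  # {'5': 2, '4': 1}
```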

We can run it in different ways, which are useful depending on the goal we have. For example, if we just have a small file with a small sample of the data to test the code, we can run the Python script against a local file. If we want the same sample data but want to see how it behaves in Hadoop, we can upload a local file on demand while executing the script. Finally, if we want to execute the whole shebang, we can run the script against the big bulky file we have in the file system. For the different options, we have the following commands. Since the whole file, in this case, is small, we can run the three types of execution using the same file:

$ python3 ratings-breakdown.py u.data
$ python3 ratings-breakdown.py -r hadoop u.data
$ python3 ratings-breakdown.py -r hadoop hdfs:///ml-100k/u.data

The result, no matter which execution we have decided to use, would be the following (as a sanity check, the five counts add up to the 100,000 ratings in the dataset):

"1"	6110
"2"	11370
"3"	27145
"4"	34174
"5"	21201

As a curiosity, if we now take a look at the ResourceManager Web UI, probably at http://localhost:8088, we will see the jobs executed for the last two commands:

This is all for today. We have stored something in our Hadoop file system and we have executed our first MapReduce job.

Hadoop: Installation on macOS

In previous articles, we have seen a quick introduction to Big Data and to Hadoop and its ecosystem. Now it is time to install Hadoop on our local machines so we can start playing with it and with some of the tools in its ecosystem.

In my case, I am going to install it on a macOS system. If any of you are running a GNU/Linux system, you can skip the initial step (the Homebrew installation) and take a look at the configurations, because they should be similar.

Installing Hadoop

As I have said before, we are going to use Homebrew to install Hadoop; concretely, version 3.3.3 is going to be installed. Because we are using the package manager, the installation is going to be pretty simple but, if you prefer to do it manually, you just need to go to the Hadoop download page, and download and extract the desired version. The configuration steps after that should be the same.

To install Hadoop using homebrew we just need to execute:

brew install hadoop

A very important consideration is that Hadoop does not work with the latest versions of Java, neither 17 nor 18; it is necessary to have Java 11 installed. We can still use whatever version we want on our system, but to execute the Hadoop commands we should set the correct Java version in our terminal. As a side note, as far as I know, it should work with Java 8 too, but I have not tested it.

Allowing SSH connections to localhost

One previous consideration is that, for Hadoop to start all nodes properly, it needs to be able to connect over SSH to our machine, in this case, localhost. By default, this setting is disabled on macOS; to enable it, we just need to go to “System Preferences -> Sharing -> Remote Login”.

Setting environment variables

Reading about this, I have only found confusion: some tutorials do not even mention the environment variables, others only declare the HADOOP_HOME variable, and others declare a bunch of stuff. After a few tests, I have collected a list of environment variables we can (and should) define; they allow us to start Hadoop without problems. Is it possible my list has more than required? Yes, absolutely. But, for now, I think the list is comprehensive and short enough; my plan, as I have said before, is to play with Hadoop and the tools around it, not to become a Hadoop administrator (yet). Some of them I keep just as a reminder that they exist, for example, the two related to the native libraries: HADOOP_COMMON_LIB_NATIVE_DIR and HADOOP_OPTS. They can be skipped if we want; there will be a warning when running Hadoop, but nothing important (see the comment below about native libraries). We can add the variables to our “~/.zshrc” file.

The list contains the following variables:

# HADOOP env variables
export HADOOP_HOME="/usr/local/Cellar/hadoop/3.3.3/libexec"
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

Configuring Hadoop

File $HADOOP_HOME/etc/hadoop/hadoop-env.sh

In this file, we need to find the JAVA_HOME variable and fill it with the appropriate value. If we read the file carefully, one of the comments says “variable is REQUIRED on ALL platforms except OS X!“. Despite the comment, we are going to set it: if we do not, we will have problems running the NodeManager; it will not start, and the logs will show an error related to the Java version in use.

File $HADOOP_HOME/etc/hadoop/core-site.xml

Here, we need to configure the HDFS port and a folder for temporary directories. This folder can be wherever you want; in my case, it will reside in a sub-folder of my $HOME directory.

<configuration>                                                                                                          
  <property>                                                                                                             
    <name>fs.defaultFS</name>                                                                                            
    <value>hdfs://localhost:9000</value>                                                                                 
  </property>                                                                                                            
  <property>                                                                                                             
    <name>hadoop.tmp.dir</name>                                                                                          
    <value>/Users/user/.hadoop/hdfs/tmp</value>                                                                      
    <description>A base for other temporary directories</description>                                                    
  </property>                                                                                                            
</configuration> 

File $HADOOP_HOME/etc/hadoop/mapred-site.xml

We need to add a couple of extra properties:

<configuration>                                                                                                          
  <property>                                                                                                             
    <name>mapreduce.framework.name</name>                                                                                
    <value>yarn</value>                                                                                                  
  </property>                                                                                                            
  <property>                                                                                                             
    <name>mapreduce.application.classpath</name>                                                                         
    <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>         
  </property>                                                                                                            
</configuration>

File $HADOOP_HOME/etc/hadoop/yarn-site.xml

In a similar way, we need to add a couple of extra properties:

<configuration>                                                                                                          
  <property>                                                                                                             
    <name>yarn.nodemanager.aux-services</name>                                                                           
    <value>mapreduce_shuffle</value>                                                                                     
  </property>                                                                                                            
  <property>                                                                                                             
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
  </property>                                                                                                            
</configuration>

File $HADOOP_HOME/etc/hadoop/hdfs-site.xml

And, again, one more property:

<configuration>                                                                                                          
  <property>                                                                                                             
    <name>dfs.replication</name>                                                                                         
    <value>1</value>                                                                                                     
  </property>                                                                                                            
</configuration>

Formatting the HDFS Filesystem

Before we can start Hadoop to be able to use HDFS, we need to format the file system. We can do this by running the following command:

hdfs namenode -format

Starting Hadoop

Now, finally, we can start Hadoop. If we have properly added $HADOOP_HOME/bin and $HADOOP_HOME/sbin to our $PATH, we should be able to execute the command start-all.sh. If the command is not available, have you restarted the terminal after editing your .zshrc file, or executed the command source ~/.zshrc?

If everything works, we should be able to see something like:

And, to verify that everything is running accordingly, we can execute the jps command:

Some useful URLs are:

NameNode Web UI: http://localhost:9870
ResourceManager Web UI: http://localhost:8088
NodeManager Web UI: http://localhost:8042

What are the native libraries?

If someone has paid close attention to the screenshot showing Hadoop starting, I am sure they have noticed a warning line like:

WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Despite us setting the HADOOP_COMMON_LIB_NATIVE_DIR and HADOOP_OPTS variables in the environment, the truth is that the lib/native folder is not present in our installation.

Hadoop has native implementations of certain components for performance reasons and because of the unavailability of Java implementations. These components are available in a single, dynamically-linked native library called the native Hadoop library. On *nix platforms, the library is named libhadoop.so; on Mac systems, it is named libhadoop.a. Hadoop runs fine using the built-in Java classes, but you will gain speed with the native ones. More importantly, some compression codecs are only supported natively; if any other application depends on Hadoop, or your jars depend on these codecs, jobs will fail unless the native libraries are available.

The installation of the native libraries is beyond the scope of this article.

The last thing is to stop Hadoop; to do this, we just need to run the command stop-all.sh:

Hadoop and its ecosystem

In the previous article, we had a brief introduction to Big Data. To take advantage of the emerging opportunities it creates for organisations, several frameworks and tools have appeared in the last few years. One of these frameworks is Hadoop, together with the whole ecosystem of tools created around it.

The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.

Apache Hadoop website

The project includes these modules:

  • Hadoop Common: The common utilities that support the other Hadoop modules.
  • Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.
  • Hadoop YARN: A framework for job scheduling and cluster resource management.
  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.

Hadoop Distributed File System (HDFS™)

HDFS creates an abstraction that allows users to see HDFS logically as a single storage unit, but the data is stored across multiple nodes in a distributed fashion. HDFS follows master-slave architecture where the master is the NameNode and the slaves – there can be multiple slaves – are known as DataNodes.

NameNodes have the following characteristics or functions:

  • It is the master daemon that maintains and manages the DataNodes.
  • It records the metadata of all the blocks stored in the cluster, e.g. location of blocks stored, size of the files, permissions, hierarchy, etc.
  • It records every change that takes place to the file system metadata.
  • If a file is deleted in HDFS, the NameNode will immediately record this in the EditLog.
  • It regularly receives a heartbeat and a block report from all the DataNodes in the cluster to ensure that the DataNodes are alive.
  • It keeps a record of all the blocks in the HDFS and DataNode in which they are stored.
  • It has high availability and federation features.

DataNodes have the following characteristics or functions:

  • It is the slave daemon which runs on each slave machine.
  • The actual data is stored on DataNodes.
  • It is responsible for serving read and write requests from the clients.
  • It is also responsible for creating blocks, deleting blocks and replicating the same based on the decisions taken by the NameNode.
  • It sends heartbeats to the NameNode periodically to report the overall health of HDFS, by default, this frequency is set to 3 seconds.
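
The heartbeat mechanism can be pictured as a simple liveness check: the NameNode records when it last heard from each DataNode and considers a node dead after a timeout. A toy simulation (class names and the tick-based clock are illustrative, standing in for the real 3-second intervals):

```python
HEARTBEAT_TIMEOUT = 3  # ticks without a heartbeat before a node is considered dead

class NameNode:
    def __init__(self):
        self.last_seen = {}

    def heartbeat(self, datanode, tick):
        # A DataNode reports in; remember when we last heard from it
        self.last_seen[datanode] = tick

    def live_nodes(self, now):
        # Nodes whose last heartbeat is recent enough are considered alive
        return {dn for dn, t in self.last_seen.items()
                if now - t < HEARTBEAT_TIMEOUT}

nn = NameNode()
nn.heartbeat("datanode-1", tick=0)
nn.heartbeat("datanode-2", tick=0)
nn.heartbeat("datanode-1", tick=4)   # datanode-2 goes silent

print(nn.live_nodes(now=5))  # {'datanode-1'}
```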

Some of the problems that it fixes are:

  • Storage space problems: HDFS provides a distributed way of storing Big Data. The data is stored in blocks across the available DataNodes, and each block is also replicated on different DataNodes. This architecture enables horizontal scaling: extra DataNodes can be added to the HDFS cluster when required, increasing the available storage space.
  • Variety of data: HDFS can store all kinds of data whether it is structured, semi-structured or unstructured.
  • Access and process velocity: To increase the access and process velocity HDFS moves processing to data and not data to processing. Instead of moving data to the master nodes and then processing the data, the processing logic is sent to the various data nodes where the data is processed in parallel across different nodes. Once they are done, the processed results are sent to the master node where the results are merged and the response is sent back to the client.
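
The last point can be sketched as a scatter-gather: each node runs the same counting logic over its local block, and only the small partial results travel back to be merged. A minimal plain-Python illustration (no real networking; node names and data are made up):

```python
# Data blocks as they might be spread across three DataNodes
blocks = {
    "node-a": [3, 5, 4],
    "node-b": [5, 5, 2],
    "node-c": [4, 1],
}

def local_count(ratings):
    """The processing logic shipped to each node: count ratings locally."""
    counts = {}
    for r in ratings:
        counts[r] = counts.get(r, 0) + 1
    return counts

# Each node processes its own block; only small dicts are sent back
partials = [local_count(block) for block in blocks.values()]

# The master merges the partial results
merged = {}
for partial in partials:
    for rating, n in partial.items():
        merged[rating] = merged.get(rating, 0) + n

print(merged)  # {3: 1, 5: 3, 4: 2, 2: 1, 1: 1}
```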

Hadoop YARN

YARN performs all the processing activities by allocating resources and scheduling tasks. YARN has a ResourceManager and one or more NodeManagers.

The ResourceManager has the following characteristics or functions:

  • It is a cluster-level (one for each cluster) component and runs on the master machine.
  • It manages resources and schedules applications running on top of YARN.
  • It has two components: the Scheduler and the ApplicationsManager.
  • The Scheduler is responsible for allocating resources to the various running applications.
  • The ApplicationsManager is responsible for accepting job submissions and negotiating the first container for executing the application.
  • It keeps track of the heartbeats from the NodeManagers.

The NodeManagers have the following characteristics or functions:

  • It is a node-level component (one on each node) and runs on each slave machine.
  • It is responsible for managing containers and monitoring resource utilisation in each container.
  • It also keeps track of node health and log management.
  • It continuously communicates with ResourceManager to remain up-to-date.

Despite all the benefits Hadoop offers, it is not a silver bullet and its use needs to be carefully considered. Some of the use cases where it is not recommended are:

  • Low Latency data access: Queries that have a maximum and short response time.
  • Constant data modifications: Hadoop is a better fit when the primary concern is reading data.
  • Lots of small files: While Hadoop can store multiple small datasets, it is much more suitable for scenarios where there are few but large files.

Hadoop MapReduce

Hadoop MapReduce is the core Hadoop ecosystem component that provides data processing. MapReduce is a software framework for easily writing applications that process the vast amounts of structured and unstructured data stored in the Hadoop Distributed File System.
MapReduce programs are parallel in nature, and thus are very useful for performing large-scale data analysis using multiple machines in the cluster. This parallel processing improves the speed and reliability of the cluster.

Hadoop ecosystem

Hadoop was designed as both a computing (MapReduce) and storage (HDFS) platform from the very beginning. With the increasing need for big data analysis, Hadoop attracts lots of other software to resolve big data questions and merges into a Hadoop-centric big data ecosystem. The following diagram gives a brief overview of the Hadoop big data ecosystem:

O’Reilly – Overview of the Hadoop ecosystem

We will go into more detail on some of the elements in future articles.

Big Data

Big data refers to data sets that are too large or complex to be dealt with by traditional data-processing application software. In general, the data contains greater variety, originates from more sources and arrives in increasing volumes and with more velocity. The advantage, and why Big Data is important, is that these massive volumes of data can be used to address business problems we could not have tackled before, and to gain insights and make predictions that were previously out of our reach.

While initially Big Data was defined around the concepts of volume, velocity and variety, lately two more concepts have been added: value and veracity. All data can initially be considered noise until its intrinsic value has been discovered. In addition, to be able to discover this value and utilise the data, we need to consider how reliable and truthful it is.

A few factors have influenced the growth of the importance of Big Data:

  • The cheap storage possibilities.
  • Cloud Computing offers truly elastic scalability.
  • The advent of the Internet of Things (IoT) and the gathering of data related to customer usage patterns and product performance.
  • The emergence of Machine Learning.

Big Data offers challenges not only around data storage but also around curation. For data to be useful, it needs to be clean and relevant to the organisation in a way that enables meaningful analysis. Data scientists spend a big chunk of their time curating the data so that data analysts can extract proper conclusions from it.

Part of this curation process is influenced by the source of the data and its format. We can categorise our data into three main types:

  • Structured data: Any data that can be stored, accessed and processed in a fixed format.
  • Unstructured data: Any data with no predefined structure or model.
  • Semi-structured data: Data that, even though it is not organised under a specific database schema, contains tags or markers that isolate the individual components inside it.
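
The three categories can be made concrete by expressing a similar record in each shape (all values are illustrative):

```python
import json, csv, io

# Structured: fixed schema, e.g. a CSV row
structured = "196,242,3,881250949"
row = next(csv.reader(io.StringIO(structured)))

# Semi-structured: no fixed schema, but tags isolate each component
semi = '{"user": 196, "item": 242, "rating": 3}'
record = json.loads(semi)

# Unstructured: free text; the fields must be inferred
unstructured = "User 196 gave movie 242 three stars."

print(row[2], record["rating"])  # 3 3
```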

As we have mentioned before, Big data gives us new insights that open up new opportunities and business models. Getting started involves three key actions:

  • Integration: Big data brings together data from many disparate sources and applications. Traditional data integration mechanisms, such as extract, transform, and load (ETL), are generally not up to the task; analysing big data sets at terabyte, or even petabyte, scale requires new strategies and technologies. During integration, you need to bring in the data, process it, and make sure it is formatted and available in a form that your business analysts can get started with.
  • Management: Big data requires storage. Where to store the data? In the cloud, on-premises, or a hybrid solution. How to store the data? In which form should the data be stored so that on-demand processes can work with it?
  • Analysis: To obtain any kind of insight, the data needs to be analysed and acted on. We can build visualisations to clarify the meaning of data sets, mix and explore different datasets to make new discoveries, share our findings with others, or build data models with machine learning and artificial intelligence.
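
The integration action described above is essentially an extract-transform-load pipeline. A toy in-memory sketch, with made-up records from two disparate "sources":

```python
import json

# Extract: raw records from two disparate "sources"
source_a = "101,4\n102,5"                  # CSV-like export
source_b = '[{"movie": 103, "stars": 3}]'  # JSON API payload

# Transform: normalise both into one schema
rows = [{"movie": int(m), "rating": int(r)}
        for m, r in (line.split(",") for line in source_a.splitlines())]
rows += [{"movie": d["movie"], "rating": d["stars"]}
         for d in json.loads(source_b)]

# Load: into whatever store the analysts will query (here, just a sorted list)
warehouse = sorted(rows, key=lambda r: r["movie"])
print([r["movie"] for r in warehouse])  # [101, 102, 103]
```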

Some best practices when working in the Big Data space are:

  • Align big data with specific business goals: More extensive data sets enable us to make new discoveries. To that end, it is important to base new investments in skills, organization, or infrastructure with a strong business-driven context to guarantee ongoing project investments and funding. To determine if we are on the right track, ask how big data supports and enables your top business and IT priorities. Examples include understanding how to filter web logs to understand e-commerce behaviour, deriving sentiment from social media and customer support interactions, and understanding statistical correlation methods and their relevance for the customer, product, manufacturing, and engineering data.
  • Ease skills shortage with standards and governance: One of the biggest obstacles to benefiting from our investment in big data is a skills shortage. You can mitigate this risk by ensuring that big data technologies, considerations, and decisions are added to your IT governance program. Standardizing your approach will allow you to manage costs and leverage resources. Organizations implementing big data solutions and strategies should assess their skill requirements early and often and should proactively identify any potential skill gaps. These can be addressed by training/cross-training existing resources, hiring new resources, and leveraging consulting firms.
  • Optimise knowledge transfer with a centre of excellence: Use a centre of excellence approach to sharing knowledge, control oversight, and manage project communications. Whether big data is a new or expanding investment, the soft and hard costs can be shared across the enterprise. Leveraging this approach can help increase big data capabilities and overall information architecture maturity in a more structured and systematic way.
  • The top payoff is aligning unstructured with structured data: It is certainly valuable to analyse big data on its own. But you can bring even greater business insights by connecting and integrating low-density big data with the structured data you are already using today. Whether you are capturing customer, product, equipment, or environmental big data, the goal is to add more relevant data points to your core master and analytical summaries, leading to better conclusions. For example, there is a difference in distinguishing all customer sentiment from that of only your best customers. This is why many see big data as an integral extension of their existing business intelligence capabilities, data warehousing platform, and information architecture. Keep in mind that the big data analytical processes and models can be both human – and machine-based. Big data analytical capabilities include statistics, spatial analysis, semantics, interactive discovery, and visualization. Using analytical models, you can correlate different types and sources of data to make associations and meaningful discoveries.
  • Plan your discovery lab for performance: Discovering meaning in your data is not always straightforward. Sometimes we do not even know what we are looking for. That is expected. Management and IT need to support this “lack of direction” or “lack of clear requirement”.
  • Align with the cloud operating model: Big data processes and users require access to a broad array of resources for both iterative experimentation and running production jobs. A big data solution includes all data realms including transactions, master data, reference data, and summarized data. Analytical sandboxes should be created on demand. Resource management is critical to ensure control of the entire data flow including pre- and post-processing, integration, in-database summarization, and analytical modelling. A well-planned private and public cloud provisioning and security strategy plays an integral role in supporting these changing requirements.

Big data: Introduction

The act of gathering and storing large amounts of information for eventual analysis is ages old. But some years ago, the term “Big Data” was coined to define data sets that are so large or complex that traditional applications are not enough to process them.

The variety of challenges included in this term is large: it covers things like information gathering, analysis, storage and search. In general, all the tasks related to the organisation, management and analysis of the data.

We are living in a connected world, there is no question about that. Almost everything nowadays is connected to the Internet and, if it is not, it probably will be soon; this is the tendency. We are not talking just about computers or laptops, but about mobile devices, wearables, cars, home appliances… In addition, we make extensive use of the Internet, like social networks where we have our friends and our favourite things. We use different devices to track our health parameters or our activities. Calendars, contacts, schedules, searched information, newspapers read and online shopping are a few examples of all the information we have online, reachable by some or all of the companies out there. They have our profile as individuals and as a group: what we like or dislike, what we want, what we do. It is true that it is mixed with a lot of noise, a lot of useless information, but this is where Big Data comes in: a way to do something productive with this information, a way to find a ROI (Return On Investment) for the time and money spent analysing the data.

Nowadays, data growth is driven by unstructured data: there are no standard formats, there are thousands of devices generating information and growing fast in numbers (IoT), and we ourselves are generating huge amounts of unstructured data. This is precisely one of the Big Data challenges.

As a more formal definition, the concept gained momentum in the early 2000s when industry analyst Doug Laney articulated the now-mainstream definition of Big Data as the three Vs:

  • Volume: Organizations collect data from a variety of sources, including business transactions, social media and information from sensor or machine-to-machine data. In the past, storing it would’ve been a problem – but new technologies (such as Hadoop) have eased the burden.
  • Velocity: Data streams in at an unprecedented speed and must be dealt with in a timely manner. RFID tags, sensors and smart metering are driving the need to deal with torrents of data in near-real time.
  • Variety: Data comes in all types of formats – from structured, numeric data in traditional databases to unstructured text documents, email, video, audio, stock ticker data and financial transactions.

A couple more can be added, like:

  • Variability: Inconsistency of the data set can hamper processes to handle and manage it. Daily, seasonal and event-triggered peak data loads can be challenging to manage. Even more so with unstructured data.
  • Veracity: The quality of captured data can vary greatly, affecting accurate analysis.

From a business point of view, any company involved in Big Data needs to:

  • Collect the information: Probably from multiple sources.
  • Integrate the information: All the collected unstructured information and their own information.
  • Analyze the information: Extract concrete tendencies, spot business trends, find patterns, or any conclusions they want to obtain.
  • Take actions or decisions: Based on the analysis.

For all the reasons exposed, these systems need to be real-time, scalable and high-performance; traditional systems are not enough.

This article is just a little introduction to what Big Data is. The plan is to go deeper into this topic and into the technologies related to it.

See you.
