
MapReduce on Hadoop - Big Data in Action

In the previous post we discovered the secret that lets Hadoop store hundreds of TB. Based on a simple master-slave architecture, Hadoop is a system that can store and manipulate large amounts of data very easily.
Hadoop contains two types of nodes for storing data. The NameNode plays the role of master: it knows the name and location of each file that Hadoop stores, and it is the only node that can identify the location of a file based on the file name. Around this node we can have 1 to n nodes that store the file content. This kind of node is called a DataNode.
Data processing
Hadoop stores big data without any kind of problem, but it became known as the system that can process big data in a simple, fast and stable way. It is a system that can process and extract the information we want from hundreds of TB of data. This is why Hadoop is the king of big data. In this post we will discover the secret of data processing – how Hadoop manages to do this.
The secret that gives Hadoop the ability to process data is called MapReduce. This paradigm was not invented by Hadoop, but Hadoop implements it very well. The first meeting with MapReduce can be hard, and it can be pretty complicated to understand. Anyone who wants to use Hadoop needs to understand the MapReduce paradigm first.
Without understanding MapReduce we will not be able to know whether Hadoop is the solution to our problem and what kind of results we should expect from it.
MapReduce and Tuples
Don’t expect Hadoop to be a system that stores data in tables. This system doesn’t have the concept of a table. It only works with tuples formed from a key and a value. This is the only structure that Hadoop uses to extract data. Each task that is executed in this system accepts these tuples as input. Of course, the output of a task is also formed of (key, values) pairs, and each pair can contain one or more values.
Even if this tuple seems trivial, we will see that it is the only thing we need in order to process data.
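To make the idea concrete, here is a minimal sketch of how this tuple contract looks in Hadoop's Java API; the class names MyMapper and MyReducer are placeholders, and Text/IntWritable are just one possible choice of key and value types:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Every Mapper is parameterized by the (key, value) types it consumes and emits:
// here it would read (line offset, line text) and produce (Text, IntWritable) tuples.
class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { }

// Every Reducer consumes a key together with all its values and emits new tuples.
class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { }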
Map
The MapReduce process is formed from two different steps – Map and Reduce. Map is the process used to convert the input data into a new set of data. The data obtained after this step is only intermediate data that will be used in the next step. We have the option to persist this data, but generally this information is not relevant for the end user.
The Map action is not executed on only one node; it is executed on 1 to m nodes of DataNode type. Each DataNode on which this action is executed already contains part of the input data, so on each node the Map runs over a part of the input. The result of this action is smaller than the input data and can be processed more easily. At this step the result is kept in memory; it is not written to disk.
We can imagine the output of this step as a summary of our data. Based on the input and on how we choose to map it, we will obtain different results. At this step, the output data doesn't need to have the same format as the input data. The result is partitioned based on a function applied to the key of the tuple. In general a hash function is used, but we can define any kind of partitioning mechanism.
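As a hedged sketch of such a partitioning function, here is a custom Partitioner that mirrors the default hash-based behaviour; the class name CityPartitioner is only illustrative, and it would be registered on the job with job.setPartitionerClass(CityPartitioner.class):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Decides which Reducer receives a given intermediate (key, value) pair.
public class CityPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask the sign bit so the result is always a valid, non-negative partition index.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}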
The intermediate result can be used by Hadoop for different operations. At this step we can execute actions like sorting or shuffling. These small steps prepare the data for the next step and, as we will see, they also appear as part of the Reduce phase.
From the parallelism point of view, on each node where the Map is executed we can have from 10 to 100-150 operations running at the same time. The number of concurrent operations is dictated by the hardware performance and by the complexity of the Map action.
Reduce
Once we have the intermediate results, we can start the next step of processing – Reduce. In comparison with the Map operation, the Reduce operation is not executed on every node of Hadoop. It is executed on only a small part of the nodes, because the size of the data that we need to process was already reduced. The data is partitioned so that each Reducer receives its own share.
While the Map phase is formed of only one step, the Reduce phase contains 3 main steps:

  • Shuffle
  • Sort
  • Reduce

When the Shuffle step is executed, each DataNode that was involved in the Map operation starts to send its results to the nodes that will run the Reduce operation. The data is sent over an HTTP connection. Because Hadoop runs inside a private network, this is not normally a security problem.
All the key-value pairs are sent and sorted based on the key. This needs to be done because the same key can arrive from different nodes. In general, this step is done in parallel with the shuffle process.
Once the shuffle step ends, Hadoop starts another sort. At this moment Hadoop can control how the data is sorted and how the result will be grouped. This sort step gives us the possibility to sort items not only by key, but also by other parameters. The operation is executed both on disk and in memory.
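A minimal sketch of how this hook can be used, assuming we want the keys delivered to the Reducer in descending order; the class name DescendingTextComparator is illustrative, and it would be attached with job.setSortComparatorClass(DescendingTextComparator.class) (a grouping comparator can be registered in the same way with setGroupingComparatorClass):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sorts Text keys in descending order instead of the default ascending order.
public class DescendingTextComparator extends WritableComparator {

    public DescendingTextComparator() {
        super(Text.class, true); // true = instantiate keys so compare() receives objects
    }

    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public int compare(WritableComparable a, WritableComparable b) {
        return -a.compareTo(b); // invert the natural ordering of the keys
    }
}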
The last step that needs to be executed is the Reduce itself. When this operation is executed, the final results are written to disk. At this step, each tuple is formed from a key and a collection of values. From this tuple, the Reduce operation produces the key and only one value – the value that represents the final result.
Even if the Reduce step is very important, there are cases when this step is not necessary. In these cases the intermediate data is the final result for the end user.
JobTracker, TaskTracker
The MapReduce operation requires two types of services - JobTracker and TaskTracker. These two services are in a master-slave relationship very similar to the one we saw earlier for storing data - NameNode and DataNode.
The main role of the JobTracker is to schedule and monitor each task that is executed. If one of the tasks fails, the JobTracker is capable of rerunning it.
The JobTracker talks to the NameNode and schedules the tasks in such a way that each one is executed on the DataNode that already holds its input data – in this way no input data is sent over the wire.
The TaskTracker is a node that accepts Map, Reduce and Shuffle operations. Usually this is the DataNode where the input data can be found, but there are exceptions to this rule. Each TaskTracker has a limited number of tasks that it can execute - slots. Because of this, the JobTracker will try to execute tasks on the TaskTrackers that have free slots.
An interesting aspect of the execution model is the way tasks are executed. Each task runs in a separate JVM process, so if something goes wrong (an exception is thrown), only that task is affected. The rest of the tasks continue to run without problems.
Example
Until now we have discovered how MapReduce works in theory. Theory is very good, but we also need to practice. I propose a small example that will help us understand, in a simple way, how MapReduce does its magic.
We will start from the following problem. We have hundreds of files that contain the number of accidents that happened every month in each city of Europe. Because the EU is formed of different countries that use different systems, we end up with a lot of files. Some files contain information for all the cities of a country, others contain information for only one city and so on. Let's assume that we have the following file format:
London, 2013 January, 120
Berlin, 2013 January, 300
Roma, 2013 February, 110
Berlin, 2013 March, 200

Based on the input data we need to calculate the maximum number of accidents that took place in each city during a month. This simple problem can become a pretty complicated one when we have 10 TB of input data. In this case Hadoop is the perfect solution for us.
The first operation of the MapReduce process is Map. At this moment each file is processed and a collection of key-value pairs is obtained. In our case the key is the name of the city and the value is the number of accidents. From each file we extract the maximum number of accidents recorded in each city during a month. This represents the Map operation, and the output would be something like this:
(London, 120), (Berlin, 300), (Roma, 110), (London, 100), (Roma, 210), …
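A hedged sketch of how this Map step could be written with Hadoop's Java API, assuming the comma-separated line format shown above and that each input line produces one intermediate pair; the class name AccidentsMapper is only illustrative:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Turns one input line "City, Month, Accidents" into the tuple (City, Accidents).
public class AccidentsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Text city = new Text();
    private final IntWritable accidents = new IntWritable();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Example line: "London, 2013 January, 120"
        String[] parts = line.toString().split(",");
        if (parts.length == 3) {
            city.set(parts[0].trim());
            accidents.set(Integer.parseInt(parts[2].trim()));
            context.write(city, accidents); // emit the intermediate (key, value) pair
        }
    }
}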
This intermediate result has no value for us (yet). We need to extract the maximum number of accidents for each city. The Reduce operation will be applied now. The final output would be:
(London, 120)
(Berlin, 300)
(Roma, 210)
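The Reduce step for this example can be sketched as below; the class name MaxAccidentsReducer is again only illustrative. It receives each city together with the collection of all its intermediate values and keeps only the maximum:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Receives (city, [count, count, ...]) and emits (city, maximum count).
public class MaxAccidentsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text city, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable count : counts) {
            max = Math.max(max, count.get());
        }
        result.set(max);
        context.write(city, result); // the final (key, value) pair written to disk
    }
}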
A similar mechanism is used by Hadoop. The power of this mechanism is its simplicity. Being a simple mechanism, it can be duplicated and controlled very easily over the network.
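To tie the two classes together, a minimal job driver could look like the sketch below (assuming the newer org.apache.hadoop.mapreduce API); the job name and the input and output paths are placeholders supplied on the command line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxAccidentsJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "max accidents per city");

        job.setJarByClass(MaxAccidentsJob.class);
        job.setMapperClass(AccidentsMapper.class);
        // The Reducer can also act as a Combiner, because max() is associative.
        job.setCombinerClass(MaxAccidentsReducer.class);
        job.setReducerClass(MaxAccidentsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory (must not exist)

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}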
Conclusion
In this article we found out how Hadoop processes data using MapReduce. We discovered that the core of the mechanism is very simple, but it is duplicated on all the available nodes. On each node we can have more than one task running in parallel. In conclusion, we could say that all the tasks executed over the data are parallelized as much as possible. Hadoop tries to use all the resources that are available.
As a final remark, don't forget that the native language of Hadoop is Java, but it also has support for other languages like Python, C# or PHP.
