Tuesday, November 12, 2013

MapReduce on .NET Steroids

Ok, so everyone's talking about MapReduce, so I'm gonna chime in, .NET Style!

Wikipedia defines "MapReduce" as a programming model for processing large data sets with a parallel, distributed algorithm on a cluster.

"The actual origins of MapReduce are arguable, but the paper that most cite as the one that started us down this journey is “MapReduce: Simplified Data Processing on Large Clusters” by Jeffrey Dean and Sanjay Ghemawat in 2004. This paper described how Google split, processed, and aggregated their data set of mind-boggling size." (O'reilly publishing)

If you're thinking about distributed parallel computing, think again: this blog post is only about the Functional Programming Map and Reduce model.
I'm going to show you how the basics are done using .NET and LINQ as an amazing functional programming framework. These are the basics we need in order to take the next step towards distributed computing easily.

So what is "Map-Reduce" ?

Well, it's basically a data processing model that's aimed at large datasets, where you need as much horsepower as you can get in order to get results in a reasonable time frame.
If you can split your data source into X parts that have no dependency on each other, OR if you have a list of data items (which is basically the same thing), you are good to go.

So go where? Let's think of a "simple" problem/example.
Let's say we have a set of newspaper articles and we want to find out the number of times certain key words show up, e.g. "Peace", "War", "Love", "Hate"...

First step, the mapping step: give every intern (assuming you have interns to spend on this stupid project) one newspaper, and tell each of them: "You! Take this newspaper and write down on a blank piece of paper every time a key word shows up!"

Once your eager minions are done, you need to take their mapped data and send it to the next step, the grouping step. You take one of your loved workers and tell him/her to take new sheets of blank paper: one sheet will be for "Peace", one for "War" and so on. The grouper needs to go through every word found and write it again, but on the appropriately designated sheet.

After this we have sheets of "War", sheets of "Peace", of "Love" and of "Hate".
But we still don't know how many words of each type there are. (Yes, if you're thinking that our grouper could've already counted while grouping, you are correct, but remember that this is a simple example :) ).

Next and final step, Reduce!
Find a bunch of workers that are bored and give each a sheet from the grouper's sheets.
Tell them to find the data we want from the grouped data, in this simple case, just count the words.

In the end they will give you the result:
"War" - X times
"Peace" - Y times
and so on for the rest of the key words.

What we have done here is essentially MapReduce in all of its blazing parallel distributed glory!!
This is what Google DOES!
They distributed the data sheets to many workers (computers), let them work in parallel, grouped the results, and reduced back the answers with more distributed parallel computing! No need to bother fellow interns! :)

I ran across a cute example on the web for map reduce concerning how Facebook tells us what friends in common we have with our own friends.
http://stevekrenzel.com/finding-friends-with-mapreduce
(Facebook probably does something different, but it's a great example)

You'd think that this would be an easy task... you'd think that it's simple using SQL. However, please remind yourself that Facebook can't just run one big join on a single relational database for its 1 billion users!!
At that scale the data is spread over many machines, so it's probably key-value or graph storage, or some combination of them :) I would probably use SQL-MapReduce or something of that nature.

So, in order to fully understand the theory behind this paradigm shift in distributed computing, we need to fully grasp the Functional Programming theory underlying this methodology. (I cannot stress this enough).

Let's try to accomplish this word counting task in C#.
Remember, we need to Map -> Group -> Reduce.

Well, first off we need some sort of data source. Let's say a list of strings, where each string represents a sheet of words.

And so:

var sheets = new List<string>();
sheets.AddRange(someSheets); // someSheets: any IEnumerable<string>, one string per sheet

So, we're generally thinking of an enumerable data structure for now.

How would we go about defining our mapper?
A mapper needs to take a sheet of paper and write down each time a key word is encountered.

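// Requires: using System.Collections.Generic; using System.Linq; using System.Text.RegularExpressions;
public static IList<string> Map(string sheet)
{
       // Whole-word, case-insensitive match for our key words.
       string pattern = @"\b(war|peace|love|hate)\b";

       // Lower-case every hit so that "War" and "war" land in the same group later on.
       var matches = (from Match m in Regex.Matches(sheet, pattern, RegexOptions.IgnoreCase)
                      select m.Value.ToLowerInvariant()).ToList();

       return matches;
}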

As you can see it's pretty much hard wired to our specific task, not a good programming practice.
Let's deal with that in a moment.

Okay, now we have our data mapped, that's good but we need to group it somehow, and for any grouping action to occur one needs a Key.
In this simple example, it's pretty obvious that we need to group the mapped data by the words we're looking for.

How about:

var groups = sheets.SelectMany(Map).GroupBy(y => y);

That's pretty simple and adequate.
After we have our groups, we want to get the final results we've been waiting for, and that's the number of repetitions of every key word.

So our reducers can work independently, each can take a group for instance and solve our task.

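public static KeyValuePair<string, int> Reduce(IGrouping<string, string> group)
{
     // The key is the word itself, the value is how many times it was mapped.
     return new KeyValuePair<string, int>(group.Key, group.Count());
}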

That's it! All we have to do now is combine it all.

var mapped = sheets.SelectMany(Map);
var groups = mapped.GroupBy(y => y);
var results = groups.Select(Reduce);

Our results look something like:
[love:2]
[war: 4]
and so on...
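
Putting the pieces together, here is a minimal, self-contained sketch of the whole Map -> Group -> Reduce pipeline. The class name WordCountDemo, the helper CountKeyWords and the sample sheets are made up for illustration, and the mapper matches whole words case-insensitively, which is my assumption about what we want from the key word search.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class WordCountDemo
{
    // Map: one sheet in, every key word occurrence out (lower-cased).
    public static IList<string> Map(string sheet)
    {
        const string pattern = @"\b(war|peace|love|hate)\b";
        return Regex.Matches(sheet, pattern, RegexOptions.IgnoreCase)
                    .Cast<Match>()
                    .Select(m => m.Value.ToLowerInvariant())
                    .ToList();
    }

    // Reduce: one group of identical words in, (word, count) out.
    public static KeyValuePair<string, int> Reduce(IGrouping<string, string> group)
    {
        return new KeyValuePair<string, int>(group.Key, group.Count());
    }

    // Map -> Group -> Reduce over all sheets (hypothetical helper).
    public static List<KeyValuePair<string, int>> CountKeyWords(IEnumerable<string> sheets)
    {
        return sheets.SelectMany(s => Map(s))
                     .GroupBy(word => word)
                     .Select(g => Reduce(g))
                     .ToList();
    }

    public static void Main()
    {
        // Made-up sample sheets, for illustration only.
        var sheets = new List<string>
        {
            "War and peace. Love conquers hate.",
            "All you need is love, not war."
        };

        foreach (var pair in CountKeyWords(sheets))
            Console.WriteLine("[{0}:{1}]", pair.Key, pair.Value);
    }
}
```

With these two sample sheets it prints [war:2], [peace:1], [love:2] and [hate:1], since GroupBy yields the groups in the order their keys first appear.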

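To simplify things we could just write:

var results = sheets.SelectMany(Map).GroupBy(y => y, (word, hits) => new KeyValuePair<string, int>(word, hits.Count()));

Which gives exactly the same results. Note that this GroupBy overload takes a result selector that receives the key and the grouped items as two separate arguments, so our Reduce method (which takes an IGrouping) can't be passed in directly and the reducer is written inline as a lambda instead.
Pretty simple, isn't it?
This is map reduce by functional programming. The map and reduce stages can both run in parallel.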

So what's missing? ... oh yeah, it's really non generic, or is it...

Well if you look at that one liner just above, the only thing that's not generic is the "sheets" source, but that's easily transformed to any source.

To make things generic, let's put on the table the types we need.

TSource - for our datasource type
TMapper - for our mapping results
TKeySelector - for our grouping key
TResult - for the end result type we want

If we create some sort of generic Map-Reduce method, it will need all of these parameters to work.
And what are they?

TSource - is the element type of an IEnumerable<TSource> of some sort.
TMapper - is part of our mapping function, TSource -> IEnumerable<TMapper>
TKeySelector - is part of our key selecting function, TMapper -> TKeySelector
TResult - will be returned from our reducing function, (TKeySelector, IEnumerable<TMapper>) -> TResult

And so as written as an extension method, it would look something like:

public static IEnumerable<TResult> MapReduce<TSource, TMapper, TKeySelector, TResult>(
           this IEnumerable<TSource> source,
           Func<TSource, IEnumerable<TMapper>> mapper,
           Func<TMapper, TKeySelector> keySelector,
           Func<TKeySelector, IEnumerable<TMapper>, TResult> reducer)
        {
            return source.SelectMany(mapper).GroupBy(keySelector, reducer);
        }

Wow, you really have to look at it for a second to realize that it's this simple using LINQ.
Without it, think of the amount of code needed to loop and group...
This is the amazing strength of modern C#: simplicity.

Now just provide the needed delegates and this function will mapreduce any problem.
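
To show that nothing here is tied to the newspaper example, here is a small sketch that feeds the generic method a different (made-up) task: counting how many words there are of each length. The extension method is repeated so the sketch compiles on its own; the names GenericDemo and WordLengthHistogram and the sample lines are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MapReduceExtensions
{
    // Same shape as the extension method in this post, repeated here for completeness.
    public static IEnumerable<TResult> MapReduce<TSource, TMapper, TKeySelector, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, IEnumerable<TMapper>> mapper,
        Func<TMapper, TKeySelector> keySelector,
        Func<TKeySelector, IEnumerable<TMapper>, TResult> reducer)
    {
        return source.SelectMany(mapper).GroupBy(keySelector, reducer);
    }
}

public static class GenericDemo
{
    // Hypothetical task: word length -> number of words of that length.
    public static Dictionary<int, int> WordLengthHistogram(IEnumerable<string> lines)
    {
        return lines
            .MapReduce(
                // mapper: one line -> its words
                line => line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries),
                // key selector: group the words by their length
                word => word.Length,
                // reducer: one group -> (length, count)
                (length, words) => new KeyValuePair<int, int>(length, words.Count()))
            .ToDictionary(pair => pair.Key, pair => pair.Value);
    }

    public static void Main()
    {
        // Made-up input, for illustration only.
        var lines = new[] { "the quick brown fox", "jumps over the lazy dog" };

        foreach (var pair in WordLengthHistogram(lines))
            Console.WriteLine("{0} letters: {1} words", pair.Key, pair.Value);
    }
}
```

The compiler infers all four type parameters from the lambdas, so the call site stays as short as the hand-written word count.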

Just to get a taste of parallel processing, just add PLINQ.
You can add .AsParallel() to the source, so that the mappers and reducers run in parallel across your cores, and voila.
So easy.
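
As a rough sketch of what that looks like for the word count (the class name ParallelWordCount and the sample sheets are made up, and the whole-word, case-insensitive mapper is my assumption), note that PLINQ spreads the work over the cores of one machine, it does not distribute it over a cluster:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class ParallelWordCount
{
    // Map: one sheet in, every key word occurrence out (lower-cased).
    public static List<string> Map(string sheet)
    {
        return Regex.Matches(sheet, @"\b(war|peace|love|hate)\b", RegexOptions.IgnoreCase)
                    .Cast<Match>()
                    .Select(m => m.Value.ToLowerInvariant())
                    .ToList();
    }

    public static Dictionary<string, int> Count(IEnumerable<string> sheets)
    {
        return sheets
            .AsParallel()                       // from here on PLINQ operators are used
            .SelectMany(sheet => Map(sheet))    // sheets are mapped in parallel
            .GroupBy(word => word,
                     (word, hits) => new KeyValuePair<string, int>(word, hits.Count()))
            .ToDictionary(pair => pair.Key, pair => pair.Value);
    }

    public static void Main()
    {
        // Made-up sample sheets, for illustration only.
        var sheets = new[] { "War and peace.", "Love, not war.", "Hate? No, love!" };

        foreach (var pair in Count(sheets))
            Console.WriteLine("[{0}:{1}]", pair.Key, pair.Value);
    }
}
```

The order of the results is not guaranteed once the query runs in parallel, which is why this sketch collects them into a dictionary instead of relying on the order.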

But wait...

Delegates are so constrictive.
What if our data source was an IQueryable... and let's say that it manages its own distributed computing mechanism for selecting and grouping...
For this we need to modify our method by just a bit:

public static IQueryable<TResult> MapReduce<TSource, TMapper, TKeySelector, TResult>(
            this IQueryable<TSource> source,
            Expression<Func<TSource, IEnumerable<TMapper>>> mapper,
            Expression<Func<TMapper, TKeySelector>> keySelector,
            Expression<Func<TKeySelector, IEnumerable<TMapper>, TResult>> reducer)
        {
            return source.SelectMany(mapper).GroupBy(keySelector, reducer);
        }

Wait... are you serious? That's it? Just wrap the Func in an Expression?
YES!

Now if you have a query provider that supports distributed computing, this method will do the job.
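
I don't have a distributed query provider at hand, so here is a sketch that runs the IQueryable version against the plain in-memory provider you get from AsQueryable(). It only shows that the expression-based method is called in exactly the same way; the names QueryableDemo and CountWords and the sample sheets are made up, and a real provider would still have to be able to translate whatever you put inside the lambdas.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

public static class QueryableMapReduceExtensions
{
    // Same shape as the IQueryable extension method in this post.
    public static IQueryable<TResult> MapReduce<TSource, TMapper, TKeySelector, TResult>(
        this IQueryable<TSource> source,
        Expression<Func<TSource, IEnumerable<TMapper>>> mapper,
        Expression<Func<TMapper, TKeySelector>> keySelector,
        Expression<Func<TKeySelector, IEnumerable<TMapper>, TResult>> reducer)
    {
        return source.SelectMany(mapper).GroupBy(keySelector, reducer);
    }
}

public static class QueryableDemo
{
    public static List<KeyValuePair<string, int>> CountWords(IEnumerable<string> sheets)
    {
        // In-memory stand-in for a provider that would distribute the query.
        IQueryable<string> source = sheets.AsQueryable();

        return source
            .MapReduce(
                // These lambdas become expression trees, not delegates.
                sheet => sheet.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries),
                word => word,
                (word, hits) => new KeyValuePair<string, int>(word, hits.Count()))
            .ToList();
    }

    public static void Main()
    {
        // Made-up sample sheets, for illustration only.
        foreach (var pair in CountWords(new[] { "love war", "love" }))
            Console.WriteLine("[{0}:{1}]", pair.Key, pair.Value);
    }
}
```

The call site is identical to the delegate version. The difference is that the provider receives the lambdas as data it can inspect and ship elsewhere, instead of compiled code it can only invoke.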

And there is one: http://research.microsoft.com/en-us/projects/dryadlinq/

Unfortunately it's a discontinued project, but interesting nonetheless.

http://hadoopsdk.codeplex.com/ is what Microsoft is focusing on.