//*********************************************************************************************************
// © 2013 jakemdrew.com. All rights reserved.
// This source code is licensed under The GNU General Public License (GPLv3):
// http://opensource.org/licenses/gpl-3.0.html
//*********************************************************************************************************
//*********************************************************************************************************
//WordReducer - Example Map Reduction process that counts unique words in a body of text.
//Created By - Jake Drew
//Version - 1.0, 01/08/2013
//*********************************************************************************************************
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;
namespace MapReduceWords
{
public class WordReducer
{
public static ConcurrentBag wordBag = new ConcurrentBag();
public BlockingCollection wordChunks = new BlockingCollection(wordBag);
///
/// 1. Produce 250 character or less chunks of text.
/// 2. Break chunks on the first space encountered before 250 characters.
///
///
///
public IEnumerable produceWordBlocks(string fileText)
{
int blockSize = 250;
int startPos = 0;
int len = 0;
for (int i = 0; i < fileText.Length; i++)
{
i = i + blockSize > fileText.Length -1 ? fileText.Length -1 : i + blockSize;
while (i >= startPos && fileText[i] != ' ')
{
i--;
}
if (i == startPos)
{
i = i + blockSize > (fileText.Length - 1) ? fileText.Length - 1 : i + blockSize;
len = (i - startPos) + 1;
}
else
{
len = i - startPos;
}
yield return fileText.Substring(startPos, len).Trim();
startPos = i;
}
}
public void mapWords(string fileText)
{
Parallel.ForEach(produceWordBlocks(fileText), wordBlock =>
{ //split the block into words
string[] words = wordBlock.Split(' ');
StringBuilder wordBuffer = new StringBuilder();
//cleanup each word and map it
foreach (string word in words)
{ //Remove all spaces and punctuation
foreach (char c in word)
{
if (char.IsLetterOrDigit(c) || c == '\'' || c == '-')
wordBuffer.Append(c);
}
//Send word to the wordChunks Blocking Collection
if (wordBuffer.Length > 0)
{
wordChunks.Add(wordBuffer.ToString());
wordBuffer.Clear();
}
}
});
wordChunks.CompleteAdding();
}
public ConcurrentDictionary wordStore = new ConcurrentDictionary();
public void reduceWords()
{
Parallel.ForEach(wordChunks.GetConsumingEnumerable(), word =>
{ //if the word exists, use a thread safe delegate to increment the value by 1
//otherwise, add the word with a default value of 1
wordStore.AddOrUpdate(word, 1, (key, oldValue) => Interlocked.Increment(ref oldValue));
});
}
public void mapReduce(string fileText)
{ //Reset the Blocking Collection, if already used
if (wordChunks.IsAddingCompleted)
{
wordBag = new ConcurrentBag();
wordChunks = new BlockingCollection(wordBag);
}
//Create background process to map input data to words
System.Threading.ThreadPool.QueueUserWorkItem(delegate(object state)
{
mapWords(fileText);
});
//Reduce mapped words
reduceWords();
}
}
}