Stopping Garbage Collection in .NET Core 3.0 (part II)

Let’s see how it is implemented. For why it is implemented this way, see Part I.

using System;
using System.Diagnostics.Tracing;
using System.Runtime;

The FxCop code analyzers complain if I don’t declare this attribute, which in turn prevents me from using unsigned numeric types in public interfaces.

[assembly: CLSCompliant(true)]

namespace LNativeMemory
{

The first piece of the puzzle is to implement an event listener. It is a non-obvious class (at least to me). I don’t fully understand its lifetime semantics, but the code below seems to do the right thing.

The interesting pieces are the _active field and the Start() method. The constructor for EventListener allocates plenty of stuff. I don’t want those allocations to happen after calling TryStartNoGCRegion, because they would use up part of the GC heap budget that I want to reserve for my program.

Instead, I create the listener before that call, and then ‘switch it on’ by calling Start() just after it.

    internal sealed class GcEventListener : EventListener
    {
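        readonly Action _action;
        EventSource _eventSource;
        // Written by Start()/Stop() on the caller's thread and read on the
        // event-dispatch thread, hence volatile.
        volatile bool _active = false;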

        internal void Start() { _active = true; }
        internal void Stop() { _active = false; }


As described in part one, you pass a delegate at creation time, which is called when garbage collection is restarted.

        internal GcEventListener(Action action) => _action = action ?? throw new ArgumentNullException(nameof(action));


We register for the GC-related events coming from the .NET runtime, because we want to call the delegate at the exact point when garbage collection is turned back on. There is no clean way to do that (there is no runtime event we can hook up to; see here), so listening to every GC event gives us the best chance of doing it right. It also ties us the least to any particular pattern of events, which might change in the future.

        // from https://docs.microsoft.com/en-us/dotnet/framework/performance/garbage-collection-etw-events
        private const int GC_KEYWORD = 0x0000001;
        private const int TYPE_KEYWORD = 0x0080000;
        private const int GCHEAPANDTYPENAMES_KEYWORD = 0x1000000;

        protected override void OnEventSourceCreated(EventSource eventSource)
        {
            if (eventSource.Name.Equals("Microsoft-Windows-DotNETRuntime", StringComparison.Ordinal))
            {
                _eventSource = eventSource;
                EnableEvents(eventSource, EventLevel.Verbose, (EventKeywords)(GC_KEYWORD | GCHEAPANDTYPENAMES_KEYWORD | TYPE_KEYWORD));
            }
        }


For each event, I check if the garbage collector has exited the NoGC region. If it has, then let’s invoke the delegate.

        protected override void OnEventWritten(EventWrittenEventArgs eventData)
        {
            // The event payload itself is not inspected: any GC event is just a
            // cue to check whether the runtime has left the NoGC region.
            if (_active && GCSettings.LatencyMode != GCLatencyMode.NoGCRegion)
            {
                _action?.Invoke();
            }
        }
    }


Now that we have our event listener, we need to hook it up. The code below implements what I described earlier:
1. Do the allocations for the event listener
2. Start the NoGC region
3. Start monitoring the runtime for the end of the NoGC region

    public static class GC2
    {
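        private static GcEventListener _evListener;

        public static bool TryStartNoGCRegion(long totalSize, Action actionWhenAllocatedMore)
        {
            // Allocate the listener *before* entering the NoGC region, so that its
            // allocations don't eat into the budget reserved for the program.
            _evListener?.Dispose();   // release a listener left over from a previous call
            _evListener = new GcEventListener(actionWhenAllocatedMore);
            var succeeded = GC.TryStartNoGCRegion(totalSize, disallowFullBlockingGC: false);

            // Only arm the listener if we actually entered the region; otherwise the
            // latency mode is already not NoGCRegion and the delegate would fire on
            // the very next GC event.
            if (succeeded)
                _evListener.Start();

            return succeeded;
        }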


As puzzling as this might be, I provisionally believe it to be correct. Apparently, even if the GC is no longer in a NoGC region, you still need to call EndNoGCRegion if you called TryStartNoGCRegion earlier; otherwise your next call to TryStartNoGCRegion will fail. EndNoGCRegion will throw an exception, but that’s OK: your next call to TryStartNoGCRegion will now succeed.

Now read the above repeatedly until you get it. Or just trust that it works somehow.

        public static void EndNoGCRegion()
        {
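            _evListener?.Stop();

            try
            {
                GC.EndNoGCRegion();
            }
            catch (InvalidOperationException)
            {
                // Thrown when the runtime is no longer in the NoGC region (for example,
                // because we allocated more than declared). Swallowed on purpose:
                // see the explanation above.
            }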
        }
    }
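The reset behavior described above can be exercised on its own. Below is an F# sketch that calls the runtime API directly rather than going through GC2; endNoGCRegionSafely is a hypothetical helper that, like GC2.EndNoGCRegion, swallows the InvalidOperationException thrown by GC.EndNoGCRegion when the runtime is not in a no-GC region. The 4 MB budget is an assumption, picked to stay well below the ephemeral segment size.

```fsharp
open System
open System.Runtime

// Hypothetical helper mirroring GC2.EndNoGCRegion: always call GC.EndNoGCRegion,
// but swallow the InvalidOperationException it throws when the runtime is not in
// a no-GC region (never entered, or already exited).
let endNoGCRegionSafely () : bool =
    try
        GC.EndNoGCRegion()
        true    // we were still inside the no-GC region
    with :? InvalidOperationException ->
        false   // nothing to end

// Assumption: a 4 MB budget is accepted by the runtime.
let budget = 4L * 1024L * 1024L
let started = GC.TryStartNoGCRegion(budget)
let wasInside = GCSettings.LatencyMode = GCLatencyMode.NoGCRegion
let firstEnd = endNoGCRegionSafely ()
let secondEnd = endNoGCRegionSafely ()      // region already ended: returns false
let restarted = GC.TryStartNoGCRegion(budget)
endNoGCRegionSafely () |> ignore

printfn "started=%b inside=%b firstEnd=%b secondEnd=%b restarted=%b"
    started wasInside firstEnd secondEnd restarted
```

A second EndNoGCRegion with nothing to end throws, which is exactly the case the wrapper has to tolerate.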


This exception is used as the default behavior for the delegate in the wrapper class below. The code analyzer warned me that I shouldn’t be throwing an OOM exception here (OutOfMemoryException is one of the exception types reserved for the runtime). At first I dismissed the warning, but then it hit me: it is right.

We are not running out of memory here. We simply have allocated more memory than what we declared we would. There is likely plenty of memory left on the machine. Thinking more about it, I grew ashamed of my initial reaction. Think about a support engineer getting an OOM exception at that point and trying to figure out why. So, always listen to Lint …

    public class OutOfGCHeapMemoryException : OutOfMemoryException {
        public OutOfGCHeapMemoryException(string message) : base(message) { }
        public OutOfGCHeapMemoryException(string message, Exception innerException) : base(message, innerException) { }
        public OutOfGCHeapMemoryException() : base() { }

    }
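To see why a dedicated type helps the support engineer, here is a hedged F# sketch of the same idea. The type still derives from OutOfMemoryException (as the C# class above does), so generic OOM handlers keep working, but a handler can test for the specific subtype first and report the real cause. The describe and tryRun helpers and their messages are illustrative only.

```fsharp
open System

// F# counterpart of the C# class above: a distinct type that still derives
// from OutOfMemoryException, so existing OOM handlers continue to catch it.
type OutOfGCHeapMemoryException(message: string) =
    inherit OutOfMemoryException(message)
    new () = OutOfGCHeapMemoryException("Allocated more than the declared no-GC budget.")

// Hypothetical diagnostic helper: check the most specific type first.
let describe (ex: exn) =
    match ex with
    | :? OutOfGCHeapMemoryException -> "exceeded the no-GC region budget"
    | :? OutOfMemoryException -> "genuinely out of memory"
    | _ -> "unrelated error"

let tryRun (f: unit -> unit) =
    try
        f ()
        "ok"
    with ex -> describe ex

printfn "%s" (tryRun (fun () -> raise (OutOfGCHeapMemoryException())))
printfn "%s" (tryRun (fun () -> raise (OutOfMemoryException())))
```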


This is a utility class that implements the IDisposable pattern for this scenario. The size of the default ephemeral segment comes from here.

    public sealed class NoGCRegion: IDisposable
    {
        static readonly Action defaultErrorF = () => throw new OutOfGCHeapMemoryException();
        const int safeEphemeralSegment = 16 * 1024 * 1024;

        public NoGCRegion(int totalSize, Action actionWhenAllocatedMore)
        {
            var succeeded = GC2.TryStartNoGCRegion(totalSize, actionWhenAllocatedMore);
            if (!succeeded)
                throw new InvalidOperationException("Cannot enter NoGCRegion");
        }

        public NoGCRegion(int totalSize) : this(totalSize, defaultErrorF) { }
        public NoGCRegion() : this(safeEphemeralSegment, defaultErrorF) { }

        public void Dispose() => GC2.EndNoGCRegion();
    }
}
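The same IDisposable idea maps naturally onto F#'s use binding. This is a minimal sketch, not a translation of the NoGCRegion class: NoGCScope is a hypothetical name, it leaves out the event listener and the allocated-more callback, and it assumes a 4 MB budget. It reads the latency mode inside the scope and again after Dispose has run.

```fsharp
open System
open System.Runtime

// Hypothetical scope type: enters a no-GC region on construction and always
// tries to leave it on Dispose, even if the runtime already left it on its own.
type NoGCScope(totalSize: int64) =
    do
        if not (GC.TryStartNoGCRegion(totalSize)) then
            invalidOp "Cannot enter NoGCRegion"
    interface IDisposable with
        member this.Dispose() =
            try
                GC.EndNoGCRegion()
            with :? InvalidOperationException ->
                ()   // the region had already ended; nothing left to undo

let modeInside, modeAfter =
    let inside =
        use scope = new NoGCScope(4L * 1024L * 1024L)   // assumption: 4 MB budget
        GCSettings.LatencyMode                           // read while the scope is open
    inside, GCSettings.LatencyMode                       // read after Dispose has run

printfn "inside: %A, after: %A" modeInside modeAfter
```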

Luca Bolognese leaves Microsoft

FYI: I imported all the posts from the old MSDN blog into this one, so this post doesn’t make sense anymore. I left it here for memory’s sake. BTW, I am also back at Microsoft now (30/11/2018).

This is my last post on this blog. My new blog is here: https://lucabolognese.wordpress.com/

I accepted a role as Director for Credit Suisse in London. I’m excited by the opportunity to work in the financial industry, a long-standing desire of mine. I’m also excited to write more F# code and to be closer to Italy, where my extended family is.

The past ten years at Microsoft have been a wild ride. I’m proud to have been part of ObjectSpaces, Generics, LINQ and F# (and much more …). I’ve been lucky to be able to post on this blog and present at conferences about such innovative technologies. It has all been a lot of fun. I’m sure the next ten years will be as good.

I’ll see you guys on my new blog.

LChart: displaying charts in F# – Part I

I want to use F# as an exploratory data analysis language (like R), but I don’t know how to get the same nice graphics capabilities, so I decided to create them. Here is a library to draw charts in F#. It steals ideas from this book and this R package. It is nothing more than a wrapper on top of the Microsoft Chart Controls that gives them a more ‘exploratory’, one-line calling syntax. It is also a rough work in progress: I don’t wrap all the chart types, and there are bugs in the ones I do wrap. Also, the architecture is all wrong (more on this in another post). But it’s a start, and it kind of works. The full code is attached.


I will continue this series in my new blog at wordpress: https://lucabolognese.wordpress.com/. The reason I need a new blog will be explained in an upcoming post.


Part II is now here.


Ok, let’s start. How do I draw a chart?

let x = [1.;2.5;3.1;4.;4.8;6.0;7.5;8.;9.1;15.]
let y = [1.6;2.1;1.4;4.;2.3;1.9;2.4;1.4;5.;2.9]

lc.scatter(x, y) |> display


x and y are just some made-up data. lc is the name of a class, and scatter is a static method on it. scatter doesn’t display the chart; it just produces an object that represents the chart, and display shows it. The reason for using the bizarre lc static class is that I want the name to be short, so that it is easy to type in fsi.exe. At the same time, it needs to support optional parameters, which F# does not support on top-level (let-bound) functions.
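A small sketch of that constraint: optional ?parameters are only allowed on members, so a type with static members is the natural way to get a short, default-heavy, one-line syntax. ChartSpec, LcSketch and the default values are hypothetical stand-ins, not the real lc class; scatter here only records its arguments instead of drawing a chart.

```fsharp
// Hypothetical record standing in for the chart object that lc builds.
type ChartSpec =
    { Kind: string; X: float list; Y: float list; Title: string; MarkerSize: int }

// Hypothetical stand-in for lc: static members may declare optional arguments.
type LcSketch =
    static member scatter(y: float list, ?x: float list, ?title: string, ?markerSize: int) =
        // When x is omitted, use 0., 1., 2., ... as the x values.
        let x = defaultArg x (List.init y.Length float)
        { Kind = "scatter"; X = x; Y = y
          Title = defaultArg title ""
          MarkerSize = defaultArg markerSize 5 }   // 5 is an illustrative default

// A let-bound function cannot declare ?optional parameters; this would not compile:
// let scatter (y: float list, ?title: string) = ...

let spec = LcSketch.scatter([1.6; 2.1; 1.4], title = "Players' Ratings")
let bigMarkers = LcSketch.scatter([1.0; 2.0], markerSize = 10)
printfn "%A" spec
```

Callers name only the arguments they care about, which is what makes the one-line calls below possible.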


You get a window with the chart on it. You can press CTRL+C to copy it (as I did to post it here).


You might want to customize the chart a bit by passing some of these famous optional parameters:

lc.scatter(x = x, y = y, markerSize = 10, markerStyle = MarkerStyle.Diamond,
           xname = "Players", yname = "Ratings", title = "Players' Ratings") |> display



Or you might want to print different types of charts:

lc.line(y = y, markerSize = 10, markerStyle = MarkerStyle.Diamond, xname = "Players", yname = "Ratings",
        title = "Players' Ratings", isValueShownAsLabel = true, color = Color.Red) |> display
lc.spline(x = x, y = y, markerSize = 10, markerStyle = MarkerStyle.Diamond, xname = "Players", yname = "Ratings",
          title = "Players' Ratings", isValueShownAsLabel = true, color = Color.Red) |> display
lc.stepline(x = x, y = y, markerSize = 10, markerStyle = MarkerStyle.Diamond, xname = "Players", yname = "Ratings",
            title = "Players' Ratings", isValueShownAsLabel = true, color = Color.Red) |> display
lc.bar(y = y, xname = "Players", yname = "Ratings", title = "Players' Ratings", isValueShownAsLabel = true,
       drawingStyle = "Emboss") |> display

lc.column(y = y, xname = "Players", yname = "Ratings", title = "Players' Ratings",
          isValueShownAsLabel = true, drawingStyle = "Cylinder") |> display
lc.boxplot(y = y, xname = "Players", yname = "Ratings", title = "Players' Ratings", color = Color.Blue,
           whiskerPercentile = 5, percentile = 30, showAverage = false, showMedian = false,
           showUnusualValues = true) |> display



Ok, the last one is weird. You probably want more than one boxplot in a chart. I’ll show you how to do that in the next post.


The next post will show how to put more than one series on the same chart and more than one chart in the same window.

ChartPlotter.fsx

A simpler F# MailboxProcessor

I always forget the pattern to use to create a functioning MailboxProcessor in F#: which piece has to be async, and how to structure the recursive loop. When I find myself in that kind of situation, my instincts scream at me: “Wrap it and make it work the way your mind expects it to work”. So here is a simplification of the paradigm.

Let’s see what some standard MailboxProcessor code looks like:

let counter0 =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async { 
                    let! msg = inbox.Receive()
                    return! loop(n+msg) }
        loop 0)

This keeps a running sum of the messages it receives. The only part that is really unique to this guy is “n + msg”. All the rest is infrastructure.
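Note that counter0 keeps the sum, but callers have no way to read it. A hedged sketch of the usual fix, in the same plain MailboxProcessor style, adds a message that carries an AsyncReplyChannel; SumMsg, Add and Get are hypothetical names. Because the mailbox is FIFO, a Get posted after the three Adds sees all of them.

```fsharp
type SumMsg =
    | Add of int
    | Get of AsyncReplyChannel<int>

let counterWithReply =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add m -> return! loop (n + m)
                | Get reply ->
                    reply.Reply n          // answer with the current running sum
                    return! loop n }
        loop 0)

counterWithReply.Post(Add 1)
counterWithReply.Post(Add 2)
counterWithReply.Post(Add 3)
let total = counterWithReply.PostAndReply(fun reply -> Get reply)
printfn "%d" total   // 6
```

Even here, the part that differs from counter0 is small; the loop, the async block and the Receive call are the same infrastructure.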

You’d probably prefer to write code like the following:

let counter1 = MailboxProcessor.SpawnAgent( (fun msg n -> msg + n), 0)

Yep, just one line of code. But, is it possible? Let’s look at one way of doing it:

open System

type AfterError<'state> =
| ContinueProcessing of 'state
| StopProcessing
| RestartProcessing

type MailboxProcessor<'a> with

    static member public SpawnAgent<'b>(messageHandler :'a->'b->'b,
                                        initialState : 'b,
                                        ?timeout:'b -> int,
                                        ?timeoutHandler:'b -> AfterError<'b>,
                                        ?errorHandler:Exception -> 'a option -> 'b -> AfterError<'b>) : MailboxProcessor<'a> =
        let timeout = defaultArg timeout (fun _ -> -1)
        let timeoutHandler = defaultArg timeoutHandler (fun state -> ContinueProcessing(state))
        let errorHandler = defaultArg errorHandler (fun _ _ state -> ContinueProcessing(state))
        MailboxProcessor.Start(fun inbox ->
            let rec loop(state) = async {
                let! msg = inbox.TryReceive(timeout(state))
                // Decide what to do inside try/with, but recurse outside it: a return!
                // inside try/with is not in tail position, so every message would
                // add another nested handler.
                let next =
                    try
                        match msg with
                        | None -> timeoutHandler state
                        | Some(m) -> ContinueProcessing(messageHandler m state)
                    with
                    | ex -> errorHandler ex msg state
                match next with
                | ContinueProcessing(newState) -> return! loop(newState)
                | StopProcessing -> return ()
                | RestartProcessing -> return! loop(initialState) }
            loop(initialState))

This is just a simple (?) wrapper for the MailboxProcessor pattern. The function takes two required parameters and three optional ones:

  • messageHandler: a function to execute when a message comes in; it takes the message and the current state as parameters and returns the new state.
  • initialState: the initial state for the MailboxProcessor.
  • timeout: a function that takes the current state and returns how long to wait for the next message, in milliseconds (the default, -1, waits forever).
  • timeoutHandler: a function that is executed whenever a timeout occurs. It takes the current state as a parameter and returns one of ContinueProcessing(newState), StopProcessing or RestartProcessing.
  • errorHandler: a function that gets called if an exception is raised inside messageHandler (or timeoutHandler). It takes the exception, the message (None if the exception came from a timeout) and the current state, and returns ContinueProcessing(newState), StopProcessing or RestartProcessing.

An example of how to use errorHandler to implement the CountingAgent in the Expert F# book follows:

type msg = Increment of int | Fetch of AsyncReplyChannel<int> | Stop

exception StopException

type CountingAgent() =
    let counter = MailboxProcessor.SpawnAgent((fun msg n ->
                    match msg with
                    | Increment m ->  n + m
                    | Stop -> raise(StopException)
                    | Fetch replyChannel ->
                        do replyChannel.Reply(n)
                        n
                  ), 0, errorHandler = (fun _ _ _ -> StopProcessing))
    member a.Increment(n) = counter.Post(Increment(n))
    member a.Stop() = counter.Post(Stop)
    member a.Fetch() = counter.PostAndReply(fun replyChannel -> Fetch(replyChannel))    
        
let counter2 = CountingAgent()
counter2.Increment(1)
counter2.Fetch()
counter2.Increment(2)
counter2.Fetch()
counter2.Stop()                             

Sometimes your agent doesn’t need any state; it is purely stateless. Something as simple as the following:

let echo = MailboxProcessor<_>.SpawnWorker(fun msg -> printfn "%s" msg)

You can easily make that happen by using this toned-down version of an agent, called a worker:

static member public SpawnWorker(messageHandler, ?timeout, ?timeoutHandler, ?errorHandler) =
    let timeout = defaultArg timeout (fun () -> -1)
    let timeoutHandler = defaultArg timeoutHandler (fun _ -> ContinueProcessing(()))
    let errorHandler = defaultArg errorHandler (fun _ _ -> ContinueProcessing(()))
    MailboxProcessor.SpawnAgent((fun msg _ -> messageHandler msg; ()),
                                (), timeout, timeoutHandler,
                                (fun ex msg _ -> errorHandler ex msg))

Given that agents run in parallel with each other, you might want to run a whole bunch of them at the same time: something that looks like a worker but, under the covers, executes each messageHandler in parallel. Something like:

type msg1 = Message1 | Message2 of int | Message3 of string
            
let a = MailboxProcessor.SpawnParallelWorker(function
                | Message1 -> printfn "Message1";
                | Message2 n -> printfn "Message2 %i" n;
                | Message3 _ -> failwith "I failed"
                , 10
                , errorHandler = (fun ex _ -> printfn "%A" ex; ContinueProcessing()))


a.Post(Message1)
a.Post(Message2(100))
a.Post(Message3("abc"))
a.Post(Message2(100))

In this example, the messages are likely to print out of order. Notice the number 10 above: it is how many workers you want to process your messages. This is implemented by dispatching messages round-robin to the various workers:

static member public SpawnParallelWorker(messageHandler, howMany, ?timeout, ?timeoutHandler, ?errorHandler) =
    let timeout = defaultArg timeout (fun () -> -1)
    let timeoutHandler = defaultArg timeoutHandler (fun _ -> ContinueProcessing(()))
    let errorHandler = defaultArg errorHandler (fun _ _ -> ContinueProcessing(()))
    MailboxProcessor<'a>.SpawnAgent(
        (fun msg (workers:MailboxProcessor<'a> array, index) ->
            workers.[index].Post msg
            (workers, (index + 1) % howMany)),
        (Array.init howMany (fun _ ->
            MailboxProcessor<'a>.SpawnWorker(messageHandler, timeout, timeoutHandler, errorHandler)), 0))
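The round-robin dispatch can be checked without the SpawnAgent and SpawnWorker helpers. In this sketch (all names hypothetical) a dispatcher agent owns the index, forwards each job to workers.[index] and moves on to (index + 1) % howMany, mirroring the (workers, index) state above. Each worker records the jobs it received; because mailboxes are FIFO, a Report request queued behind a worker's jobs sees all of them, so the result is deterministic.

```fsharp
type WorkerMsg =
    | Job of int
    | Report of AsyncReplyChannel<int list>

type DispatcherMsg =
    | Dispatch of int
    | Collect of AsyncReplyChannel<int list list>

// A worker remembers which jobs it handled, in arrival order.
let makeWorker () =
    MailboxProcessor<WorkerMsg>.Start(fun inbox ->
        let rec loop handled =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Job n -> return! loop (n :: handled)
                | Report reply ->
                    reply.Reply (List.rev handled)
                    return! loop handled }
        loop [])

// The dispatcher's state is the index of the next worker to use.
let spawnRoundRobin howMany =
    let workers = Array.init howMany (fun _ -> makeWorker ())
    MailboxProcessor<DispatcherMsg>.Start(fun inbox ->
        let rec loop index =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Dispatch n ->
                    workers.[index].Post(Job n)
                    return! loop ((index + 1) % howMany)
                | Collect reply ->
                    // Each worker answers only after the jobs already in its queue.
                    let! reports =
                        workers
                        |> Array.map (fun w -> w.PostAndAsyncReply(fun r -> Report r))
                        |> Async.Parallel
                    reply.Reply (List.ofArray reports)
                    return! loop index }
        loop 0)

let dispatcher = spawnRoundRobin 3
for n in 0 .. 6 do dispatcher.Post(Dispatch n)
let perWorker = dispatcher.PostAndReply(fun r -> Collect r)
printfn "%A" perWorker
```

With 3 workers and jobs 0 to 6, worker 0 gets 0, 3 and 6, worker 1 gets 1 and 4, and worker 2 gets 2 and 5.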

One drawback of the current code is that it doesn’t support cancellation. It should be possible to wrap that too, but I haven’t done it (yet). If you don’t want to cut and paste the code, it is inside the AgentSystem.fs file here.

LAgent: an agent framework in F# – Part X – ActiveObject

Download framework here.

All posts are here:

If you stare long enough at agents, you start to realize that they are just ‘glorified locks’. They are a convenient programming model to protect a resource from concurrent access. The programming model is convenient because both the client and the server can write their code without worrying about concurrency problems, and yet the program runs in parallel. Protecting a resource sounds a lot like state encapsulation and the concept of state encapsulation is what object orientation is all about.
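To make the ‘glorified lock’ remark concrete, this sketch protects the same counter in two ways, with a lock and with a plain MailboxProcessor (not the LAgent framework), and updates both from parallel callers. All names are hypothetical. Both end at 1000; the difference is that callers of the agent only enqueue a message instead of waiting for the lock.

```fsharp
// Version 1: a lock guards the mutable state; callers wait their turn.
type LockedCounter() =
    let gate = obj ()
    let mutable count = 0
    member this.Add x = lock gate (fun () -> count <- count + x)
    member this.Value = lock gate (fun () -> count)

// Version 2: an agent owns the state; callers only post messages.
type CounterMsg =
    | Increment of int
    | Read of AsyncReplyChannel<int>

let agentCounter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop count =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Increment x -> return! loop (count + x)
                | Read reply ->
                    reply.Reply count
                    return! loop count }
        loop 0)

let locked = LockedCounter()

// Hit both counters from many threads at once.
let bump _ =
    locked.Add 1
    agentCounter.Post(Increment 1)

Array.Parallel.iter bump [| 1 .. 1000 |]

let viaLock = locked.Value
let viaAgent = agentCounter.PostAndReply(fun reply -> Read reply)
printfn "lock: %d, agent: %d" viaLock viaAgent
```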

So you start wondering whether there is a way to enhance vanilla objects to make them agents. You want to reuse all the concepts that you are familiar with (i.e. inheritance, visibility rules, etc.), and you want your clients to call agents as if they were calling normal objects. Obviously, under the covers, the method calls won’t execute immediately; they are queued instead. Let’s look at an example.

This is our simple counter agent:

type CounterMessage =
| Add of int
| Print

let counterF = fun msg count ->
    match msg with
    | Add(i)    -> count + i
    | Print     -> printfn "The value is %i" count; count
    
let c1 = spawnAgent counterF 0
c1 <-- Add(3)
c1 <-- Print

As nice as this looks, there are unfamiliar things in this model:

  1. The communication is through messages. This requires packing and unpacking which, albeit easy in F#, is unfamiliar and feels like machinery that we’d like to get rid of.
  2. The management of state is bizarre: it gets passed into the lambda and returned from it, instead of being represented as fields and properties on the agent.

My best attempt at creating an object-like syntax follows:

type Counter() =
    let w = new WorkQueue()
    let mutable count = 0
    member c.Add x = w.Queue (fun () -> 
        count <- count + x
        )
    member c.Print () = w.Queue (fun () -> 
        printfn "The value is %i" count
        )
let c = new Counter()
c.Add 3
c.Print ()

With this syntax, you write your agents like you write your vanilla classes except:

  1. You need a private field of type WorkQueue
  2. You need to write your methods as lambdas passed to the WorkQueue.Queue function
  3. Your methods cannot return values

The most worrisome of these constraints is the second one, because you can easily forget about it. If you do forget, everything still compiles just fine, but it doesn’t do what you expect. That’s pure badness. I haven’t found a way to enforce it; this is a place where the language could help me. Other than that, the whole model works rather nicely.

Regarding the third point, you can concoct a programming model that allows you to return values from your methods. Here it is:

member c.CountTask = w.QueueWithTask(fun () ->
    count
    )
member c.CountAsync = w.QueueWithAsync(fun () ->
    count
    )
printfn "The count using Task is %i" (c.CountTask.Result)

The first method returns a Task; the second method returns an AsyncResultCell. Both are ways to represent a promise. The latter allows a natural integration with the async block in F# as in the following code:

Async.RunSynchronously (
            async {
                let! count = c.CountAsync
                printfn "The count using Async is %i" count
            })

As for myself, I don’t like methods returning values. Every time I use them, I end up going back to thinking about my problem in a traditional way, that is, as method calls that return results, instead of thinking about it in a more actor-oriented fashion. I end up waiting for these promises to be materialized and, by doing so, I limit the amount of parallelism that I unleash. As a matter of fact, the whole business of hiding the message-passing nature of the programming model is dubious. It makes for a nicer syntax, but you need to make an extra mental effort to translate it back to what it really is: just message passing with a nice syntactic veneer. I haven’t decided yet which model I like the most.

You should have a sense of what WorkQueue is by now. In essence, it is a mailbox of lambdas: a worker whose handler simply invokes each function it receives (spawnWorker (fun f -> f()) in the code below).

type WorkQueue() =
    let workQueue = spawnWorker (fun f -> f())
    member w.Queue (f) = workQueue <-- f
    member w.QueueWithTask f : Task<'T> =
        let source = new TaskCompletionSource<_>()
        workQueue <-- (fun () -> f() |> source.SetResult)
        source.Task
    member w.QueueWithAsync (f:unit -> 'T) : Async<'T> =
        let result = new AsyncResultCell<'T>()
        workQueue <-- (fun () -> f() |> result.RegisterResult )
        result.AsyncWaitResult
    member w.Restart () = workQueue <-! Restart
    member w.Stop () = workQueue <-! Stop
    member w.SetErrorHandler(h) =
        let managerF = fun (_, name:string, ex:Exception, _, _, _) -> h name ex                             
        let manager = spawnWorker managerF
        workQueue <-! SetManager manager
    member w.SetName(name) = workQueue <-! SetName(name)
    member w.SetQueueHandler(g) = workQueue <-! SetWorkerHandler(g)
    member w.SetTimeoutHandler(timeout, f) = workQueue <-! SetTimeoutHandler(timeout, f)
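For readers without the LAgent framework at hand, here is a minimal, self-contained sketch of the same ‘mailbox of lambdas’ idea on top of a plain MailboxProcessor. MiniWorkQueue is a hypothetical name; it covers only Queue, QueueWithTask and an async variant, and it swaps AsyncResultCell (not available in current FSharp.Core) for Async.AwaitTask over a TaskCompletionSource. The Counter ActiveObject from earlier is rewritten against it.

```fsharp
open System.Threading.Tasks

// Hypothetical minimal work queue: one agent whose messages are lambdas,
// executed one at a time in the order they were queued.
type MiniWorkQueue() =
    let agent =
        MailboxProcessor<(unit -> unit)>.Start(fun inbox ->
            let rec loop () =
                async {
                    let! work = inbox.Receive()
                    work ()               // no error handling in this sketch
                    return! loop () }
            loop ())

    member this.Queue(f: unit -> unit) = agent.Post f

    // Returns a promise (Task) that the agent completes once f has run.
    member this.QueueWithTask<'T>(f: unit -> 'T) : Task<'T> =
        let source = TaskCompletionSource<'T>()
        agent.Post(fun () ->
            try source.SetResult(f ())
            with ex -> source.SetException(ex))
        source.Task

    // Async flavour: wraps the Task instead of using AsyncResultCell.
    member this.QueueWithAsync<'T>(f: unit -> 'T) : Async<'T> =
        this.QueueWithTask(f) |> Async.AwaitTask

// The Counter ActiveObject from above, written against the sketch.
type Counter() =
    let w = MiniWorkQueue()
    let mutable count = 0
    member this.Add x = w.Queue(fun () -> count <- count + x)
    member this.Print() = w.Queue(fun () -> printfn "The value is %i" count)
    member this.CountTask = w.QueueWithTask(fun () -> count)
    member this.CountAsync = w.QueueWithAsync(fun () -> count)

let c = Counter()
c.Add 3
c.Add 4
c.Print()
let viaTask = c.CountTask.Result
let viaAsync = c.CountAsync |> Async.RunSynchronously
printfn "Task: %i, Async: %i" viaTask viaAsync
```

Because every read is queued behind the earlier Add calls, both promises observe 7.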

I implemented all the services that are in the message-passing model; the two are equivalent as far as expressive power goes. In case you wonder what a real piece of code looks like using this model, here is an ActiveObject version of the map reduce algorithm. One of these days I will gather the strength to go through this code and explain what it does, but not today :-)

#load "AgentSystem.fs"
open AgentSystem.LAgent
open System
open System.Collections
open System.Collections.Generic
open System.Threading

type IOutput<'out_key, 'out_value> =
    abstract Reduced: 'out_key -> seq<'out_value> -> unit
    abstract MapReduceDone: unit -> unit

type Mapper<'in_key, 'in_value, 'my_out_key, 'out_value when 'my_out_key : comparison>(map:'in_key -> 'in_value -> seq<'my_out_key * 'out_value>, i, partitionF) =
    let w = new WorkQueue()
    let mutable reducerTracker: BitArray = null
    let mutable controller = Unchecked.defaultof<Controller<'in_key, 'in_value, 'my_out_key, 'out_value>>
    let mutable reducers = Unchecked.defaultof<Reducer<'in_key, 'in_value, 'my_out_key, 'out_value> array>
    member m.Init c reds =
        w.Queue (fun () ->
            controller <- c
            reducers <- reds
            reducerTracker <- new BitArray(reducers.Length, false))
    member m.Process inKey inValue =
        w.Queue (fun () ->
            let outKeyValues = map inKey inValue
            outKeyValues |> Seq.iter (fun (outKey, outValue) ->
                let reducerUsed = partitionF outKey (reducers.Length)
                reducerTracker.Set(reducerUsed, true)
                reducers.[reducerUsed].Add(outKey, outValue)))
    member m.Done () = w.Queue (fun () -> controller.MapDone i reducerTracker)
    member m.Stop () = w.Stop ()

and Reducer<'in_key, 'in_value, 'out_key, 'out_value when 'out_key : comparison>(reduce:'out_key -> seq<'out_value> -> seq<'out_value>, i, output:IOutput<'out_key, 'out_value>) =
    let w = new WorkQueue()
    let mutable workItems = new List<'out_key * 'out_value>()
    let mutable controller = Unchecked.defaultof<Controller<'in_key, 'in_value, 'out_key, 'out_value>>
    member r.Init c = w.Queue (fun () -> controller <- c)
    member r.StartReduction () =
        w.Queue (fun () ->
            workItems
            |> Seq.groupBy fst
            |> Seq.sortBy fst
            |> Seq.map (fun (key, values) -> (key, reduce key (values |> Seq.map snd)))
            |> Seq.iter (fun (key, value) -> output.Reduced key value)
            controller.ReductionDone i)
    member r.Add (outKey:'out_key, outValue:'out_value) : unit =
        w.Queue (fun () -> workItems.Add((outKey, outValue)))
    member m.Stop () = w.Stop ()

and Controller<'in_key, 'in_value, 'out_key, 'out_value when 'out_key : comparison>(output:IOutput<'out_key, 'out_value>) =
    let w = new WorkQueue()
    let mutable mapperTracker: BitArray = null
    let mutable reducerUsedByMappers: BitArray = null
    let mutable reducerDone: BitArray = null
    let mutable mappers = Unchecked.defaultof<Mapper<'in_key, 'in_value, 'out_key, 'out_value> array>
    let mutable reducers = Unchecked.defaultof<Reducer<'in_key, 'in_value, 'out_key, 'out_value> array>
    let BAtoSeq (b:BitArray) = [for x in b do yield x]
    member c.Init maps reds =
        w.Queue (fun () ->
            mappers <- maps
            reducers <- reds
            mapperTracker <- new BitArray(mappers.Length, false)
            reducerUsedByMappers <- new BitArray(reducers.Length, false)
            reducerDone <- new BitArray(reducers.Length, false))
    member c.MapDone (i : int) (reducerTracker : BitArray) : unit =
        w.Queue (fun () ->
            mapperTracker.Set(i, true)
            let reducerUsedByMappers = reducerUsedByMappers.Or(reducerTracker)
            if not (BAtoSeq mapperTracker |> Seq.exists (fun bit -> bit = false)) then
                BAtoSeq reducerUsedByMappers
                |> Seq.iteri (fun i r -> if r = true then reducers.[i].StartReduction ())
                mappers |> Seq.iter (fun m -> m.Stop ()))
    member c.ReductionDone (i: int) : unit =
        w.Queue (fun () ->
            reducerDone.Set(i, true)
            if BAtoSeq reducerDone |> Seq.forall2 (fun x y -> x = y) (BAtoSeq reducerUsedByMappers) then
                output.MapReduceDone ()
                reducers |> Seq.iter (fun r -> r.Stop ())
                c.Stop())
    member m.Stop () = w.Stop ()

let mapReduce (inputs:seq<'in_key * 'in_value>)
              (map:'in_key -> 'in_value -> seq<'out_key * 'out_value>)
              (reduce:'out_key -> seq<'out_value> -> seq<'out_value>)
              (output:IOutput<'out_key, 'out_value>)
              M R partitionF =
    let len = inputs |> Seq.length
    let M = if len < M then len else M
    let mappers = Array.init M (fun i -> new Mapper<'in_key, 'in_value, 'out_key, 'out_value>(map, i, partitionF))
    let reducers = Array.init R (fun i -> new Reducer<'in_key, 'in_value, 'out_key, 'out_value>(reduce, i, output))
    let controller = new Controller<'in_key, 'in_value, 'out_key, 'out_value>(output)
    mappers |> Array.iter (fun m -> m.Init controller reducers)
    reducers |> Array.iter (fun r -> r.Init controller)
    controller.Init mappers reducers
    inputs |> Seq.iteri (fun i (inKey, inValue) -> mappers.[i % M].Process inKey inValue)
    mappers |> Seq.iter (fun m -> m.Done ())

let partitionF = fun key M -> abs(key.GetHashCode()) % M

let map = fun (fileName:string) (fileContent:string) ->
    let l = new List<string * int>()
    let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'\n';'\t';'\f';'\r';'\b'|]
    fileContent.Split(wordDelims) |> Seq.iter (fun word -> l.Add((word, 1)))
    l :> seq<string * int>

let reduce = fun key (values:seq<int>) -> Seq.singleton (Seq.sum values)

let printer () =
    { new IOutput<string, int> with
        member o.Reduced key values = printfn "%A %A" key values
        member o.MapReduceDone () = printfn "All done!!" }

let testInput =
    [ ("File1", "I was going to the airport when I saw someone crossing")
      ("File2", "I was going home when I saw you coming toward me") ]

mapReduce testInput map reduce (printer ()) 2 2 partitionF

open System.IO
open System.Text

let gatherer(step) =
    let w = new WorkQueue()
    let data = new List<string * int>()
    let counter = ref 0
    { new IOutput<string, int> with
        member o.Reduced key values =
            w.Queue (fun () ->
                if !counter % step = 0 then
                    printfn "Processed %i words. Now processing %s" !counter key
                data.Add((key, values |> Seq.head))
                counter := !counter + 1)
        member o.MapReduceDone () =
            w.Queue (fun () ->
                data
                |> Seq.distinctBy (fun (key, _) -> key.ToLower())
                |> Seq.filter (fun (key, _) -> not (key = "" || key = "\"" || (fst (Double.TryParse(key)))))
                |> Seq.toArray
                |> Array.sortBy snd
                |> Array.rev
                |> Seq.take 20
                |> Seq.iter (fun (key, value) -> printfn "%A\t\t%A" key value)
                printfn "All done!!") }

let splitBook howManyBlocks fileName =
    let buffers = Array.init howManyBlocks (fun _ -> new StringBuilder())
    fileName
    |> File.ReadAllLines
    |> Array.iteri (fun i line -> buffers.[i % (howManyBlocks)].Append(line) |> ignore)
    buffers

let blocks1 = Path.Combine(__SOURCE_DIRECTORY__, "kjv10.txt") |> splitBook 100
let blocks2 = Path.Combine(__SOURCE_DIRECTORY__, "warandpeace.txt") |> splitBook 100
let input = blocks1 |> Array.append blocks2 |> Array.mapi (fun i b -> i.ToString(), b.ToString())
//mapReduce input map reduce (gatherer(1000)) 20 20 partitionF

type BookSplitter () =
    let blocks = new List<string * string>()
    member b.Split howManyBlocks fileName =
        let b = fileName |> splitBook howManyBlocks |> Array.mapi (fun i b -> i.ToString(), b.ToString())
        blocks.AddRange(b)
    member b.Blocks () = blocks.ToArray() :> seq<string * string>

type WordCounter () =
    let w = new WorkQueue()
    let words = new Dictionary<string,int>()
    let worker(wordCounter:WordCounter, ev:EventWaitHandle) =
        let w1 = new WorkQueue()
        { new IOutput<string, int> with
            member o.Reduced key values = w1.Queue (fun () -> wordCounter.AddWord key (values |> Seq.head))
            member o.MapReduceDone () = w1.Queue (fun () -> ev.Set() |> ignore) }
    member c.AddWord word count =
        let exist, value = words.TryGetValue(word)
        if exist then words.[word] <- value + count else words.Add(word, count)
    member c.Add fileName =
        w.Queue (fun () ->
            let s = new BookSplitter()
            fileName |> s.Split 100
            let ev = new EventWaitHandle(false, EventResetMode.AutoReset)
            let blocks = s.Blocks ()
            mapReduce blocks map reduce (worker(c, ev)) 20 20 partitionF
            ev.WaitOne() |> ignore)
    member c.Words =
        w.QueueWithAsync (fun () -> words |> Seq.toArray |> Array.map (fun kv -> kv.Key, kv.Value))

let wc = new WordCounter()
wc.Add (Path.Combine(__SOURCE_DIRECTORY__, "kjv10.txt"))
wc.Add (Path.Combine(__SOURCE_DIRECTORY__, "warandpeace.txt"))

let wordsToPrint = async {
    let! words = wc.Words
    return
        words
        |> Seq.distinctBy (fun (key, _) -> key.ToLower())
        |> Seq.filter (fun (key, _) -> not (key = "" || key = "\"" || (fst (Double.TryParse(key)))))
        |> Seq.toArray
        |> Array.sortBy snd
        |> Array.rev
        |> Seq.take 20
        |> Seq.iter (fun (key, value) -> printfn "%A\t\t%A" key value) }

Async.RunSynchronously wordsToPrint
Thread.Sleep(15000)
printfn "Closed session"
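Before tracing the agents, it helps to know what the answer should be. This sketch runs the same map and reduce logic sequentially over testInput, with no mappers, reducers or controller; reduce is written with Seq.singleton, which is equivalent to wrapping the sum in a one-element sequence. The resulting map is a reference to compare against the ActiveObject version’s printout.

```fsharp
open System.Collections.Generic

// Same map and reduce logic as above, reproduced so this block stands alone.
let map = fun (fileName: string) (fileContent: string) ->
    let l = List<string * int>()
    let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'\n';'\t';'\f';'\r';'\b'|]
    fileContent.Split(wordDelims) |> Seq.iter (fun word -> l.Add((word, 1)))
    l :> seq<string * int>

let reduce = fun (key: string) (values: seq<int>) -> Seq.singleton (Seq.sum values)

let testInput =
    [ ("File1", "I was going to the airport when I saw someone crossing")
      ("File2", "I was going home when I saw you coming toward me") ]

// Sequential reference: map every input, group by key, reduce each group.
let reference =
    testInput
    |> Seq.collect (fun (k, v) -> map k v)
    |> Seq.groupBy fst
    |> Seq.map (fun (key, pairs) -> key, (reduce key (pairs |> Seq.map snd) |> Seq.head))
    |> Map.ofSeq

printfn "%A" reference
```

For these two sentences, "I" appears 4 times; "was", "going", "when" and "saw" appear twice; every other word appears once, for 15 distinct keys.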