LAgent : an agent framework in F# – Part I – Workers and ParallelWorkers


      Introduction

      I like to try out different programming paradigms. I started out as an object-oriented programmer. At university, I used Prolog. I then learned functional programming. I also experimented with various shared-memory parallel paradigms (e.g. async, tasks and such). I now want to learn more about message-based parallel programming (Erlang style). I’m convinced that doing so makes me a better programmer. Plus, I enjoy it …

      My usual learning style is to build a framework that replicates a particular programming model and then write code using it. In essence, I build a very constrained environment. For example, when learning functional programming, I didn’t use any OO constructs for a while, even though my programming language supported them.

      In this case, I built myself a little agent framework based on F# MailboxProcessors. I could have used MailboxProcessors directly, but they are too flexible for my goal. Even to write a simple one, you need to use async and recursion in a specific pattern, which I always forget. Also, there are multiple ways to do a Post. I wanted things to be as simple as possible, and I was willing to sacrifice flexibility for that.

      Notice that there are serious efforts in this space (such as Axum). This is not one of them. It’s just a simple thing I enjoy working on between one meeting and the next.

      Workers and ParallelWorkers

      The two major primitives are spawning an agent and posting a message.

      let echo = spawnWorker (fun msg -> printfn "%s" msg)
      echo <-- "Hello guys!"

      There are two kinds of agents in my system. A worker is an agent that doesn’t keep any state between consecutive messages. It is a stateless guy. Notice that the lambda that you pass to create the agent is strongly typed (i.e. msg is of type string). Also notice that I overloaded the <-- operator to mean Post.

      Given that a worker is stateless, you can create a whole bunch of them and, when a message is posted, route it to one of them transparently.

      let parallelEcho = spawnParallelWorker(fun s -> printfn "%s" s) 10
      parallelEcho <-- "Hello guys!"

      For example, in the above code, 10 workers are created and, when a message is posted, it gets routed to one of them (using a super duper innovative dispatching algorithm I’ll describe in the implementation part). This parallelWorker guy is not really needed, since you could easily build it out of the other primitives, but it is kind of cute.
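To make the worker / parallelWorker distinction concrete, here is a minimal sketch of the same idea in Python rather than F#. It is not LAgent's implementation: the `Worker` and `ParallelWorker` classes, their `post` and `join` methods, and the round-robin dispatch are all assumptions made for illustration (the dispatching algorithm the framework actually uses is left for the implementation post).

```python
import itertools
import queue
import threading


class Worker:
    """Hypothetical stateless agent: one thread draining one mailbox."""

    def __init__(self, handler):
        self._handler = handler
        self._mailbox = queue.Queue()
        threading.Thread(target=self._loop, daemon=True).start()

    def _loop(self):
        while True:
            msg = self._mailbox.get()
            try:
                self._handler(msg)
            except Exception as exc:
                # Keep the agent alive; real error management is out of scope here.
                print(f"handler failed: {exc!r}")
            finally:
                self._mailbox.task_done()

    def post(self, msg):
        """Queue a message; it is handled later on the worker's own thread."""
        self._mailbox.put(msg)

    def join(self):
        """Block until every message posted so far has been handled."""
        self._mailbox.join()


class ParallelWorker:
    """Hypothetical pool of workers; round-robin dispatch is an assumption."""

    def __init__(self, handler, how_many):
        self._workers = [Worker(handler) for _ in range(how_many)]
        self._next = itertools.cycle(self._workers)
        self._lock = threading.Lock()

    def post(self, msg):
        with self._lock:  # posts may come from several threads
            worker = next(self._next)
        worker.post(msg)

    def join(self):
        for worker in self._workers:
            worker.join()


if __name__ == "__main__":
    def tprint(msg):
        print(f"{msg} running on thread {threading.get_ident()}")

    parallel_echo = ParallelWorker(tprint, 10)
    for msg in "abcdef":
        parallel_echo.post(msg)
    parallel_echo.join()
```

Because the handler keeps no state, it does not matter which worker in the pool receives a given message, which is what makes the transparent routing safe.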

      To show the difference between a worker and a parallelWorker, consider this:

      open System.Threading

      let tprint s = printfn "%s running on thread %i" s Thread.CurrentThread.ManagedThreadId
      let echo1 = spawnWorker (fun s -> tprint s)
      let parallelEcho1 = spawnParallelWorker(fun s -> tprint s) 10
      
      let messages = ["a";"b";"c";"d";"e";"f";"g";"h";"i";"l";"m";"n";"o";"p";"q";"r";"s";"t"]
      messages |> Seq.iter (fun msg -> echo1 <-- msg)
      messages |> Seq.iter (fun msg -> parallelEcho1 <-- msg)

       

      The result of the echo1 iteration is:

      a running on thread 11

      b running on thread 11

      c running on thread 11

      d running on thread 11

      While the result of the parallelEcho1 iteration is:

      a running on thread 13

      c running on thread 14

      b running on thread 12

      o running on thread 14

      m running on thread 13

      Notice how the latter executes on multiple threads (but not in order). Next time I’ll talk about agents, control messages and error management.


      An Async Html cache – part II – Testing the cache




      Let’s try out our little cache. First I want to write a synchronous version of it as a baseline.

          Private Shared Sub TestSync(ByVal sites() As String, ByVal sitesToDownload As Integer, ByVal howLong As Integer)
              Dim syncCache As New Dictionary(Of String, String)
              Dim count = sites.Count()
              Dim url1 = "http://moneycentral.msn.com/investor/invsub/results/statemnt.aspx?Symbol="

              For i = 0 To sitesToDownload - 1
                  Dim html As String = ""
                  Dim url = url1 & sites(i Mod count)
                  If Not syncCache.TryGetValue(url, html) Then
                      html = LoadWebPage(url)
                      syncCache(url) = html
                  End If
                  DoWork(html, howLong)
              Next
          End Sub


      This is a loop that loads webpages into the cache if they are not already there. sites is a list of tickers used to compose the urls; sitesToDownload is the total number of downloads to perform, so that a single url can be requested multiple times; howLong represents the work to be done on each loaded page.


      In this version the cache is simply a Dictionary and there is no parallelism. The two lines that call syncCache.TryGetValue and assign syncCache(url) are where the cache is managed.
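The check-then-load pattern of the synchronous baseline can be sketched in a few lines of Python. This is an illustration only, not a translation of the attached source: `run_sync`, `load_page`, `do_work` and `base_url` are hypothetical names, and the page loader is passed in so the loop can be exercised without touching the network.

```python
def run_sync(tickers, sites_to_download, load_page, do_work, base_url):
    """Load each url at most once, but do the per-page work on every request."""
    cache = {}  # url -> html, the counterpart of the Dictionary baseline
    for i in range(sites_to_download):
        # Cycle through the tickers so the same url comes up repeatedly.
        url = base_url + tickers[i % len(tickers)]
        if url not in cache:
            cache[url] = load_page(url)  # cache miss: load and remember
        do_work(cache[url])
    return cache


if __name__ == "__main__":
    loads = []

    def fake_load(url):
        # Stand-in for a real download; records which urls were fetched.
        loads.append(url)
        return f"<html>{url}</html>"

    run_sync(["mmm", "aos", "abt"], 7, fake_load, lambda html: None,
             "http://example.invalid/?Symbol=")
    print(f"{len(loads)} loads for 7 requests")
```

With three tickers and seven requests, only the first request for each ticker reaches the loader; the remaining four are served from the dictionary.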


      DoWork is this.

          Public Shared Sub DoWork(ByVal html As String, ByVal howLong As Integer)
              Thread.Sleep(howLong)
          End Sub

      Let’s take a look at the asynchronous version.

          Private Shared Sub TestAsync(ByVal sites() As String, ByVal sitesToDownload As Integer, ByVal howLong As Integer)
              Dim htmlCache As New HtmlCache
              Dim count = sites.Count()
              Dim url = "http://moneycentral.msn.com/investor/invsub/results/statemnt.aspx?Symbol="
              Using ce = New CountdownEvent(sitesToDownload)
                  For i = 1 To sitesToDownload
                      htmlCache.GetHtmlAsync(
                          url & sites(i Mod count),
                          Sub(s)
                              DoWork(s, howLong)
                              ce.Signal()
                          End Sub)
                  Next
                  ce.Wait()
              End Using
          End Sub

      There are several points worth making on this:



      • The lambda used as the second parameter of GetHtmlAsync is invoked on a different thread once the html has been retrieved (which could be immediately if the cache has downloaded the url before).

      • CountdownEvent allows a thread to wait until a certain number of signals have been sent. The waiting happens on the main thread in the ce.Wait() instruction. The signalling happens in the lambda described in the point above (the ce.Signal() instruction).

      This is the driver for the overall testing.

          Private Shared Sub TestPerf(ByVal s As String, ByVal a As Action, ByVal iterations As Integer)
              Dim clock As New Stopwatch

              clock.Start()
              For i = 1 To iterations
                  a()
              Next
              clock.Stop()
              Dim ts = clock.Elapsed
              ' Pass the label as an argument so braces in it cannot break the format string;
              ' use integer division (\) so the hundredths field never rounds up to 100.
              Dim elapsedTime = String.Format("{0}: {1:00}:{2:00}:{3:00}.{4:00}", s, ts.Hours, ts.Minutes, ts.Seconds, ts.Milliseconds \ 10)
              Console.WriteLine(elapsedTime)
          End Sub


      There is not much to say about it. Start the clock, perform a bunch of iterations of the passed lambda, stop the clock, print out performance.
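For comparison, the same start / iterate / stop / print shape looks like this in Python. `run_perf` and `format_elapsed` are hypothetical names, and `time.perf_counter` stands in for the Stopwatch; this is a sketch of the pattern, not part of the attached source.

```python
import time


def format_elapsed(seconds):
    """Format a duration as hh:mm:ss.cc, truncating to hundredths of a second."""
    centis = int(seconds * 100)
    hours, rem = divmod(centis, 360000)
    minutes, rem = divmod(rem, 6000)
    secs, centis = divmod(rem, 100)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{centis:02d}"


def run_perf(label, action, iterations):
    """Call action() the given number of times and print the total elapsed time."""
    start = time.perf_counter()
    for _ in range(iterations):
        action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {format_elapsed(elapsed)}")
    return elapsed


if __name__ == "__main__":
    run_perf("Sleep", lambda: time.sleep(0.02), 5)
```

Timing the whole loop rather than each call keeps the measurement overhead out of the per-iteration cost.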


      And finally the main method. Note that all the adjustable parameters are factored out before the calls to TestPerf.

          Public Shared Sub Main()
              Dim tickers = New String() {"mmm", "aos", "shlm", "cas", "abt", "anf", "abm", "akr", "acet", "afl", "agl", "adc", "apd",
                                          "ayr", "alsk", "ain", "axb", "are", "ale", "ab", "all"}

              Dim sitesToDownload = 50
              Dim workToDoOnEachUrlInMilliSec = 20
              Dim perfIterations = 5

              TestPerf("Async", Sub() TestAsync(tickers, sitesToDownload, workToDoOnEachUrlInMilliSec), perfIterations)
              TestPerf("Sync", Sub() TestSync(tickers, sitesToDownload, workToDoOnEachUrlInMilliSec), perfIterations)
          End Sub


      Feel free to change tickers, sitesToDownload, workToDoOnEachUrlInMilliSec and perfIterations. Depending on the ratios between these parameters and the number of cores on your machine, you’re going to see different results, which highlights the fact that parallelizing your algorithms may or may not yield performance gains, depending on both software and hardware considerations. I get a ~3X improvement on my box. I attached the full source file for your amusement.

      AsyncCache.vb