LAgent: an agent framework in F# – Part V – Timeout management

Download framework here.

All posts are here:

Timeout management

Timeouts are very important in message-based systems. If an agent has not received any message for a certain period of time, that usually means something: another agent might have crashed, other agents might think that you are offline, or any number of other things. Hence the need to set timeouts and react when they are triggered.

You do that by writing the following:

counter1 <-- SetTimeoutHandler 1000 (fun state ->
    printfn "I'm still waiting for a message in state %A, come on ..." state
    ContinueProcessing(state))

Which generates the following message every second:

I’m still waiting for a message in state 2, come on …

I’m still waiting for a message in state 2, come on …

The first parameter to SetTimeoutHandler is how long to wait, in milliseconds, before triggering the handler. The second parameter is the handler that gets called whenever no message is received for that amount of time. Notice that the handler takes the current state of the agent and returns ContinueProcessing(state). This tells the agent to continue processing messages and sets its current state to the value passed in (here, the unchanged state).

The following code:

counter1 <-- 2

Then generates:

I’m still waiting for a message in state 4, come on …

I’m still waiting for a message in state 4, come on …

ContinueProcessing is just one of the three possible values of the (terribly named) AfterError:

type AfterError =
| ContinueProcessing of obj
| StopProcessing
| RestartProcessing
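To make the three outcomes concrete, here is a small F# sketch of how an agent loop might interpret an AfterError value. The nextState helper is hypothetical (it is not part of LAgent), and it assumes that RestartProcessing sends the agent back to the state it was started with, as the Restart command does.

```fsharp
// Same shape as the AfterError type above.
type AfterError =
    | ContinueProcessing of obj
    | StopProcessing
    | RestartProcessing

// Hypothetical helper: the state the agent carries on with after a handler
// returns `outcome`, or None if the agent should stop.
let nextState (initialState: obj) (outcome: AfterError) : obj option =
    match outcome with
    | ContinueProcessing newState -> Some newState // keep going with the state the handler chose
    | RestartProcessing -> Some initialState       // assumed: back to the initial state
    | StopProcessing -> None                       // no further messages are processed
```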

Let’s see what RestartProcessing does.

counter1 <-- SetTimeoutHandler 1000 (fun state ->
    printfn "Restart from state %A" state
    RestartProcessing)

Which, as expected, generates a nice stream of:

Restart from state 0

Restart from state 0

To bring things back to normal (that is, no timeout) you can just pass -1, as in:

counter1 <-- SetTimeoutHandler -1  (fun state -> ContinueProcessing(state))

Also, you can stop the agent when a timeout occurs by returning the aptly named StopProcessing:

counter1 <-- SetTimeoutHandler 1000 (fun state ->
    printfn "Stop at state %A" state
    StopProcessing)
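The timeout behaviour above can be approximated with nothing but FSharp.Core's MailboxProcessor, whose TryReceive returns None when no message arrives in time. This is a hypothetical sketch, not LAgent's implementation: spawnCounterWithTimeout is a made-up name, the agent only accepts int messages, and the handler is fixed when the agent is spawned rather than set later with SetTimeoutHandler.

```fsharp
type AfterError =
    | ContinueProcessing of obj
    | StopProcessing
    | RestartProcessing

// Hypothetical counter agent with a timeout handler fixed at spawn time.
let spawnCounterWithTimeout (timeoutMs: int) (onTimeout: obj -> AfterError) (initialState: int) =
    MailboxProcessor<int>.Start(fun inbox ->
        let rec loop (state: int) = async {
            // TryReceive yields None if nothing arrives within timeoutMs;
            // a timeout of -1 means wait forever, so the handler never fires.
            let! msg = inbox.TryReceive(timeoutMs)
            match msg with
            | Some n -> return! loop (state + n)
            | None ->
                match onTimeout (box state) with
                | ContinueProcessing newState -> return! loop (unbox<int> newState)
                | RestartProcessing -> return! loop initialState
                | StopProcessing -> return () }
        loop initialState)
```

Because each loop iteration calls TryReceive again, every incoming message restarts the countdown.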

Another interesting thing you might want to do is hot swapping of code. More on that in the next part …


LAgent: an agent framework in F# – Part IV – Custom error management


Custom error management

In the last part we saw what the framework does by default when an error occurs. But that might not be what you want: you might want your own sophisticated, distributed error detection and recovery algorithm.

To make this possible, each agent has a manager. The manager is itself an agent, and it receives a message whenever an error occurs in the agent it is monitoring.

In code:

open System

let manager =
    spawnWorker (fun (agent, name:string, ex:Exception, msg:obj, state, initialState) ->
        printfn "%s restarting ..." name
        agent <-- Restart)
counter1 <-- SetManager(manager)

Whenever an error occurs, the manager receives a tuple of:

(agent, name, exception, message, currentState, initialState)

This manager prints a message and then restarts the failing agent. Let’s trigger an error by posting a message of the wrong type:

counter1 <-- "afdaf"
counter1 <-- 2

The expectation is that the counter will restart from 0 whenever an error is triggered. This is what happens:

Bob restarting …

From 0 to 2

Which is what we expected. Obviously this is not a very sophisticated error recovery algorithm. You might want to do something more meaningful. Hopefully you have enough information to build whatever you need.
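The manager pattern can be sketched with two plain MailboxProcessors: a counter that reports failures instead of handling them, and a manager that logs the failure and sends Restart back. The WorkerMsg and ErrorReport types, spawnCounter and the GetState query are hypothetical stand-ins for LAgent's own machinery, and the report is a record rather than the tuple shown above.

```fsharp
// Hypothetical message types for this sketch; they are not LAgent's own.
type WorkerMsg =
    | Data of obj
    | Restart
    | GetState of AsyncReplyChannel<int>

// What the manager is told about a failure (a record instead of a tuple).
type ErrorReport =
    { Name: string; Error: exn; Message: obj; State: int; InitialState: int }

type Manager = MailboxProcessor<ErrorReport * MailboxProcessor<WorkerMsg>>

// A counter that reports failures to its manager instead of handling them.
let spawnCounter (name: string) (initialState: int) (manager: Manager) =
    MailboxProcessor<WorkerMsg>.Start(fun inbox ->
        let rec loop (state: int) = async {
            let! msg = inbox.Receive()
            match msg with
            | Restart -> return! loop initialState
            | GetState reply ->
                reply.Reply state
                return! loop state
            | Data payload ->
                let next =
                    try
                        // unbox throws InvalidCastException for non-int payloads
                        Some (state + unbox<int> payload)
                    with ex ->
                        let report = { Name = name; Error = ex; Message = payload; State = state; InitialState = initialState }
                        manager.Post((report, inbox))
                        None
                match next with
                | Some s ->
                    printfn "From %i to %i" state s
                    return! loop s
                | None -> return! loop state }
        loop initialState)

// A manager that logs the failure and restarts the failing agent.
let manager : Manager =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () = async {
            let! (report, agent) = inbox.Receive()
            printfn "%s restarting ..." report.Name
            agent.Post Restart
            return! loop () }
        loop ())
```

Posting Data (box "afdaf") to a counter created with spawnCounter "Bob" 0 manager should make the manager print "Bob restarting ...", after which the counter counts from 0 again.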

Timeouts are a particularly important class of unexpected event. We’ll talk about them next.

LAgent: an agent framework in F# – Part III – Default error management


Default error management

What happens when an error occurs? Ideally you want to notify someone and continue processing messages. By default, the framework prints the error together with as much information about it as it can.

Let’s first see what happens if you pass the wrong message type:

counter1 <-- "fst"

Generates:

> The exception below occurred on agent Undefined at state 3 with message "fst". The agent was started with state 0.

System.InvalidCastException: Specified cast is not valid.
   at Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicFunctions.UnboxGeneric[T](Object source)
   at FSI_0003.AgentSystem.f@158.Invoke(Object a, Object b)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at FSI_0003.AgentSystem.loop@20-3.Invoke(Unit unitVar)
   at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@245.Invoke(AsyncParams`1 args)

You get information about the current state of the agent, the message that caused the error, the initial state of the agent and the exception that was thrown. But in a system with several agents you’d also like to know which agent failed. For that, you need to name your agent:

counter1 <-- SetName("Bob")
counter1 <-- "fadfad"

Now you get (note the agent name, Bob, in the first line):

> The exception below occurred on agent Bob at state 3 with message "fadfad". The agent was started with state 0.

System.InvalidCastException: Specified cast is not valid.
   at Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicFunctions.UnboxGeneric[T](Object source)
   at FSI_0003.AgentSystem.f@158.Invoke(Object a, Object b)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at FSI_0003.AgentSystem.loop@20-3.Invoke(Unit unitVar)
   at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@245.Invoke(AsyncParams`1 args)

The important thing is that the agent continues running. It lives to fight another day. Hence:

counter1 <-- 3

Produces:

From 3 to 6

Which shows that the agent is still running and has kept its current state. Errors that occur inside the message handler itself have a similar result:

(spawnAgent (fun msg state -> state / msg) 100) <-- 0

Produces:

> The exception below occurred on agent Undefined at state 100 with message 0. The agent was started with state 100.

System.DivideByZeroException: Attempted to divide by zero.
   at FSI_0013.it@48-3.Invoke(Int32 msg, Int32 state)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at FSI_0003.AgentSystem.f@158.Invoke(Object a, Object b)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at Microsoft.FSharp.Core.FastFunc`2.InvokeFast[V](FastFunc`2 func, T arg1, TResult arg2)
   at FSI_0003.AgentSystem.loop@20-3.Invoke(Unit unitVar)
   at Microsoft.FSharp.Control.AsyncBuilderImpl.callA@245.Invoke(AsyncParams`1 args)
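The default behaviour described in this part boils down to wrapping the message handler in a try/with that reports the error and returns the state unchanged. Here is a minimal sketch of that idea. safeStep, counter and divider are hypothetical names, and, as in LAgent, messages and state are untyped (obj).

```fsharp
// Hypothetical default error handling: run the handler and, if it throws,
// print a report and keep the current state so the agent keeps running.
let safeStep (name: string) (initialState: obj) (handler: obj -> obj -> obj) (state: obj) (msg: obj) : obj =
    try
        handler msg state
    with ex ->
        printfn "The exception below occurred on agent %s at state %A with message %A. The agent was started with state %A." name state msg initialState
        printfn "%O" ex
        state

// A counter handler: unbox throws InvalidCastException if the message is not an int.
let counter (msg: obj) (state: obj) : obj = box (unbox<int> state + unbox<int> msg)

// A divider handler: integer division by zero throws DivideByZeroException.
let divider (msg: obj) (state: obj) : obj = box (unbox<int> state / unbox<int> msg)
```

Folding safeStep over the messages 3, "fst" and 3, starting from 0, prints one error report and leaves the state at 6, mirroring the "From 3 to 6" behaviour above.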

But this might not be what you want. You might want to customize what happens when an error occurs. We’ll talk about that next.

LAgent: an agent framework in F# – Part II – Agents and control messages


Agents

Agents are entities that process messages and keep state between one message and the next. As such, they need to be initialized with a lambda that takes a message and a state and returns a new state. In F# pseudocode: msg -> state -> newState. For example:

let counter = spawnAgent (fun msg state -> state + msg) 0

This is a counter that starts from 0 and gets incremented by the value of the received message. Let’s make it print something when it receives a message:

let counter1 =
    spawnAgent (fun msg state -> printfn "From %i to %i" state (state + msg); state + msg) 0
counter1 <-- 3
counter1 <-- 4

Which produces:

From 0 to 3

From 3 to 7
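For intuition, here is a rough sketch of a spawnAgent-like function on top of FSharp.Core's MailboxProcessor. It is not LAgent's code: AgentMsg, GetState and this <-- operator are hypothetical, and unlike LAgent this version is statically typed in both message and state.

```fsharp
// Hypothetical message type: data for the agent, or a request for its state.
type AgentMsg<'Msg, 'State> =
    | Message of 'Msg
    | GetState of AsyncReplyChannel<'State>

// Hypothetical spawnAgent: a msg -> state -> newState function plus an initial state.
let spawnAgent (f: 'Msg -> 'State -> 'State) (initialState: 'State) =
    MailboxProcessor<AgentMsg<'Msg, 'State>>.Start(fun inbox ->
        let rec loop (state: 'State) = async {
            let! msg = inbox.Receive()
            match msg with
            | Message m -> return! loop (f m state)
            | GetState reply ->
                reply.Reply state
                return! loop state }
        loop initialState)

// Hypothetical posting operator, named after LAgent's <--.
let (<--) (agent: MailboxProcessor<AgentMsg<'Msg, 'State>>) (msg: 'Msg) = agent.Post(Message msg)

let counter1 = spawnAgent (fun msg state -> printfn "From %i to %i" state (state + msg); state + msg) 0
counter1 <-- 3
counter1 <-- 4
```

The static typing is the main design difference: this counter only accepts ints, which is exactly the restriction LAgent avoids, as discussed below.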

There is no spawnParallelAgent, because I couldn’t figure out its usage patterns. Maybe I don’t have enough creativity. Obviously msg and state can be of any type (in real applications they end up being tuples more often than not).

Control messages

You can also send control messages to agents. I keep adding to them, but at this stage they are:

type Command =
| Restart
| Stop
| SetManager of AsyncAgent
| SetName of string

Plus some others. I’ll describe most of them later on; right now I want to talk about Restart and Stop. You use the former like this:

counter1 <-- Restart
counter1 <-- 3

Which produces:

From 0 to 3

This should be somewhat surprising to you. You would have thought that you could only post integers to a counter. This is not the case: you can post any object. This is useful because it gives a common model for passing all sorts of messages, it means the agent is not parameterized by the type of its messages (or of its state), so that you can store agents in data structures, and it allows advanced scenarios (e.g. hot swapping of code).

This is a debatable decision. I tried to get the best of strong typing and dynamic typing while keeping things simple to use. The implementation of this is kind of a mess, though. We’ll get there.

BTW: you use Stop just by posting Stop, which stops the agent (forever).
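The untyped design can be sketched by giving a MailboxProcessor obj messages and type-testing each one: Command values are handled by the agent itself and everything else is passed to the handler. This is a hypothetical sketch, not LAgent's implementation; spawnUntypedAgent, the two-case Command type and the reply-channel state query are assumptions made for illustration.

```fsharp
// Hypothetical control messages for this sketch (a subset of LAgent's Command).
type Command =
    | Restart
    | Stop

// Hypothetical untyped agent: every message is an obj. Commands are handled by
// the agent, a reply channel returns the current state, anything else goes to f.
let spawnUntypedAgent (f: obj -> obj -> obj) (initialState: obj) =
    MailboxProcessor<obj>.Start(fun inbox ->
        let rec loop (state: obj) = async {
            let! msg = inbox.Receive()
            match msg with
            | :? Command as cmd when cmd = Restart -> return! loop initialState
            | :? Command as cmd when cmd = Stop -> return () // the loop ends for good
            | :? AsyncReplyChannel<obj> as reply ->
                reply.Reply state
                return! loop state
            | data -> return! loop (f data state) }
        loop initialState)

// Because messages are obj, ints and commands share one mailbox.
let counter1 = spawnUntypedAgent (fun msg state -> box (unbox<int> state + unbox<int> msg)) (box 0)
let getState () = counter1.PostAndReply(fun (ch: AsyncReplyChannel<obj>) -> box ch) |> unbox<int>
```

Posting Restart and then 3 leaves this counter at 3, the same effect as the Restart example above. After Stop the loop ends, so any later messages simply stay in the mailbox.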