Mailbox processors can be used to manage mutable state in a transparent and thread-safe way. Let's build a simple counter.
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
Now let's generate some operations
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
And you will see the following log
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Since mailbox processor processes the messages one by one and there is no interleaving, you can also produce the messages from multiple threads and you will not see the the typical problems of lost or duplicated operations. There is no way for a message to use the old state of other messages, unless you specifically implement the processor so.
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
All messages are posted from different threads. The order in which messages are posted to the mailbox is not deterministic, so the order of processing them is not deterministic, but since the overall number of increments and decrements is balanced, you will see the final state being 0, no matter in what order and from which threads the messages were sent.
In the previous example we've only simulated mutable state by passing the recursive loop parameter, but mailbox processor has all these properties even for a truly mutable state. This is important when you maintain large state and immutability is impractical for performance reasons.
We can rewrite our counter to the following implementation
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
Even though this would definitely not be thread safe if the counter state was modified directly from multiple threads, you can see by using the parallel message Posts from previous section that mailbox processor processes the messages one after another with no interleaving, so each message uses the most current value.