Let’s continue our investigation of how to create real-world distributed apps. But before we continue, we should settle on a well-established vocabulary for our problem domain.
Distributed computing has a long history, and there are many patterns that work better than others. Just to read and understand other posts and articles about distributed computing, we have to know the pattern names and terminology. Distributed patterns have a lot in common with patterns of concurrency, parallelism and asynchronous programming. Distribution just adds another level of complexity related to placement, connection speed, time synchronization and so on. Distributed patterns try to solve, in your code, problems caused by the physical world. Here are some links where you can find pattern descriptions.
I think you can easily find more, but these resources were very helpful for me. Now we can name some patterns naively implemented in the previous post: Pipes and Filters, Routing Slip…
Now let’s describe a problem that is common when you are developing a worker role. For example, we have a task to process a lot of images by applying some filter to them. We could develop a worker role that grabs image ids from an input queue, then loads, processes and saves the images in parallel. We can also set up autoscaling to add or remove role instances based on the input queue length. Our worker is probably CPU bound, but it could also be I/O bound if the image storage is not fast enough. Let’s represent the problem as a simple function.
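A minimal sketch of such a function, under stated assumptions: `loadImage`, `applyFilter`, `saveImage` and `processImage` are hypothetical names, storage is faked with blocking `Thread.Sleep` calls, and the "filter" simply inverts bytes. The `io_bound` flag mirrors the post's variable and switches between emulated blocking I/O and emulated CPU work.

```fsharp
open System.Threading

// Hypothetical flag mirroring the post's "io_bound" variable:
// true = emulate slow blocking storage, false = emulate CPU-heavy filtering.
let io_bound = true

// Hypothetical stand-in for reading an image from storage (blocking I/O).
let loadImage (imageId: int) : byte[] =
    if io_bound then Thread.Sleep 10
    Array.create 1024 (byte (imageId % 256))

// Hypothetical "filter": inverts every byte; burns CPU first when io_bound is false.
let applyFilter (image: byte[]) : byte[] =
    if not io_bound then
        let mutable acc = 0L
        for i in 1 .. 5000000 do
            acc <- acc + int64 i
    image |> Array.map (fun b -> 255uy - b)

// Hypothetical stand-in for writing the result back to storage (blocking I/O).
let saveImage (imageId: int) (image: byte[]) : unit =
    if io_bound then Thread.Sleep 10

// The whole job for one image id: load, filter, save.
let processImage (imageId: int) : unit =
    imageId |> loadImage |> applyFilter |> saveImage imageId
```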
We wrote a simple function that uses blocking I/O to perform the task. We also added an `io_bound` variable to emulate (very naively) I/O-bound or CPU-bound work. Finally, our worker looks like this:
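Assuming a blocking per-image function (stubbed here as a hypothetical `processImage` that sleeps and counts its calls), a sequential worker can be sketched as a plain loop over the ids:

```fsharp
open System.Threading

// Hypothetical stub for the per-image job: blocks for 10 ms, then counts itself.
let processed = ref 0
let processImage (imageId: int) : unit =
    Thread.Sleep 10
    Interlocked.Increment(&processed.contents) |> ignore

// Sequential worker: one image at a time, blocking the calling thread throughout.
let processSequential (imageIds: int[]) : unit =
    for imageId in imageIds do
        processImage imageId

processSequential [| 1 .. 20 |]
printfn "processed %d images" processed.Value
```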
But this version is sequential, so let’s parallelize it.
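A sketch of the parallel synchronous version, again with a hypothetical blocking `processImage` stub. `Array.Parallel.iter` is one straightforward way to do it; the original post may have used a different API.

```fsharp
open System.Threading

// Hypothetical blocking stub for the per-image work (the sleep emulates blocking I/O).
let processed = ref 0
let processImage (imageId: int) : unit =
    Thread.Sleep 10
    Interlocked.Increment(&processed.contents) |> ignore

// Parallel sync version: Array.Parallel.iter spreads the ids over thread-pool
// threads, but every Thread.Sleep still holds its thread while it waits.
let processParallel (imageIds: int[]) : unit =
    imageIds |> Array.Parallel.iter processImage

processParallel [| 1 .. 100 |]
printfn "processed %d images" processed.Value
```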
But this function uses synchronous I/O, which blocks threads from the thread pool. We will rewrite it with asynchronous I/O.
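An asynchronous variant might look like the following sketch. The `*Async` functions are hypothetical stand-ins that use `Async.Sleep`, which waits without occupying a thread-pool thread, while the CPU-bound filter stays synchronous. Note that `Async.Parallel` starts a workflow for every id at once.

```fsharp
open System.Threading

// Hypothetical async stand-in for loading: Async.Sleep waits without holding a thread.
let loadImageAsync (imageId: int) : Async<byte[]> = async {
    do! Async.Sleep 10
    return Array.create 1024 (byte (imageId % 256)) }

// CPU-bound work stays an ordinary synchronous function.
let applyFilter (image: byte[]) : byte[] = image |> Array.map (fun b -> 255uy - b)

// Hypothetical async stand-in for saving; counts completed saves.
let saved = ref 0
let saveImageAsync (imageId: int) (image: byte[]) : Async<unit> = async {
    do! Async.Sleep 10
    Interlocked.Increment(&saved.contents) |> ignore }

let processImageAsync (imageId: int) : Async<unit> = async {
    let! image = loadImageAsync imageId
    let filtered = applyFilter image
    do! saveImageAsync imageId filtered }

// Every id becomes a workflow and Async.Parallel starts all of them together.
let processParallelAsync (imageIds: int[]) : unit =
    imageIds
    |> Array.map processImageAsync
    |> Async.Parallel
    |> Async.Ignore
    |> Async.RunSynchronously

processParallelAsync [| 1 .. 200 |]
printfn "saved %d images" saved.Value
```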
Let’s add some time measurement and check which version is faster.
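A simple way to measure is a `Stopwatch` around each version. The sketch below uses hypothetical 5 ms stubs and 100 images rather than the post's setup, so its absolute numbers will not match the measurements reported in the next paragraph; `time` is a hypothetical helper.

```fsharp
open System.Diagnostics
open System.Threading

// Hypothetical blocking and non-blocking stubs for the per-image work.
let processImage (_: int) : unit = Thread.Sleep 5
let processImageAsync (_: int) : Async<unit> = async { do! Async.Sleep 5 }

// Runs an action once, prints and returns the elapsed time in milliseconds.
let time (name: string) (action: unit -> unit) : int64 =
    let sw = Stopwatch.StartNew()
    action ()
    sw.Stop()
    printfn "%s: %d ms" name sw.ElapsedMilliseconds
    sw.ElapsedMilliseconds

let numImages = 100
let ids = [| 1 .. numImages |]

let tSeq = time "sequential sync" (fun () -> ids |> Array.iter processImage)
let tPar = time "parallel sync" (fun () -> ids |> Array.Parallel.iter processImage)
let tAsync =
    time "parallel async" (fun () ->
        ids
        |> Array.map processImageAsync
        |> Async.Parallel
        |> Async.Ignore
        |> Async.RunSynchronously)
```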
On my machine, with numImages = 2000, the parallel asynchronous and parallel synchronous versions take the same amount of time for CPU-bound tasks, but for I/O-bound tasks the synchronous version is 2 times slower than the asynchronous one. The sequential synchronous version is 10 times slower than the parallel synchronous version. But do you see a problem with the asynchronous version? It has an unbounded concurrency level. In short, it maps all ids into tasks and starts them all in parallel. This behavior will stress our data source until it eventually fails and requests start to error. Let’s limit the concurrency level with a semaphore. We could also use TPL schedulers for that purpose, but a semaphore is easier to understand.
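One way to bound concurrency is to wrap the async function with a `SemaphoreSlim`, as sketched below. `throttle` is a hypothetical helper name, and the instrumentation that records the peak number of in-flight calls exists only to show the limit holding.

```fsharp
open System.Threading

// Wraps an async function so that at most 'limit' calls run at the same time.
// WaitAsync lets excess callers wait without blocking thread-pool threads.
let throttle (limit: int) (work: 'a -> Async<'b>) : 'a -> Async<'b> =
    let semaphore = new SemaphoreSlim(limit)
    fun x -> async {
        do! semaphore.WaitAsync() |> Async.AwaitTask
        try
            return! work x
        finally
            semaphore.Release() |> ignore }

// Hypothetical instrumentation: track how many calls are in flight at once.
let inFlight = ref 0
let peak = ref 0
let processImageAsync (_: int) : Async<unit> = async {
    let now = Interlocked.Increment(&inFlight.contents)
    lock peak (fun () -> if now > peak.Value then peak.Value <- now)
    do! Async.Sleep 10
    Interlocked.Decrement(&inFlight.contents) |> ignore }

let limited = throttle 4 processImageAsync
[| 1 .. 100 |]
|> Array.map limited
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously
printfn "peak concurrency: %d" peak.Value
```

The limit is a single number per wrapped function, which is exactly why the next paragraph runs into trouble once different parts of the pipeline need different limits.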
Now we can respect the concurrency level allowed by a data source, and probably find the best level by running it several times with different limits. Unfortunately, we still have a problem. Our pipeline mixes different kinds of work, and each of them may need a different level of concurrency. For example, in our case reading from disk could be done with 100 concurrent requests, while image processing is CPU bound and its best level usually equals the number of available processors. So we need to split our pipeline into smaller parts and give each of them its own level of concurrency. But we need to connect them somehow and synchronize their speeds.
In short, the producer-consumer pattern describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer can write into the queue while it is not full; otherwise it blocks. The consumer can read from the queue while it is not empty; otherwise it blocks. The queue is buffered to increase overall speed and throughput when there are multiple producers and consumers. When the queue is safe to use by multiple producers and consumers, it is easy to change the level of concurrency for different parts of the pipeline at runtime. We will use the BlockingQueueAgent described by Tomas Petricek; the next samples will be almost the same as in his posts, but with some additions. So you could go and read these awesome posts and then come back, but I’ll describe them here briefly just to keep all the information in one place. The first task is to split our pipeline into separate tasks.
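The sketch below shows the idea with a hypothetical `BoundedQueue` built on `MailboxProcessor.Scan`. It follows the same state-based approach as BlockingQueueAgent (when empty accept only adds, when full accept only gets, otherwise accept both), but it is not Tomas Petricek's code and its member names are assumptions. A hypothetical `stage` worker takes from one queue, transforms the item and puts it into the next, so each stage can run its own number of workers; 4 loaders and `Environment.ProcessorCount` filters are arbitrary choices here.

```fsharp
open System
open System.Collections.Generic

type QueueMsg<'T> =
    | Put of 'T * AsyncReplyChannel<unit>
    | Take of AsyncReplyChannel<'T>

/// Bounded blocking queue sketch: AsyncPut waits while full, AsyncTake waits while empty.
type BoundedQueue<'T>(capacity: int) =
    let mutable count = 0
    let agent =
        MailboxProcessor<QueueMsg<'T>>.Start(fun inbox ->
            let items = Queue<'T>()
            let rec loop () = async {
                // Scan picks the first message that is legal in the current state;
                // other messages stay in the mailbox, so their senders keep waiting.
                let! msg =
                    inbox.Scan(fun msg ->
                        match msg with
                        | Put _ when items.Count < capacity -> Some(async { return msg })
                        | Take _ when items.Count > 0 -> Some(async { return msg })
                        | _ -> None)
                match msg with
                | Put (item, reply) ->
                    items.Enqueue item
                    reply.Reply()
                | Take reply -> reply.Reply(items.Dequeue())
                count <- items.Count
                return! loop () }
            loop ())
    member this.AsyncPut(item: 'T) = agent.PostAndAsyncReply(fun reply -> Put(item, reply))
    member this.AsyncTake() = agent.PostAndAsyncReply(Take)
    /// Approximate length, only for monitoring.
    member this.Count = count

/// One pipeline stage worker: take from input, transform, put into output, forever.
let stage (input: BoundedQueue<'A>) (output: BoundedQueue<'B>) (f: 'A -> Async<'B>) =
    async {
        while true do
            let! item = input.AsyncTake()
            let! result = f item
            do! output.AsyncPut result }

// Hypothetical stages: an emulated I/O load and an emulated CPU-bound filter.
let load (imageId: int) = async {
    do! Async.Sleep 5
    return imageId, Array.create 16 (byte imageId) }
let applyFilter (imageId: int, image: byte[]) = async {
    let _inverted = image |> Array.map (fun b -> 255uy - b)
    return imageId }

let idQueue = BoundedQueue<int>(10)
let loadedQueue = BoundedQueue<int * byte[]>(10)
let doneQueue = BoundedQueue<int>(10)

// Each stage gets its own concurrency level.
for _ in 1 .. 4 do Async.Start(stage idQueue loadedQueue load)
for _ in 1 .. Environment.ProcessorCount do Async.Start(stage loadedQueue doneQueue applyFilter)

Async.Start(async {
    for i in 1 .. 20 do
        do! idQueue.AsyncPut i })

let collect = async {
    let acc = ResizeArray<int>()
    for _ in 1 .. 20 do
        let! imageId = doneQueue.AsyncTake()
        acc.Add imageId
    return acc |> Seq.sort |> List.ofSeq }
let results = Async.RunSynchronously collect
```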
OK, we have queues, and the code looks really similar to what you could see in Go and Clojure core.async: asynchronous bounded channels connecting asynchronous sequential processes. And yes, we are using the model of interaction in concurrent systems originally described by C. A. R. Hoare and well known as Communicating Sequential Processes (CSP). But to be fully CSP compatible we need one more item: a “choice” operator. In short, it lets us compose several get and add operations on different channels (queues). For example, we could try to read from two channels and continue execution with whichever result arrives first. If one queue wins (it unblocks the process first), then the second should not be affected (no values should be taken from or added to the second queue). Our blocking queue is built on top of a mailbox processor, so operations spanning several queues are not atomic, and we need to use the “Compensating Transaction” pattern to undo the effect of a get operation. For simplicity, we will support only get operations in our switch implementation.
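A naive two-way choice over gets might look like the sketch below. The queue from the producer-consumer sketch is repeated so the block runs on its own, and `chooseTake` and `raceInto` are hypothetical names. Both gets are started; the first one to complete sets the result, and a get that completes after losing puts its value back, which is the compensating action. As a consequence, the losing get stays pending until its queue receives a value, and a compensated value goes to the back of its queue, so ordering is not preserved.

```fsharp
open System.Collections.Generic
open System.Threading.Tasks

type QueueMsg<'T> =
    | Put of 'T * AsyncReplyChannel<unit>
    | Take of AsyncReplyChannel<'T>

/// Bounded blocking queue sketch built on MailboxProcessor.Scan.
type BoundedQueue<'T>(capacity: int) =
    let agent =
        MailboxProcessor<QueueMsg<'T>>.Start(fun inbox ->
            let items = Queue<'T>()
            let rec loop () = async {
                let! msg =
                    inbox.Scan(fun msg ->
                        match msg with
                        | Put _ when items.Count < capacity -> Some(async { return msg })
                        | Take _ when items.Count > 0 -> Some(async { return msg })
                        | _ -> None)
                match msg with
                | Put (item, reply) ->
                    items.Enqueue item
                    reply.Reply()
                | Take reply -> reply.Reply(items.Dequeue())
                return! loop () }
            loop ())
    member this.AsyncPut(item: 'T) = agent.PostAndAsyncReply(fun reply -> Put(item, reply))
    member this.AsyncTake() = agent.PostAndAsyncReply(Take)

// One side of the race: publish the value if we won, otherwise undo the get.
let raceInto (winner: TaskCompletionSource<'R>) (take: Async<'T>)
             (wrap: 'T -> 'R) (undo: 'T -> Async<unit>) =
    async {
        let! value = take
        if not (winner.TrySetResult(wrap value)) then
            // Compensating action: the other queue already won, so return the value.
            do! undo value }

/// Naive choice over two gets: whichever queue yields a value first wins.
let chooseTake (a: BoundedQueue<'A>) (b: BoundedQueue<'B>) : Async<Choice<'A, 'B>> =
    async {
        let winner = TaskCompletionSource<Choice<'A, 'B>>()
        Async.Start(raceInto winner (a.AsyncTake()) Choice1Of2 (fun x -> a.AsyncPut x))
        Async.Start(raceInto winner (b.AsyncTake()) Choice2Of2 (fun x -> b.AsyncPut x))
        return! Async.AwaitTask winner.Task }

// Usage: the stop queue is empty, so the input queue wins.
let stopQueue = BoundedQueue<unit>(1)
let inputQueue = BoundedQueue<int>(10)
inputQueue.AsyncPut 42 |> Async.RunSynchronously
let first = chooseTake stopQueue inputQueue |> Async.RunSynchronously
printfn "%A" first
```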
It is very naive, but simple, and it works. Now we can express a wrapper for our workers that makes it possible to stop them.
It tries to get from both a stop channel and an input channel. If it receives a stop signal, it stops; otherwise it invokes the worker function. Finally, the workers are built on top of the stoppable worker.
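The loop itself can be sketched independently of the queues. In this hypothetical `stoppable`, the `next` parameter stands in for the choice between a stop channel (`Choice1Of2`) and an input channel (`Choice2Of2`); because F# async values are cold, `next` runs again on every iteration. A stub that drains a `ConcurrentQueue` and then signals stop replaces the real choice so the block runs on its own.

```fsharp
open System.Collections.Concurrent

/// Stoppable worker loop: stop when the stop side of the choice wins,
/// otherwise process the item and wait for the next choice.
let stoppable (next: Async<Choice<unit, 'T>>) (work: 'T -> Async<unit>) : Async<unit> =
    let rec loop () = async {
        let! signal = next
        match signal with
        | Choice1Of2 () -> return ()
        | Choice2Of2 item ->
            do! work item
            return! loop () }
    loop ()

// Hypothetical stand-in for the choice: hands out queued items, then a stop signal.
let pending = ConcurrentQueue<int>([ 1; 2; 3 ])
let fakeNext = async {
    match pending.TryDequeue() with
    | true, item -> return Choice2Of2 item
    | _ -> return Choice1Of2 () }

// A worker built on top of the stoppable loop: here it just sums the items.
let total = ref 0
let sumWorker = stoppable fakeNext (fun item -> async { total.Value <- total.Value + item })
Async.RunSynchronously sumWorker
printfn "total = %d" total.Value
```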
Now it is time for the load balancer. It starts a specified number of workers, checks the lengths of the input and output queues, and increases or reduces the number of workers at runtime.
To start our pipeline, we wrap our stoppable workers in load balancers and start the resulting workflows.
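The balancing rule can be kept separate from the queues, as in this sketch. `decide` is a hypothetical rule (add a worker while the input queue has items and the output queue has room, remove one when the input queue is empty), and `startWorker`/`stopWorker` are hooks that would, for instance, start a stoppable worker or send one a stop signal. The status line has the same shape as the output shown next, but the post's actual rule may differ.

```fsharp
/// Hypothetical scaling rule for one stage: returns the new worker count.
let decide (minWorkers: int) (maxWorkers: int)
           (inLen: int) (outLen: int) (outCapacity: int) (workers: int) : int =
    if inLen > 0 && outLen < outCapacity && workers < maxWorkers then workers + 1
    elif inLen = 0 && workers > minWorkers then workers - 1
    else workers

/// Balancer loop for one stage: polls queue lengths and starts/stops workers.
let balance (name: string) (minWorkers: int) (maxWorkers: int)
            (inLen: unit -> int) (outLen: unit -> int) (outCapacity: int)
            (startWorker: unit -> unit) (stopWorker: unit -> unit) (intervalMs: int) =
    let rec loop workers = async {
        do! Async.Sleep intervalMs
        let target = decide minWorkers maxWorkers (inLen ()) (outLen ()) outCapacity workers
        if target > workers then startWorker ()
        elif target < workers then stopWorker ()
        printfn "%s inq %d outq %d workers %d" name (inLen ()) (outLen ()) target
        return! loop target }
    async {
        // Start with the minimal number of workers, then adapt.
        for _ in 1 .. minWorkers do startWorker ()
        return! loop minWorkers }
```

Each stage would get its own `balance` workflow started with `Async.Start`, with `inLen` and `outLen` reading the approximate lengths of that stage's input and output queues.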
If you run this code, you will see something like this:
```
load_images inq 1 outq 0 workers 2
proccess_images inq 0 outq 0 workers 2
save_images inq 0 outq 0 workers 2
load_images inq 1998 outq 0 workers 3
save_images inq 0 outq 0 workers 1
proccess_images inq 0 outq 0 workers 1
proccess_images inq 0 outq 0 workers 2
save_images inq 0 outq 0 workers 2
load_images inq 1997 outq 0 workers 4
load_images inq 1993 outq 0 workers 5
save_images inq 0 outq 0 workers 1
proccess_images inq 0 outq 0 workers 1
load_images inq 1991 outq 0 workers 6
proccess_images inq 0 outq 0 workers 2
save_images inq 0 outq 0 workers 2
load_images inq 1986 outq 0 workers 7
proccess_images inq 0 outq 0 workers 1
save_images inq 0 outq 0 workers 1
load_images inq 1984 outq 0 workers 8
save_images inq 0 outq 0 workers 2
proccess_images inq 0 outq 0 workers 2
load_images inq 1977 outq 0 workers 9
proccess_images inq 0 outq 0 workers 1
save_images inq 0 outq 0 workers 1
save_images inq 0 outq 0 workers 2
proccess_images inq 4 outq 0 workers 2
load_images inq 1971 outq 4 workers 9
```

It takes some time to find the best number of workers for every task. For better performance, you could start each stage with the maximal number of workers instead of the minimal one. We could also easily change the rules of the load balancer, for example to use performance metrics of the resources involved. It is not production-quality code, but it could be rewritten, and after that you would be able to read and use [Golang patterns](http://blog.golang.org/advanced-go-concurrency-patterns) in your F# code. Unfortunately, I don't have enough time for this blog, so actors will arrive only in the next part. And as usual, comments and corrections are welcome.

# Update

Recommendation for further reading: [Programming in Hopac](https://github.com/Hopac/Hopac/blob/master/Docs/Programming.md), suggested by [Vasily Kirichenko](https://twitter.com/kot_2010).

# Update 2

The Hopac library has an interesting concept of [Alternatives](<http://hopac.github.io/Hopac/Hopac.html#def:type Hopac.Alt>) (events in Concurrent ML). An alternative represents a first-class selective operation and can use a negative acknowledgement (nack) to notify concurrent operations that were not selected (where we used a compensating transaction). It was a little hard to understand how they work, so I implemented them using only F# async. I hope it will be helpful for someone. The [code](https://gist.github.com/hodzanassredin/21cedfa2815735f8880e) contains an atomic version of BlockingQueueAgent and some examples.

# Update 3

There is a [reference implementation](https://github.com/intellifactory/websharper.hopac/blob/master/websharper.hopac/Hopac.fsi) of Hopac in F#, created by [Anton Tayanovskyy](https://twitter.com/t0yv0) for the WebSharper project.