I decided to write up the history and current state of our distributed azure project. We started from a simple azure worker and ended up with a very complex system. I hope it will be interesting for developers who are going to build a distributed application, and that this post will help them avoid some mistakes. In addition, I am going to describe two possible FSharp solutions to our current problems.
We started from a single azure worker. It was single threaded and had only two queues: one for input and one for output. Many azure tutorials show how to build this kind of worker. We decided to use the azure storage queue instead of the Azure Service Bus queue: it is cheaper and has a better SLA. To keep future changes possible, we abstracted the queue behind a simple C# interface:
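Something like this (the real interface is C#; I sketch an F# equivalent here, and the member names are my assumptions):

```fsharp
// A minimal sketch of the queue abstraction; member names are assumptions.
type IQueue<'T> =
    // enqueue a message
    abstract Push : 'T -> Async<unit>
    // push-based consumption: the implementation invokes the handler
    // for every message it fetches
    abstract Subscribe : ('T -> Async<unit>) -> unit
```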
As you can see, the main interface is push based, but the storage queue, for example, is pull based, so we need an additional interface that tells the wrapper when to poll for new work. We have implementations for both types of azure queues and for RabbitMQ. The worker uses it this way:
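Roughly like this (Request, Response and processRequest are hypothetical stand-ins for our real types):

```fsharp
// Hypothetical message types and work function for the sketch.
type Request  = { Text : string }
type Response = { Tokens : string list }

let processRequest (r: Request) =
    async { return { Tokens = r.Text.Split ' ' |> List.ofArray } }

// The worker subscribes to the requests queue; each handled message
// produces a result that is pushed to the responses queue.
let startWorker (requests: IQueue<Request>) (responses: IQueue<Response>) =
    requests.Subscribe (fun request -> async {
        let! response = processRequest request
        do! responses.Push response
    })
```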
I am not going to show the client code, but it is almost the same. This simple abstraction works very well for us. Every queue implementation, after fetching a message, invokes the subscribed action with that message and, if no exception is thrown, deletes the message from the queue. If the action throws an exception, our queue wrapper does nothing; after some time the queue provider returns the message to the queue, and some worker can handle it a second time. When something goes wrong with your worker, you can easily see it by checking your requests queue in Visual Studio's Server Explorer: in case of problems there will be messages with a non-zero dequeue count.
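For pull-based queues, the fetch loop of the wrapper looks roughly like this (a sketch; fetch and delete stand in for the storage SDK calls):

```fsharp
// At-least-once handling: delete the message only if the handler
// succeeds; on failure leave it alone so that, after the visibility
// timeout, the provider returns it to the queue for a retry.
let rec fetchLoop (fetch:   unit -> Async<'msg option>)
                  (delete:  'msg -> Async<unit>)
                  (handler: 'msg -> Async<unit>) =
    async {
        let! fetched = fetch ()
        match fetched with
        | Some msg ->
            try
                do! handler msg
                do! delete msg // success: safe to remove from the queue
            with _ ->
                () // failure: the message reappears, its dequeue count grows
        | None ->
            do! Async.Sleep 1000 // queue is empty, back off before polling again
        return! fetchLoop fetch delete handler
    }
```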
In addition, we hide the queues behind a REST API facade implemented as an azure web site, so our external clients know nothing about queues. But how do we let multiple clients use a worker at the same time while giving each of them equal bandwidth? Our abstraction gave us a way to solve this without changing the worker code: for the azure storage queue, we implemented a QueueStorageMulti class.
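A rough sketch of the idea behind it (queue discovery is stubbed out; the real class wraps azure storage queues):

```fsharp
// Round-robin over per-client queues. listQueues discovers the current
// per-client queues (e.g. by a name prefix) and is an assumption here.
type MultiQueueState<'q> = { Queues : 'q list; Index : int }

let nextQueue (listQueues: unit -> 'q list) (state: MultiQueueState<'q>) =
    // refresh the queue list only after a full pass over the current one
    let state =
        if state.Index >= List.length state.Queues then
            { Queues = listQueues (); Index = 0 }
        else state
    match state.Queues with
    | [] -> None, state // no clients yet
    | queues ->
        // fetch from this queue next, then advance the cursor
        Some (List.item state.Index queues), { state with Index = state.Index + 1 }
```

Every fetch asks nextQueue which queue to poll next, so each client's queue gets its turn.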
Now the worker uses QueueStorageMulti instead of QueueStorage. Each time it fetches a message, it moves on to the next queue. There is a problem with the refresh strategy: the code above refreshes the queue list only after it finishes fetching from all existing queues, so when there are many clients a new one has to wait a bit longer. If you want to reduce queue latency, you have to choose another refresh strategy. New problem: which queue should a worker use to send the result back to the client? We could put a client id into the message and adopt a naming convention: if the requests queue has the worker1 prefix, then the request queue is named worker1-clientId and the response queue responses-worker-clientid. However, we decided to use a base message class with a response address property instead. The back address also has a type; in our case the types are redis, storage queue, and email, so an answer can be sent not only to a queue but to any supported back address type. We also need to know about errors, so the base message has Error and StackTrace fields.
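The base message might look like this (field and case names are my assumptions):

```fsharp
// Sketch of the base message; field and case names are assumptions.
type BackAddressType =
    | Redis
    | StorageQueue
    | Email

type BackAddress = { Type : BackAddressType; Address : string }

type BaseMessage() =
    // where to send the answer; not necessarily a queue
    member val ResponseAddress : BackAddress option = None with get, set
    // populated by a worker when processing fails
    member val Error : string = null with get, set
    member val StackTrace : string = null with get, set
```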
New problem: different types of queues have different maximum message sizes. We solved this by introducing a Ref message type: if the queue wrapper sees that a message is too big, it converts it into a ref message, which stores the message data in azure blob storage.
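The idea in code (a sketch; the blob calls are stubbed, and maxSize depends on the queue type):

```fsharp
// A message either carries its payload inline or references a blob.
type Envelope =
    | Inline of byte []
    | Ref of blobUri : string

// Called by the queue wrapper before sending: payloads over the queue's
// size limit are moved to blob storage. uploadToBlob and downloadBlob
// stand in for the azure blob storage calls.
let wrap maxSize (uploadToBlob: byte [] -> string) (payload: byte []) =
    if payload.Length <= maxSize then Inline payload
    else Ref (uploadToBlob payload)

let unwrap (downloadBlob: string -> byte []) = function
    | Inline payload -> payload
    | Ref uri -> downloadBlob uri
```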
A lot of work done, and our simple worker runs without any issues. However, after some time you realize that one worker is not enough and you need to scale it. Scaling should be no problem, because this is the Cloud. Right? No. It is extremely hard in azure. Yes, there are some possibilities for auto scaling based on a metric, but we need to scale depending on queue length, and our queue could be switched to another type that auto scaling does not support. For example, there is no auto scaling support for the azure storage queue. After some googling I found the Autoscaling Application Block. It was outdated, and after several days without any success I decided to write my own. The main idea is to use the azure management api. You probably think it is as simple as installing a nuget package and writing several lines of code. NOOOOO. It is an extremely undocumented and hard-to-understand process. In two words: you load a security certificate by thumbprint, then do some black magic with downloading the deployment xml, changing the instance count in it, and uploading it back. I will not show the code here because it is a topic for a separate blog post.

Next: loading a file onto all of our worker instances. We do it this way: upload the file to blob storage and, while the worker starts, download it into the worker's local storage. You need to enable local storage in the worker's configuration file and specify its size, and you should not make a mistake here: after deployment, changes to the local storage settings can break your deployment and force you to delete the cloud service from azure and redeploy it. In addition, if the local storage size you specify is smaller than your file, you will get a cryptic soap error message during deployment.

The next step is to utilize all worker cores. We created some abstractions for the worker and its tasks. On start, the worker loads all files and other shared resources, then checks the number of logical processors and creates that many task executors. If a task executor throws an exception, the worker restarts it.
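The executor part can be sketched like this (the processing loop itself is stubbed out):

```fsharp
open System

// Restart a task executor whenever it dies with an exception.
let rec supervise id (runExecutor: int -> Async<unit>) =
    async {
        try
            do! runExecutor id
        with ex ->
            printfn "executor %d failed: %s; restarting" id ex.Message
            return! supervise id runExecutor
    }

// One task executor per logical processor.
let startExecutors (runExecutor: int -> Async<unit>) =
    [ for i in 1 .. Environment.ProcessorCount -> supervise i runExecutor ]
    |> Async.Parallel
    |> Async.Ignore
    |> Async.Start
```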
So far so good. We have everything we need to run a single type of worker in azure. It can handle different types of messages, and all seems to be fine. Unfortunately, we got a new requirement: use the same type of worker but with different data. In our case it was different types of text classifiers. Both types of workers share the same pipeline: Crawl -> Extract features -> Feature hashing -> Vectorization -> Classification. Both classifiers take a lot of memory and load different files for the classifiers and vectorizers. You may think the perfect solution is to have two different request prefixes and one small auto-scaler worker instance that creates a classifier instance with a different configuration based on the queue prefix. Unfortunately, creating an instance in azure and loading a classifier into the instance's memory takes too long, about half an hour in our case, while the clients of our web site use a synchronous interface and cannot wait that long. So we need to keep one instance running for every type of classifier. From a developer's point of view that is not a problem.
The next feature request was to add a capability to only one type of classifier. Now our workers differ not only in configuration but also in incoming message types and code, so we had to separate the two types of workers in code.
After some profiling, we found our performance bottleneck: the crawling stage. Yes, crawling is async by nature, but we do not want an event loop inside a worker's processing thread, and we also got a new requirement to implement another pipeline that uses crawling but is not related to our classifiers. So we moved crawling and tokenization into a new small worker instance. It is cheap, and now we auto scale only this worker. Classification workers keep only one running instance each; they are too pricey in terms of instance size. As you probably know, an azure subscription has a limit on the total number of cores you can use, and this solution allows us to keep that count small. Now we have a problem in code: how can we express a pipeline? It is time for a pipeline message class. It is a simple message class that carries the message to be sent to a response receiver. When a worker receives a message, it does some work, produces an output value, and invokes GetNextMessage(output). This method returns the response message, where some fields were populated initially by the client and some are populated from the calculated output value. This response message is then sent to the response receiver. In case of any error, we need to break execution, find the final message, set the error data on it, and send it to the final receiver. Why send the error message back to the receiver? Because the receiver could be synchronous and sitting in a waiting state, and it needs to know when to show the error or try again. Probably a better way would be to add ErrorMessageReceivers.
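In very rough strokes, reusing BaseMessage and BackAddress from above (the real class shape differs; this is an assumption):

```fsharp
// Sketch of a pipeline message as seen by one worker in the chain.
type PipelineMessage<'output> =
    { // where the next message should be sent
      ResponseReceiver : BackAddress
      // builds the next stage's message from this stage's output; some of
      // its fields were already populated by the client
      GetNextMessage   : 'output -> BaseMessage
      // the last message of the pipeline and its receiver, used for errors
      FinalMessage     : BaseMessage
      FinalReceiver    : BackAddress }

let handleStage (send: BackAddress -> BaseMessage -> Async<unit>)
                (work: unit -> Async<'output>)
                (msg: PipelineMessage<'output>) =
    async {
        try
            let! output = work ()
            do! send msg.ResponseReceiver (msg.GetNextMessage output)
        with ex ->
            // break the pipeline: report the error to the final receiver,
            // which may be waiting synchronously for an answer
            msg.FinalMessage.Error      <- ex.Message
            msg.FinalMessage.StackTrace <- ex.StackTrace
            do! send msg.FinalReceiver msg.FinalMessage
    }
```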
Now we have a system that allows us to split our work into small distributed pieces and express pipelines with message builders. For example, a classification message builder will look like this:
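A toy model of what such a builder could produce (the real builder creates nested pipeline messages; here a pipeline is flattened into a simple route just to show the flavor):

```fsharp
// A pipeline flattened into an ordered route plus a final receiver.
type Route = { Stages : string list; FinalReceiver : string }

let emptyRoute = { Stages = []; FinalReceiver = "" }
let stage name route = { route with Stages = route.Stages @ [ name ] }
let finishAt receiver route = { route with FinalReceiver = receiver }

// The classification pipeline expressed with the builder:
let classificationRoute =
    emptyRoute
    |> stage "crawl"
    |> stage "extract-features"
    |> stage "feature-hashing"
    |> stage "vectorization"
    |> stage "classification"
    |> finishAt "responses-webapp" // hypothetical final receiver queue
```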
We created code that allows us to express distributed computations for the azure platform. Very interesting: we are now using the Microservices pattern. You can read more here. We can decompose our algorithms into small pieces, and they can be deployed separately without breaking currently executing pipelines. Unfortunately, the system described above has a lot of problems that are really difficult to solve. With all the hype around Microservices, I read many articles and unfortunately found no answers to my questions. Let us start from a description of our solution. It can express only SEQUENTIAL pipelines with primitive error handling. That's all. There is no way to express cancellation, loops, if conditions or other control flow operators, or fork/join parallelization. Even with only sequential pipelines, it is damn hard to use.
Everything on that list increases the time from requirement to release. It simply takes too long to decompose the work, work around the control flow problems, and test.
So currently our project has a lot of boilerplate code full of workarounds, which is hard to maintain. Some time ago I started to think about how we could solve these problems. The main idea was to use an actor framework like akka.net or Orleans, but after some attempts to emulate the system on a local computer I found that they cannot solve all of these problems: again we get small pieces of more complex pipelines, and there is no abstract way to compose complex pipelines in code. To quote @runarorama's twitter: "Actors are an implementation detail for higher-order constructs, not a programming model". Actors are not free from problems of their own, either. Depression, depression, depression. However, several days ago I received a question on twitter from @zahardzhan: "Do you use monads described in your blog in practice?" My answer was "NO". I decided to refresh my memory and reread my posts just for fun, including the post about the Workflow monad. I decided to use the approach described in that post to express pipelines as workflows, and after some thought and experimentation it transformed into something new. So let's, as we did in the post about the workflow monad, start from an ideal solution which allows us to:
As you can see, I did not list the other problems here; I am going to start from something simple and then add new capabilities one by one.
I am not going to describe the whole train of thought, just present my solution in short and show you some code. I decided to write a computation expression builder that can do everything described above.
As a starting point, I took the Reader monad and changed its signature.
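I will call the type Flow and the builder flow in this sketch (the real names may differ):

```fsharp
// Reader with an async result: a computation that takes an environment
// and asynchronously produces a value.
type Flow<'env, 'a> = 'env -> Async<'a>

type FlowBuilder() =
    member b.Return (x: 'a) : Flow<'env, 'a> =
        fun _ -> async { return x }
    member b.ReturnFrom (m: Flow<'env, 'a>) = m
    member b.Bind (m: Flow<'env, 'a>, f: 'a -> Flow<'env, 'b>) : Flow<'env, 'b> =
        fun env -> async {
            let! a = m env
            return! f a env
        }

let flow = FlowBuilder()

// read the environment itself, as in the classic Reader monad
let ask : Flow<'env, 'env> = fun env -> async { return env }
```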
As you can see, I also used Async as the return type, because currently all our code is async and it is easier to merge both monads from scratch.
That's all; now we can do everything we want.
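Here is a small example reconstructed from the console output below; the environment shape, the helpers and the resource contents are my guesses:

```fsharp
// Hypothetical environment: a name plus a map of loaded resources.
type Env = { Name : string; Resources : Map<string, string> }

let log msg : Flow<Env, unit> =
    fun _ -> async { printfn "%s" msg }

let logEnv : Flow<Env, unit> =
    fun env -> async { printfn "executing in \"%s\"" env.Name }

let tryGetResource name : Flow<Env, string option> =
    fun env -> async {
        printfn "requesting resource \"%s\"" name
        match env.Resources.TryFind name with
        | Some r -> printfn "found resource \"%s\"" name; return Some r
        | None   -> printfn "not found resource \"%s\"" name; return None
    }

// run a sub-flow in a different environment (the classic Reader 'local')
let withEnv (newEnv: Env) (m: Flow<Env, 'a>) : Flow<Env, 'a> =
    fun _ -> m newEnv

let tokenize = flow {
    do! log "tokenizing"
    let! _ = tryGetResource "big_res"
    let! _ = tryGetResource "stop_list"
    return ()
}

let envWithout = { Name = "without stop_lst"; Resources = Map.ofList [ "big_res", "data" ] }
let envWith    = { Name = "with stop_lst";    Resources = Map.ofList [ "stop_list", "data" ] }

let demo = flow {
    do! logEnv
    do! tokenize
    do! log "env swap"
    do! withEnv envWith (flow {
            do! logEnv
            let! _ = tryGetResource "stop_list"
            return () })
    do! log "executed ..."
    return ()
}

demo envWithout |> Async.RunSynchronously
```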
Ready to go. Console output:
executing in "without stop_lst"
tokenizing
requesting resource "big_res"
found resource "big_res"
requesting resource "stop_list"
not found resource "stop_list"
env swap
executing in "with stop_lst"
requesting resource "stop_list"
found resource "stop_list"
executed ...
Now we can go further and add the missing functionality: error handling and fork/join parallelism. Do we have new problems? Definitely yes: we need to figure out how to use disposable objects and how to avoid capturing resource objects in closures….
I would really appreciate it if you shared your experience with distributed projects, problems, and solutions in the comments.
In the next post we will discuss another way to compose distributed computations based on queue combinators, compare actors (fsharp, akka.net, orleans) and csp (golang and clojure), dive deep into reducers, transducers, and Nessos Streams, and find some really interesting stuff.