Some time ago at work we used Azure Storage queues as a task queue engine for our workers, but after a while we found that it was not as fast as we wanted. At the same time we were already using Redis intensively and started to think about using it as a task queue as well. There is a lot of documentation on how to use Redis as a publish/subscribe service, but not nearly as much on using it as a task queue.
So I decided to describe how to do it.
What is a task queue?
A task queue lets clients of a service send tasks to it asynchronously. Usually the service has many clients and probably many workers. In short, the whole workflow looks like this:
The client puts a task into a queue
Workers periodically check the queue in a loop; if a task exists, a worker executes it
But there are some additional requirements for a queue:
Quality of service: one client should not block another client’s requests
Batch processing: clients and workers should be able to put and get several tasks at once for better performance
Reliability: if a worker fails while processing a task, after some time the task has to be handled again by the same or another worker
Dead letters: if a task makes workers fail after several attempts to handle it, it has to be put into a dead letter store
Exactly once: in case of success, a task has to be processed only one time
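With these requirements in mind, the intended usage could look roughly like this (RedisQueue and its method names are illustrative here; we implement them step by step below):

    # client side: enqueue a task
    queue = RedisQueue("thumbnails", client_id="client-42")
    queue.send('{"image": "cat.jpg", "size": 128}')

    # worker side: poll in a loop
    while True:
        for key, task in queue.receive(batch_size=10):
            handle(task)               # application-specific work
            queue.complete(key, task)  # acknowledge success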
Implementation
We will use a Redis list for every client. The list key will use the task queue name as a prefix and the client id as the second part. When a client wants to put a message into the queue, it calculates the list key by concatenating the queue name and its own id. It would be great to put these lists into a separate Redis database, but since we may have to share the Redis db with other code, let’s add an additional prefix like “queues:” to their names. So let’s define a class RedisQueue which will hide the implementation details.
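As a starting point, here is a minimal sketch of the sending side, assuming the redis-py client (the key layout follows the scheme above; method and parameter names are illustrative):

    import redis

    class RedisQueue:
        PREFIX = "queues:"

        def __init__(self, name, client_id, conn=None):
            self.redis = conn or redis.Redis(decode_responses=True)
            self.name = name
            # list key = "queues:" prefix + queue name + client id
            self.key = f"{self.PREFIX}{name}:{client_id}"

        def send(self, *tasks):
            # RPUSH accepts several values at once, so a whole batch of
            # tasks costs a single round trip
            self.redis.rpush(self.key, *tasks)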
Sending was easy to implement, but what about the receiving side?
First of all, we need to find all queue lists. There are three options:
Use the KEYS “prefix:*” command to find all the lists. But this command can cause serious problems in production: it may ruin performance when executed against a large database. So never use this option.
Use the SCAN command. It finds the same keys as KEYS, but iterates over the keyspace incrementally, so it does not have the performance problems.
Store all queue names in a Redis set: add a list name to the set when a message is sent and remove the queue from the set when all its messages are processed. Unfortunately, this requires additional code to implement, so we will use the second option (see the sketch right after this list).
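Continuing the RedisQueue sketch, queue discovery could look like this (scan_iter is redis-py’s cursor-based wrapper around SCAN; the method name is mine):

    def queue_keys(self):
        # SCAN walks the keyspace incrementally, so unlike KEYS it does
        # not block the server on a large database
        pattern = f"{self.PREFIX}{self.name}:*"
        return list(self.redis.scan_iter(match=pattern))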
When we have found all the queues, we need to shuffle them randomly to guarantee that every queue has the same probability of being processed. After that, we need to get the required count of messages in one batch (with a Redis pipeline). If no messages are found, we run the whole process again; otherwise we handle the messages and delete them after processing.
We also need to prevent double processing of a message and prevent message loss caused by exceptions during processing. To do that we will use the RPOPLPUSH command, which atomically removes a message from a list, puts it into an additional “processing” list and returns the value to the caller. So we will keep an extra list for every queue with the key “processing:queue_name”. After a message is handled we must remove it from the processing list.
But after several unsuccessful attempts to process a message we need to finally move it to the dead letters. As the dead letter store we will use a Redis set with the key “dead:queue_name”. From time to time we need to check the processing list: if the attempts count of a message is lower than the maximum allowed, we put it back into its client list; otherwise we put it into the dead letter set.
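Putting the receiving side together, here is a sketch under the same assumptions (tracking attempt counts in a per-queue hash is my own choice; the text above does not fix that detail):

    import random

    def receive(self, batch_size=10):
        keys = self.queue_keys()
        random.shuffle(keys)  # every client list gets the same chance
        tasks = []
        for key in keys:
            pipe = self.redis.pipeline()
            for _ in range(batch_size - len(tasks)):
                # atomically move a message into the processing list
                pipe.rpoplpush(key, f"processing:{key}")
            tasks.extend((key, t) for t in pipe.execute() if t is not None)
            if len(tasks) >= batch_size:
                break
        return tasks

    def complete(self, key, task):
        # the message was handled successfully: drop it from processing
        self.redis.lrem(f"processing:{key}", 1, task)

    def requeue_or_bury(self, max_attempts=3):
        # run this periodically: return stale messages to their client
        # lists, or move repeat offenders to the dead letter set
        # (a real implementation would also check how long a message has
        # been sitting in processing before touching it)
        for key in self.queue_keys():
            processing = f"processing:{key}"
            for task in self.redis.lrange(processing, 0, -1):
                attempts = self.redis.hincrby(f"attempts:{key}", task, 1)
                if attempts < max_attempts:
                    self.redis.lpush(key, task)  # try again later
                else:
                    self.redis.sadd(f"dead:{self.name}", task)
                    self.redis.hdel(f"attempts:{key}", task)
                self.redis.lrem(processing, 1, task)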