Implementing a Redis-based locking mechanism
Pushing a lot of jobs into Redis queue(s) for Sidekiq to process is very common in Ruby on Rails applications. Still, there are situations when you want to guarantee that specific jobs don’t execute simultaneously, or you want to ensure that they run sequentially.
Running such jobs twice or in parallel can incur costs, cause minor inconveniences for users (such as receiving the same email twice) or, in some cases, even lead to data corruption.
For example:
- When an application talks to external APIs and needs to call those APIs only once, even if the application code triggered the worker multiple times.
- When you need to run expensive computations and send reports to some users, but must ensure that the computations don’t run concurrently.
- In a microservices architecture, when your application talks to multiple services.
The solution
To solve this problem, we can implement a simple, naïve Redis-based locking mechanism from scratch, in both a blocking and a non-blocking flavour. It follows the single-instance locking pattern from the Redis distributed-locks documentation, which the RedLock algorithm builds on (we’ll refer to it as the RedLock algorithm throughout), and it only provides a “weak” guarantee of the lock (see the disclaimer below).
- Non-blocking – means that the function trying to obtain the lock will return immediately with a boolean value.
- Blocking – means that the function trying to obtain the lock will wait until the lock is released and then acquire it.
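Ruby’s own Mutex offers the same two flavours within a single process, which makes the difference easy to see. This is only an in-process analogy, not the Redis-based lock: Mutex#try_lock returns true or false right away, while Mutex#lock waits until the current holder releases the mutex.

```ruby
# In-process analogy only: Ruby's Mutex, not the Redis-based lock.
mutex = Mutex.new
log = []

# Another thread grabs the mutex and holds it for a while.
holder = Thread.new do
  mutex.synchronize { sleep 0.2 }
end
sleep 0.05 until mutex.locked? # wait until the holder owns the mutex

# Non-blocking: returns false immediately because the mutex is taken.
got_it = mutex.try_lock
log << [:try_lock, got_it]

# Blocking: waits until the holder releases the mutex, then acquires it.
mutex.lock
log << :lock_acquired
mutex.unlock

holder.join
p log # prints [[:try_lock, false], :lock_acquired]
```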
We will use the Adapter Design Pattern for better reusability and other OOP benefits by implementing a LockManager and a RedisAdapter over the RedLock algorithm.
Disclaimer
Use this solution with caution, and only use it in production environments once you understand how it works! It’s important to note that the RedLock algorithm is not a silver bullet and can even be harmful. You can read more about how it can be harmful, as well as about “strong” and “weak” guarantees, on the antirez blog. There’s a lot to it, and it’s outside the scope of this article.
The strategies presented here are by no means a drop-in solution. It’s just a practical and basic implementation that I’ve found interesting and wanted to share. Other gems already try to solve this problem, such as Sidekiq Unique Jobs, Sidekiq Lock or ActiveJob Uniqueness.
Step 1 – acquire and release lock on Redis
Let’s first implement the naïve RedLock algorithm, which is the backbone of this whole solution and is responsible for acquiring and releasing locks. More information can be found on Redis.io Distlock.
The RedLock algorithm has the following properties:
- The lock method uses the Redis gem to set a key only if it doesn’t already exist (NX option), with an expiry of 30000 milliseconds by default (PX option). The value can be any string, but it should be unique to the client holding the lock, because unlock relies on it (we’ll use a random token). More options can be found here: Redis-rb
- The expires_in attribute, also known as the “lock validity time”, is both the auto-release time and the time the client has to perform its operation before another client can acquire the lock again. Mutual exclusion is therefore only guaranteed within this time window, starting from the moment the lock is acquired. Because the key expires on its own, a client that crashes while holding the lock cannot block others forever, so we don’t end up with deadlocks.
class RedisLock
  class Lock
    # can be useful for monitoring and logging purposes
    LOCK_PREFIX = 'lock:'
    attr_reader :redis, :key, :value, :expires_in
    def initialize(redis, key:, value:, expires_in: 30)
      @redis = redis
      @key = LOCK_PREFIX + key
      @value = value
      # seconds -> milliseconds (PX expects an integer)
      @expires_in = (expires_in * 1000).to_i
    end
    # SET key value NX PX <ms>: only succeeds if the key doesn't exist yet.
    # With the nx option, redis-rb replies true or false.
    def lock
      reply = redis.set(key, value, nx: true, px: expires_in)
      reply == true
    end
  end
end
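To see the NX and PX semantics in action without a Redis server, here is a small in-memory stand-in. FakeStore is hypothetical and not part of redis-rb: its set only mimics the NX reply that Lock#lock relies on (true when the key was set, false when it already existed), and key expiry is emulated with a monotonic clock.

```ruby
# Hypothetical in-memory stand-in for `SET key value NX PX ms`.
# NOT redis-rb: it only mimics the true/false reply of the NX form
# (a plain SET in redis-rb replies "OK" instead).
class FakeStore
  def initialize
    @data = {} # key => [value, expires_at in monotonic seconds or nil]
  end

  def set(key, value, nx: false, px: nil)
    purge_expired(key)
    return false if nx && @data.key?(key)      # NX: only set an absent key
    expires_at = px ? now + px / 1000.0 : nil  # PX: expiry in milliseconds
    @data[key] = [value, expires_at]
    true
  end

  def get(key)
    purge_expired(key)
    @data[key]&.first
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Emulates Redis expiry by dropping the key once its deadline has passed.
  def purge_expired(key)
    entry = @data[key]
    @data.delete(key) if entry && entry[1] && entry[1] <= now
  end
end

store = FakeStore.new
first  = store.set("lock:report", "token-a", nx: true, px: 100)
second = store.set("lock:report", "token-b", nx: true, px: 100) # still held
sleep 0.15                                                       # let it expire
third  = store.set("lock:report", "token-b", nx: true, px: 100)
p [first, second, third] # prints [true, false, true]
```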
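- The unlock method uses the value to release the lock safely, with a Lua script that tells Redis: remove the key only if it exists and the value stored at that key is exactly the one I expect it to be. Since Redis executes the script atomically, no other client can grab the lock between the check and the delete. The script is first called with EVALSHA using its SHA1 digest; if Redis hasn’t cached it yet, the resulting Redis::CommandError (NOSCRIPT) is rescued and the full script is sent with EVAL.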
# Inspired from https://redis.io/topics/distlock
def unlock
  reply = begin
    # Run the script by its SHA1 digest if Redis already has it cached...
    redis.evalsha redis_lock_script_sha, keys: [key], argv: [value]
  rescue Redis::CommandError
    # ...otherwise (NOSCRIPT) send the full script body
    redis.eval redis_lock_script, keys: [key], argv: [value]
  end
  reply.to_s == '1'
end
private
def redis_lock_script_sha
  @redis_lock_script_sha ||= Digest::SHA1.hexdigest(redis_lock_script)
end
# Compare-and-delete: delete the key only if it still holds our value.
def redis_lock_script
  @redis_lock_script ||= <<~LUA
    if redis.call("get", KEYS[1]) == ARGV[1]
    then
      return redis.call("del", KEYS[1])
    else
      return 0
    end
  LUA
end
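The Lua script is essentially a compare-and-delete, and the sketch below spells out the same decision in plain Ruby to show why the stored value matters: a client whose lock has already expired must not delete a lock that another client now holds. A Hash stands in for Redis and compare_and_delete is a hypothetical helper; this version is not atomic, which is exactly why the real check and delete run together inside a Lua script.

```ruby
require 'securerandom'

# Plain-Ruby illustration of the unlock script's compare-and-delete logic.
# A Hash stands in for Redis. Unlike the Lua script, this is NOT atomic:
# in Redis, the check and the delete must run together in one script.
def compare_and_delete(store, key, expected_value)
  if store[key] == expected_value
    store.delete(key)
    1 # DEL replies with the number of keys removed
  else
    0 # mirrors the script's "return 0"
  end
end

store = {}
token_a = SecureRandom.hex # client A's random lock value
token_b = SecureRandom.hex # client B's random lock value

# A's lock expired while A was still working, and B then acquired it.
store["lock:report"] = token_b

late_unlock = compare_and_delete(store, "lock:report", token_a) # A finishes late
p [late_unlock, store["lock:report"] == token_b] # prints [0, true]

own_unlock = compare_and_delete(store, "lock:report", token_b) # B releases its lock
p [own_unlock, store.key?("lock:report")] # prints [1, false]
```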
The final RedLock algorithm:
require 'digest/sha1'
require 'redis'
class RedisLock
  class Lock
    LOCK_PREFIX = 'lock:'
    attr_reader :redis, :key, :value, :expires_in
    def initialize(redis, key:, value:, expires_in: 30)
      @redis = redis
      @key = LOCK_PREFIX + key
      @value = value
      # seconds -> milliseconds (PX expects an integer)
      @expires_in = (expires_in * 1000).to_i
    end
    def lock
      reply = redis.set(key, value, nx: true, px: expires_in)
      reply == true
    end
    def unlock
      reply = begin
        redis.evalsha redis_lock_script_sha, keys: [key], argv: [value]
      rescue Redis::CommandError
        # NOSCRIPT: the script isn't cached on the server yet, so send it in full
        redis.eval redis_lock_script, keys: [key], argv: [value]
      end
      reply.to_s == '1'
    end
    private
    def redis_lock_script_sha
      @redis_lock_script_sha ||= Digest::SHA1.hexdigest(redis_lock_script)
    end
    def redis_lock_script
      @redis_lock_script ||= <<~LUA
        if redis.call("get", KEYS[1]) == ARGV[1]
        then
          return redis.call("del", KEYS[1])
        else
          return 0
        end
      LUA
    end
  end
end
Step 2 – logic for locking and unlocking key on Redis
Secondly, we implement the RedisAdapter containing the logic for locking and unlocking a key in a blocking and non-blocking way. It makes use of the RedLock algorithm that we’ve implemented before.
The RedisAdapter creates its Redis connections through a ConnectionPool.
module LockManager
  module Adapters
    class RedisLockAdapter
      def initialize(pool_size: 5, pool_timeout: 5, redis: {})
        @connection_pool = ConnectionPool.new(size: pool_size, timeout: pool_timeout) do
          Redis.new(redis)
        end
      end
    end
  end
end
What is a Connection Pool?
A plain Redis client holds a single connection, so unless we add a pool ourselves, we usually end up sharing one direct connection, typically stored in a configuration object or an application-level global variable.
Since Redis is very fast, sharing that single connection between threads usually isn’t a problem, even though it is a shared, blocking resource: only one thread can use it at a time. In highly concurrent applications, however, threads queuing up for that one connection can have a severe performance impact.
To solve this problem, we use the ConnectionPool gem (connection_pool), which Sidekiq itself depends on, so it is already available in any application running Sidekiq.
Given a pool size (usually equal to the maximum number of threads of our application server), each thread calling with on the connection pool checks out an idle connection, or creates a new one if none is idle and the pool isn’t full yet. The connection is passed to the block and returned to the pool once the block finishes. When all connections are in use, threads calling with have to wait for a free connection; if none becomes available within the pool’s timeout (pool_timeout in our adapter), ConnectionPool raises ConnectionPool::TimeoutError.
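The checkout/checkin cycle described above can be sketched with Ruby’s Mutex and ConditionVariable. TinyPool is a hypothetical, heavily simplified class written only to illustrate the idea; it is not how the connection_pool gem is implemented, and the adapter itself uses the real ConnectionPool.

```ruby
# Hypothetical, heavily simplified pool illustrating checkout/checkin.
# This is NOT the connection_pool gem's implementation.
class TinyPool
  class TimeoutError < StandardError; end

  def initialize(size:, timeout:, &factory)
    @size = size        # maximum number of connections
    @timeout = timeout  # seconds to wait for a free connection
    @factory = factory  # block that opens a new connection
    @idle = []          # connections returned to the pool
    @created = 0        # connections opened so far
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Checks a connection out, yields it, and always returns it to the pool.
  def with
    conn = checkout
    begin
      yield conn
    ensure
      checkin(conn)
    end
  end

  private

  def checkout
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    @mutex.synchronize do
      loop do
        return @idle.pop unless @idle.empty? # reuse an idle connection
        if @created < @size                   # lazily open a new one
          @created += 1
          return @factory.call
        end
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise TimeoutError, "no connection available after #{@timeout}s" if remaining <= 0
        @cond.wait(@mutex, remaining)         # wait for a checkin or time out
      end
    end
  end

  def checkin(conn)
    @mutex.synchronize do
      @idle.push(conn)
      @cond.signal # wake up one waiting thread, if any
    end
  end
end

opened = 0
pool = TinyPool.new(size: 1, timeout: 0.1) { opened += 1; "connection-#{opened}" }

first  = pool.with { |conn| conn }
second = pool.with { |conn| conn } # reuses the connection returned to the pool
p [first, second, opened] # prints ["connection-1", "connection-1", 1]

# With size 1, a second checkout while the only connection is busy
# waits up to `timeout` seconds and then gives up.
begin
  pool.with { |_busy| pool.with { |_conn| } }
rescue TinyPool::TimeoutError => e
  puts "timed out: #{e.message}"
end
```

As with the real gem, the pool size caps how many connections exist at once, and the timeout bounds how long a thread waits for one.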
Coming back to our RedisAdapter, we add two methods for acquiring a lock and one method for releasing it.
try_lock acquires the lock in a non-blocking way: it returns immediately with true if the lock was acquired, or false if it wasn’t.
# Returns true if the lock was acquired, false otherwise, without waiting.
def try_lock(key, value, timeout)
  with_redis do |redis|
    RedisLock::Lock.new(redis, key: key, value: value, expires_in: timeout).lock
  end
end
private
# Checks a connection out of the pool for the duration of the block
# and returns the block's value.
def with_redis
  @connection_pool.with do |r|
    yield r
  end
end
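lock acquires the lock in a blocking manner: if the lock is taken, it retries every sleep_time seconds until it succeeds (either because the lock was released or because the key expired). If waiting any longer would exceed timeout seconds, it gives up and raises LockNotAcquired. Note that timeout is used both as the maximum waiting time and as the lock’s expiry.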
class LockNotAcquired < StandardError; end
def lock(key, value, timeout)
  expire = Time.now + timeout.to_f
  locked = false
  loop do
    # Try first, so even a very short timeout gets at least one attempt
    with_redis do |redis|
      locked = RedisLock::Lock.new(redis, key: key, value: value, expires_in: timeout).lock
    end
    break if locked
    # Give up if waiting another sleep_time would exceed the timeout
    raise LockNotAcquired.new(key) if Time.now + sleep_time > expire
    sleep(sleep_time)
  end
end
private
def with_redis
  @connection_pool.with do |r|
    yield r
  end
end
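The retry-until-deadline loop in lock does not depend on Redis at all, so it can be tried in isolation. In this sketch the acquisition attempt is passed as a block and acquire_with_retry is a hypothetical helper name. One deliberate difference from the adapter: it measures time with a monotonic clock (Process.clock_gettime(Process::CLOCK_MONOTONIC)) rather than Time.now, so system clock adjustments can’t shorten or stretch the wait.

```ruby
# Hypothetical helper showing the retry-until-deadline idea behind
# RedisLockAdapter#lock; the block performs a single acquisition attempt.
class LockNotAcquired < StandardError; end

def acquire_with_retry(key, timeout:, sleep_time: 0.1)
  clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = clock.call + timeout
  loop do
    return true if yield
    # Give up if sleeping once more would take us past the deadline.
    raise LockNotAcquired, key if clock.call + sleep_time > deadline
    sleep(sleep_time)
  end
end

attempts = 0
acquired = acquire_with_retry("report", timeout: 1, sleep_time: 0.01) do
  attempts += 1
  attempts >= 3 # pretend the lock frees up on the third attempt
end
p [acquired, attempts] # prints [true, 3]

begin
  acquire_with_retry("report", timeout: 0.05, sleep_time: 0.01) { false }
rescue LockNotAcquired => e
  puts "gave up on #{e.message}" # prints "gave up on report"
end
```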
unlock releases the lock, but only if the key still holds the value we locked it with.
def unlock(key, value)
  with_redis do |redis|
    RedisLock::Lock.new(redis, key: key, value: value).unlock
  end
end
private
def with_redis
  @connection_pool.with do |r|
    yield r
  end
end
The final RedisAdapter:
require 'connection_pool'
require 'redis'
require 'redis_lock/lock'
module LockManager
  module Adapters
    class RedisLockAdapter
      class LockNotAcquired < StandardError; end
      attr_reader :sleep_time
      def initialize(pool_size: 5, pool_timeout: 5, redis: {}, sleep_time: 0.1)
        @connection_pool = ConnectionPool.new(size: pool_size, timeout: pool_timeout) do
          Redis.new(redis)
        end
        @sleep_time = sleep_time
      end
      def unlock(key, value)
        with_redis do |redis|
          RedisLock::Lock.new(redis, key: key, value: value).unlock
        end
      end
      # Non-blocking: returns true or false immediately
      def try_lock(key, value, timeout)
        with_redis do |redis|
          RedisLock::Lock.new(redis, key: key, value: value, expires_in: timeout).lock
        end
      end
      # Blocking: retries every sleep_time seconds, raises after timeout seconds
      def lock(key, value, timeout)
        expire = Time.now + timeout.to_f
        locked = false
        loop do
          with_redis do |redis|
            locked = RedisLock::Lock.new(redis, key: key, value: value, expires_in: timeout).lock
          end
          break if locked
          raise LockNotAcquired.new(key) if Time.now + sleep_time > expire
          sleep(sleep_time)
        end
      end
      private
      def with_redis
        @connection_pool.with do |r|
          yield r
        end
      end
    end
  end
end
Step 3 – main gateway for the locking mechanism
Lastly, let’s implement the LockManager. It’s the main entry point/gateway for the locking mechanism. The name is generic, as we can add multiple adapters to it. But for this use case, we will focus on the RedisAdapter as the primary adapter. The LockManager will have two class methods: blocking_lock and nonblocking_lock.
LockManager::Locker is a singleton class, responsible for generating a unique value for each lock and passing the key and its timeout (expiration) to the adapter.
module LockManager
  class Locker
    include Singleton
    class << self
      def nonblocking_lock(key, timeout: 30, &block)
        instance.nonblocking_lock(key, timeout: timeout, &block)
      end
      def blocking_lock(key, timeout: 30, &block)
        instance.blocking_lock(key, timeout: timeout, &block)
      end
    end
  end
end
The class methods simply delegate to the singleton instance’s methods:
def nonblocking_lock(key, timeout:)
  value = SecureRandom.hex
  if adapter.try_lock(key, value, timeout)
    begin
      Result.new(true, yield)
    ensure
      adapter.unlock(key, value)
    end
  else
    Result.new(false, nil)
  end
end
def blocking_lock(key, timeout:)
  value = SecureRandom.hex
  adapter.lock(key, value, timeout)
  yield
ensure
  adapter.unlock(key, value)
end
To tell whether the non-blocking operation actually ran, nonblocking_lock returns a Result object, which we can also use for conditional control flow in our application if needed.
class Result
  attr_reader :result
  def initialize(run, result = nil)
    @run = run
    @result = result
  end
  def run?
    @run
  end
end
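The begin/ensure in nonblocking_lock is what guarantees that the lock is released even when the job raises. The in-process sketch below reproduces that control flow, with a Ruby Mutex standing in for the Redis adapter purely so it runs without Redis; nonblocking_sketch and MUTEX are hypothetical names, and Result is copied from above.

```ruby
# In-process sketch of Locker#nonblocking_lock's control flow.
# A Mutex stands in for the Redis adapter purely so this runs without Redis.
class Result
  attr_reader :result

  def initialize(run, result = nil)
    @run = run
    @result = result
  end

  def run?
    @run
  end
end

MUTEX = Mutex.new

def nonblocking_sketch
  return Result.new(false, nil) unless MUTEX.try_lock # someone else holds it

  begin
    Result.new(true, yield)
  ensure
    MUTEX.unlock # released even when the block raises
  end
end

outcome = nonblocking_sketch { 21 * 2 }
puts outcome.run? ? "ran: #{outcome.result}" : "skipped" # prints "ran: 42"

begin
  nonblocking_sketch { raise ArgumentError, "job failed" }
rescue ArgumentError => e
  puts "job raised: #{e.message}" # the error still reaches the caller
end
p MUTEX.locked? # prints false: the ensure clause released the lock
```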
The final LockManager:
require 'securerandom'
require 'singleton'
require 'lock_manager/adapters/redis_lock_adapter'
module LockManager
  class Locker
    include Singleton
    attr_reader :adapter
    def initialize
      @adapter = Adapters::RedisLockAdapter.new
    end
    class << self
      def nonblocking_lock(key, timeout: 30, &block)
        instance.nonblocking_lock(key, timeout: timeout, &block)
      end
      def blocking_lock(key, timeout: 30, &block)
        instance.blocking_lock(key, timeout: timeout, &block)
      end
    end
    def nonblocking_lock(key, timeout:)
      # a random value per call, so only this caller can release its own lock
      value = SecureRandom.hex
      if adapter.try_lock(key, value, timeout)
        begin
          Result.new(true, yield)
        ensure
          adapter.unlock(key, value)
        end
      else
        Result.new(false, nil)
      end
    end
    def blocking_lock(key, timeout:)
      value = SecureRandom.hex
      adapter.lock(key, value, timeout)
      yield
    ensure
      # harmless if the lock was never acquired: unlock only deletes our own value
      adapter.unlock(key, value)
    end
    class Result
      attr_reader :result
      def initialize(run, result = nil)
        @run = run
        @result = result
      end
      def run?
        @run
      end
    end
  end
end
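Since Locker only talks to its adapter through try_lock, lock and unlock, any object with those three methods could take the Redis adapter’s place. As an illustration, here is a hypothetical InMemoryLockAdapter that could be handy in unit tests. It only coordinates threads within a single process, so it offers no protection across Sidekiq processes, and since Locker#initialize hard-codes RedisLockAdapter, using it would require making the adapter configurable.

```ruby
# Hypothetical adapter with the same interface as RedisLockAdapter.
# Locks live in this process's memory only: useful for tests, not for
# coordinating separate Sidekiq processes.
class InMemoryLockAdapter
  class LockNotAcquired < StandardError; end

  attr_reader :sleep_time

  def initialize(sleep_time: 0.01)
    @locks = {} # key => [value, expires_at in monotonic seconds]
    @mutex = Mutex.new
    @sleep_time = sleep_time
  end

  # Non-blocking: true if the key was free (or expired), false otherwise.
  def try_lock(key, value, timeout)
    @mutex.synchronize do
      entry = @locks[key]
      return false if entry && entry[1] > now # still held and not expired
      @locks[key] = [value, now + timeout.to_f]
      true
    end
  end

  # Blocking: retries every sleep_time seconds for up to timeout seconds.
  def lock(key, value, timeout)
    deadline = now + timeout.to_f
    until try_lock(key, value, timeout)
      raise LockNotAcquired, key if now + sleep_time > deadline
      sleep(sleep_time)
    end
    true
  end

  # Releases the lock only if it is still held (unexpired) with the same value.
  def unlock(key, value)
    @mutex.synchronize do
      entry = @locks[key]
      return false unless entry && entry[0] == value && entry[1] > now
      @locks.delete(key)
      true
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

adapter = InMemoryLockAdapter.new
p adapter.try_lock("report", "token-a", 30) # prints true
p adapter.try_lock("report", "token-b", 30) # prints false
p adapter.unlock("report", "token-b")       # prints false: not the owner
p adapter.unlock("report", "token-a")       # prints true
```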
Usage
Assuming that our ApplicationJob includes Sidekiq::Worker, we can test our LockManager implementation to see how it works using both blocking and non-blocking behaviour.
Non-blocking example
In the following example, we simulate a long-running job that takes 5 seconds to run. Since we’re using the non-blocking lock, if we trigger this worker multiple times during that 5-second period, only the first job should run the block; the other jobs fail to acquire the lock, skip the block and finish immediately.
require 'lock_manager/locker'
class MyWorker < ApplicationJob
  def perform
    LockManager::Locker.nonblocking_lock("MyWorker") do
      sleep 5
      logger.debug "MyWorker result"
    end
  end
end
Calling the worker five times during the 5-second sleep period yields only one MyWorker result log, from the first job that acquired the lock, which is exactly what we expected:
> MyWorker.perform_async
=> "ae15e0c28464f1eceaf4dee4"
> MyWorker.perform_async
=> "8594484c746ef6259b35df1f"
> MyWorker.perform_async
=> "b997adc9c04ffc9a52eb49bd"
> MyWorker.perform_async
=> "ac36b4cdd47ba384849d047b"
> MyWorker.perform_async
=> "53053b33c715554a54408d9b"
2022-01-17T10:24:54.546Z pid=14379 tid=o5z class=MyWorker jid=ae15e0c28464f1eceaf4dee4 INFO: start
2022-01-17T10:24:54.862Z pid=14379 tid=o73 class=MyWorker jid=8594484c746ef6259b35df1f INFO: start
2022-01-17T10:24:54.869Z pid=14379 tid=o73 class=MyWorker jid=8594484c746ef6259b35df1f elapsed=0.007 INFO: done
2022-01-17T10:24:55.162Z pid=14379 tid=o6j class=MyWorker jid=b997adc9c04ffc9a52eb49bd INFO: start
2022-01-17T10:24:55.168Z pid=14379 tid=o6j class=MyWorker jid=b997adc9c04ffc9a52eb49bd elapsed=0.007 INFO: done
2022-01-17T10:24:55.445Z pid=14379 tid=o73 class=MyWorker jid=ac36b4cdd47ba384849d047b INFO: start
2022-01-17T10:24:55.453Z pid=14379 tid=o73 class=MyWorker jid=ac36b4cdd47ba384849d047b elapsed=0.007 INFO: done
2022-01-17T10:24:55.754Z pid=14379 tid=o6j class=MyWorker jid=53053b33c715554a54408d9b INFO: start
2022-01-17T10:24:55.764Z pid=14379 tid=o6j class=MyWorker jid=53053b33c715554a54408d9b elapsed=0.01 INFO: done
2022-01-17T10:24:59.555Z pid=14379 tid=o5z class=MyWorker jid=ae15e0c28464f1eceaf4dee4 DEBUG: MyWorker result
2022-01-17T10:24:59.556Z pid=14379 tid=o5z class=MyWorker jid=ae15e0c28464f1eceaf4dee4 elapsed=5.011 INFO: done
Blocking example
If we switch to the blocking behaviour, each job has to wait until the lock is released before it can run, so all jobs run sequentially.
Change:
LockManager::Locker.nonblocking_lock("MyWorker") do
to:
LockManager::Locker.blocking_lock("MyWorker") do
Calling the worker three times during the 5-second sleep period yields three MyWorker result logs, one per job, produced sequentially, which is exactly what we expected:
> MyWorker.perform_async
=> "991624dca5f5c748faf0ed54"
> MyWorker.perform_async
=> "7213cedf4c8353e8cf2743f7"
> MyWorker.perform_async
=> "b1a523be56815b68425c5814"
2022-01-17T10:34:41.764Z pid=14379 tid=o73 class=MyWorker jid=991624dca5f5c748faf0ed54 INFO: start
2022-01-17T10:34:42.197Z pid=14379 tid=o5z class=MyWorker jid=7213cedf4c8353e8cf2743f7 INFO: start
2022-01-17T10:34:42.714Z pid=14379 tid=o6j class=MyWorker jid=b1a523be56815b68425c5814 INFO: start
2022-01-17T10:34:46.845Z pid=14379 tid=o73 class=MyWorker jid=991624dca5f5c748faf0ed54 DEBUG: MyWorker result
2022-01-17T10:34:46.845Z pid=14379 tid=o73 class=MyWorker jid=991624dca5f5c748faf0ed54 elapsed=5.081 INFO: done
2022-01-17T10:34:51.903Z pid=14379 tid=o5z class=MyWorker jid=7213cedf4c8353e8cf2743f7 DEBUG: MyWorker result
2022-01-17T10:34:51.905Z pid=14379 tid=o5z class=MyWorker jid=7213cedf4c8353e8cf2743f7 elapsed=9.707 INFO: done
2022-01-17T10:34:56.927Z pid=14379 tid=o6j class=MyWorker jid=b1a523be56815b68425c5814 DEBUG: MyWorker result
2022-01-17T10:34:56.928Z pid=14379 tid=o6j class=MyWorker jid=b1a523be56815b68425c5814 elapsed=14.213 INFO: done
Summary
Not only did we implement unique Sidekiq jobs in Ruby from scratch, but we also added a concurrency control layer for running jobs sequentially where that is needed. Additionally, we could extend the above to database advisory locks with minimal code changes, by adding another adapter. This can be a topic for a follow-up article.