Build your own concurrency control in Sidekiq

Implement a Redis-based locking mechanism

Pushing a lot of jobs into Redis queue(s) for Sidekiq to process is very common in Ruby on Rails applications. Still, there are situations when you want to guarantee that specific jobs don’t execute simultaneously, or you want to ensure that they run sequentially.

This matters because duplicate or overlapping executions can incur costs, cause minor inconveniences for users (emails sent twice) or, in some cases, even corrupt data.

For example:

  • When an application talks to external APIs and needs to call those APIs only once, even if the application code triggered the worker multiple times.
  • When you run expensive computations and send reports to users, and must ensure those computations don’t run concurrently.
  • In a microservices architecture, when your application talks to multiple services.

The solution

To solve this problem, we can implement from scratch a simple and naïve Redis-based locking mechanism with both blocking and non-blocking variants. It is based on the single-instance building block of the RedLock algorithm, and we will only implement a “weak” guarantee of the lock (see the disclaimer below).

  • Non-blocking – means that the function trying to obtain the lock will return immediately with a boolean value.
  • Blocking – means that the function trying to obtain the lock will wait until it is released to acquire it.

We will use the Adapter design pattern for better reusability and other OOP benefits by implementing a LockManager and a RedisLockAdapter over the RedLock-style lock.
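
To give a sense of where we are heading, this is roughly the call-site API we will end up with by the end of Step 3 (the lock key here is just an illustrative name):

LockManager::Locker.nonblocking_lock('reports:user-42') do
  # runs only if no other process currently holds the lock
end

LockManager::Locker.blocking_lock('reports:user-42') do
  # waits (up to the timeout) for the lock before running
end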

Disclaimer

Use this solution with caution, and only take it to production if you understand how it works. The RedLock algorithm is not a silver bullet and can even be harmful. You can read more about how it can be harmful, and about “strong” versus “weak” guarantees, on the Antirez blog; there’s a lot to it, and it’s outside the scope of this article.

The strategies presented here are by no means a drop-in solution; this is just a practical, basic implementation that I found interesting and wanted to share. Other gems already try to solve this problem, such as Sidekiq Unique Jobs, Sidekiq Lock or Activejob Uniqueness.

Step 1 – acquire and release a lock on Redis

Let’s first implement the naïve RedLock algorithm, which is the backbone of this whole solution and is responsible for acquiring and releasing locks. More information can be found on Redis.io Distlock.

The RedLock implementation has the following parts:

  • A lock method, which leverages the Redis gem to set a key only if it doesn’t already exist (NX option), with an expiry in milliseconds (PX option; 30 seconds by default). The value stored at the key should be unique per lock attempt (we will use a random token later), since it is what allows the lock to be released safely. More options can be found here: Redis-rb
  • The expires_in attribute, also known as the “lock validity time”. It is both the auto-release time and the window the client has to perform its work before another client can acquire the lock again. Mutual exclusion is therefore only guaranteed for a limited time window from the moment the lock is acquired, which ensures that we don’t end up with deadlocks.
class RedisLock
  class Lock
    # can be useful for monitoring and logging purposes
    LOCK_PREFIX = 'lock:'

    attr_reader :redis, :key, :value, :expires_in

    def initialize(redis, key:, value:, expires_in: 30)
      @redis = redis
      @key = LOCK_PREFIX + key
      @value = value
      # convert seconds to milliseconds
      @expires_in = expires_in * 1000
    end

    def lock
      reply = redis.set(key, value, nx: true, px: expires_in)
      reply == true
    end
  end
end
  • An unlock method: the value is used to release the lock safely, via a Lua script that tells Redis to remove the key only if it exists and if the value stored at that key is precisely the one we expect.
# Inspired from https://redis.io/topics/distlock
def unlock
  reply = begin
            redis.evalsha redis_lock_script_sha, keys: [key], argv: [value]
          rescue Redis::CommandError
            redis.eval redis_lock_script, keys: [key], argv: [value]
          end

  reply.to_s == '1'
end

private

def redis_lock_script_sha
  @redis_lock_script_sha ||= Digest::SHA1.hexdigest(redis_lock_script)
end

def redis_lock_script
  @redis_lock_script ||= <<-LUA
    if redis.call("get", KEYS[1]) == ARGV[1]
    then
      return redis.call("del", KEYS[1])
    else
      return 0
    end
  LUA
end

The final RedLock algorithm:

require 'digest'
require 'redis'

class RedisLock
  class Lock
    LOCK_PREFIX = 'lock:'

    attr_reader :redis, :key, :value, :expires_in

    def initialize(redis, key:, value:, expires_in: 30)
      @redis = redis
      @key = LOCK_PREFIX + key
      @value = value
      # convert seconds to milliseconds
      @expires_in = expires_in * 1000
    end

    def lock
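      # SET with NX succeeds only if the key does not already exist; PX applies a TTL in milliseconds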
      reply = redis.set(key, value, nx: true, px: expires_in)
      reply == true
    end

    def unlock
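      # Delete the key only if it still holds our value (see the Lua script below)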
      reply = begin
                redis.evalsha redis_lock_script_sha, keys: [key], argv: [value]
              rescue Redis::CommandError
                redis.eval redis_lock_script, keys: [key], argv: [value]
              end

      reply.to_s == '1'
    end

    private

    def redis_lock_script_sha
      @redis_lock_script_sha ||= Digest::SHA1.hexdigest(redis_lock_script)
    end

    def redis_lock_script
      @redis_lock_script ||= <<-LUA
        if redis.call("get", KEYS[1]) == ARGV[1]
        then
          return redis.call("del", KEYS[1])
        else
          return 0
        end
      LUA
    end
  end
end
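
A quick sanity check of this lock primitive (the key and token below are made-up values) might look like this:

require 'redis'
require 'securerandom'
require 'redis_lock/lock'

redis = Redis.new
token = SecureRandom.hex
lock  = RedisLock::Lock.new(redis, key: 'reports:user-42', value: token, expires_in: 30)

lock.lock   #=> true  (the key was set)
lock.lock   #=> false (the key already exists, so NX fails)
lock.unlock #=> true  (the stored value matches, so the key is deleted)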

Step 2 – logic for locking and unlocking a key on Redis

Secondly, we implement the RedisLockAdapter, which contains the logic for locking and unlocking a key in both a blocking and a non-blocking way. It makes use of the RedisLock::Lock class that we implemented above.

The RedisLockAdapter will create its connections to Redis using a ConnectionPool.

module LockManager
  module Adapters
    class RedisLockAdapter
      def initialize(pool_size: 5, pool_timeout: 5, redis: {})
        @connection_pool = ConnectionPool.new(size: pool_size, timeout: pool_timeout) do
          Redis.new(redis)
        end
      end
    end
  end
end

What is a Connection Pool?

Usually, when it comes to a Redis connection, there is no built-in connection pool, meaning we end up using just one direct connection, typically exposed through a configuration or application-level global variable.

Since Redis is very fast, a single shared connection often causes no issues even when multiple threads use it in parallel, even though it is a shared blocking resource. In highly intensive applications, however, this blocking nature can have a severe performance impact.

We use the ConnectionPool gem to solve this problem; most Sidekiq applications already have it installed, since Sidekiq itself depends on it.

Given a pool size (usually equal to the maximum number of threads of our application server), a thread calling with on the connection pool picks a connection from the pool, or creates a new one if none is available yet. The connection is passed to the block and returned to the pool upon completion. If the maximum pool size is reached, threads calling with have to wait for a free connection.
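
As a minimal illustration of that behaviour (the pool size of 5 here is arbitrary):

require 'connection_pool'
require 'redis'

# Up to five lazily created Redis connections, shared across threads.
redis_pool = ConnectionPool.new(size: 5, timeout: 5) { Redis.new }

# `with` checks a connection out, yields it, and returns it to the pool afterwards.
redis_pool.with do |redis|
  redis.ping #=> "PONG"
end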

Coming back to our RedisLockAdapter, we add two methods for acquiring a lock and one method for releasing it.

try_lock acquires the lock in a non-blocking way: it returns immediately with a boolean indicating whether the lock was acquired.

def try_lock(key, value, timeout)
  locked = false
  with_redis do |redis|
    locked = RedisLock::Lock.new(redis, key: key, value: value, expires_in: timeout).lock
    return false if locked == false
  end

  locked
end

private

def with_redis
  @connection_pool.with do |r|
    yield r
  end
end

lock acquires the lock in a blocking manner: if the lock cannot be acquired right away, it retries every sleep_time seconds until it can be (either the lock is released or the key expires), and raises LockNotAcquired once the timeout elapses.

class LockNotAcquired < StandardError; end

def lock(key, value, timeout)
  expire = Time.now + timeout.to_f
  locked = false

  loop do
    raise LockNotAcquired.new(key) if Time.now + sleep_time > expire

    with_redis do |redis|
      locked = RedisLock::Lock.new(redis, key: key, value: value, expires_in: timeout).lock
    end

    break if locked == true

    sleep(sleep_time)
  end
end

unlock releases the lock held on the key in Redis.

def unlock(key, value)
  with_redis do |redis|
    RedisLock::Lock.new(redis, key: key, value: value).unlock
  end
end

The final RedisLockAdapter:

require 'connection_pool'
require 'redis'
require 'redis_lock/lock'

module LockManager
  module Adapters
    class RedisLockAdapter
      class LockNotAcquired < StandardError; end

      attr_reader :sleep_time

      def initialize(pool_size: 5, pool_timeout: 5, redis: {}, sleep_time: 0.1)
        @connection_pool = ConnectionPool.new(size: pool_size, timeout: pool_timeout) do
          Redis.new(redis)
        end

        @sleep_time = sleep_time
      end

      def unlock(key, value)
        with_redis do |redis|
          RedisLock::Lock.new(redis, key: key, value: value).unlock
        end
      end

      def try_lock(key, value, timeout)
        locked = false
        with_redis do |redis|
          locked = RedisLock::Lock.new(redis, key: key, value: value, expires_in: timeout).lock
          return false if locked == false
        end

        locked
      end

      def lock(key, value, timeout)
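        # `timeout` doubles as the maximum time we are willing to wait here and as the lock TTL below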
        expire = Time.now + timeout.to_f
        locked = false

        loop do
          raise LockNotAcquired.new(key) if Time.now + sleep_time > expire

          with_redis do |redis|
            locked = RedisLock::Lock.new(redis, key: key, value: value, expires_in: timeout).lock
          end

          break if locked == true

          sleep(sleep_time)
        end
      end

      private

      def with_redis
        @connection_pool.with do |r|
          yield r
        end
      end
    end
  end
end
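
Used on its own, the adapter’s contract looks roughly like this (the key, token and timeout values are only illustrative):

require 'securerandom'
require 'lock_manager/adapters/redis_lock_adapter'

adapter = LockManager::Adapters::RedisLockAdapter.new
token   = SecureRandom.hex

if adapter.try_lock('reports:user-42', token, 30)
  begin
    # critical section: runs only while we hold the lock
  ensure
    adapter.unlock('reports:user-42', token)
  end
end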

Step 3 – main gateway for the locking mechanism

Lastly, let’s implement the LockManager::Locker. It is the main entry point/gateway for the locking mechanism. The LockManager namespace is generic, as we can add multiple adapters to it, but for this use case we will focus on the RedisLockAdapter as the primary adapter. The Locker exposes two class methods: a blocking one and a non-blocking one.

The Locker is a singleton class and is responsible for setting a key and a timeout (expiration) for that specific key.

module LockManager
  class Locker
    include Singleton

    class << self
      def nonblocking_lock(key, timeout: 30, &block)
        instance.nonblocking_lock(key, timeout: timeout, &block)
      end

      def blocking_lock(key, timeout: 30, &block)
        instance.blocking_lock(key, timeout: timeout, &block)
      end
    end
  end
end

The class methods would just invoke the singleton instance methods:

def nonblocking_lock(key, timeout:)
  value = SecureRandom.hex

  if adapter.try_lock(key, value, timeout)
    begin
      Result.new(true, yield)
    ensure
      adapter.unlock(key, value)
    end
  else
    Result.new(false, nil)
  end
end

def blocking_lock(key, timeout:)
  value = SecureRandom.hex
  adapter.lock(key, value, timeout)
  yield
ensure
  adapter.unlock(key, value)
end

To check whether the non-blocking operation actually ran its block, we wrap the outcome in a Result class, giving the caller a simple way to branch on it if needed.

class Result
  attr_reader :result

  def initialize(run, result = nil)
    @run = run
    @result = result
  end

  def run?
    @run
  end
end
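
For example, a caller could branch on the outcome like this (generate_report and notify_user are hypothetical application methods):

result = LockManager::Locker.nonblocking_lock('reports:user-42') { generate_report }

if result.run?
  notify_user(result.result)
else
  # another process already holds the lock; skip or reschedule
end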

The final LockManager:

require 'securerandom'
require 'singleton'
require 'lock_manager/adapters/redis_lock_adapter'

module LockManager
  class Locker
    include Singleton

    attr_reader :adapter

    def initialize
      @adapter = Adapters::RedisLockAdapter.new
    end

    class << self
      def nonblocking_lock(key, timeout: 30, &block)
        instance.nonblocking_lock(key, timeout: timeout, &block)
      end

      def blocking_lock(key, timeout: 30, &block)
        instance.blocking_lock(key, timeout: timeout, &block)
      end
    end

    def nonblocking_lock(key, timeout:)
      value = SecureRandom.hex

      if adapter.try_lock(key, value, timeout)
        begin
          Result.new(true, yield)
        ensure
          adapter.unlock(key, value)
        end
      else
        Result.new(false, nil)
      end
    end

    def blocking_lock(key, timeout:)
      value = SecureRandom.hex
      adapter.lock(key, value, timeout)
      yield
    ensure
      adapter.unlock(key, value)
    end

    class Result
      attr_reader :result

      def initialize(run, result = nil)
        @run = run
        @result = result
      end

      def run?
        @run
      end
    end
  end
end

Usage

Assuming that our ApplicationJob includes Sidekiq::Worker, we can test our LockManager implementation to see how it works using both blocking and non-blocking behaviour.
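
That assumption boils down to a base class along these lines (one possible minimal version):

# Minimal Sidekiq-based ApplicationJob assumed by the examples below.
class ApplicationJob
  include Sidekiq::Worker
end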

Non-blocking example

In the following example, we simulate a long-running job that takes 5 seconds to run. Since we’re using the non-blocking lock, if we trigger this worker multiple times during that 5-second window, only the first job executes its block; all the other invocations fall through immediately.

require 'lock_manager/locker'

class MyWorker < ApplicationJob
  def perform
    LockManager::Locker.nonblocking_lock("MyWorker") do
      sleep 5
      logger.debug "MyWorker result"
    end
  end
end

Calling the worker 5 times during the 5 second sleep period yields only 1 MyWorker result log from the first job that acquired the lock, which is exactly what we expected:

> MyWorker.perform_async
 => "ae15e0c28464f1eceaf4dee4"
> MyWorker.perform_async
 => "8594484c746ef6259b35df1f"
> MyWorker.perform_async
 => "b997adc9c04ffc9a52eb49bd"
> MyWorker.perform_async
 => "ac36b4cdd47ba384849d047b"
> MyWorker.perform_async
 => "53053b33c715554a54408d9b"
2022-01-17T10:24:54.546Z pid=14379 tid=o5z class=MyWorker jid=ae15e0c28464f1eceaf4dee4 INFO: start
2022-01-17T10:24:54.862Z pid=14379 tid=o73 class=MyWorker jid=8594484c746ef6259b35df1f INFO: start
2022-01-17T10:24:54.869Z pid=14379 tid=o73 class=MyWorker jid=8594484c746ef6259b35df1f elapsed=0.007 INFO: done
2022-01-17T10:24:55.162Z pid=14379 tid=o6j class=MyWorker jid=b997adc9c04ffc9a52eb49bd INFO: start
2022-01-17T10:24:55.168Z pid=14379 tid=o6j class=MyWorker jid=b997adc9c04ffc9a52eb49bd elapsed=0.007 INFO: done
2022-01-17T10:24:55.445Z pid=14379 tid=o73 class=MyWorker jid=ac36b4cdd47ba384849d047b INFO: start
2022-01-17T10:24:55.453Z pid=14379 tid=o73 class=MyWorker jid=ac36b4cdd47ba384849d047b elapsed=0.007 INFO: done
2022-01-17T10:24:55.754Z pid=14379 tid=o6j class=MyWorker jid=53053b33c715554a54408d9b INFO: start
2022-01-17T10:24:55.764Z pid=14379 tid=o6j class=MyWorker jid=53053b33c715554a54408d9b elapsed=0.01 INFO: done
2022-01-17T10:24:59.555Z pid=14379 tid=o5z class=MyWorker jid=ae15e0c28464f1eceaf4dee4 DEBUG: MyWorker result
2022-01-17T10:24:59.556Z pid=14379 tid=o5z class=MyWorker jid=ae15e0c28464f1eceaf4dee4 elapsed=5.011 INFO: done

Blocking example

If we switch to the blocking behaviour, each worker has to wait until the lock is released before processing its job, so all jobs run sequentially.

Change:

LockManager::Locker.nonblocking_lock("MyWorker") do

to:

LockManager::Locker.blocking_lock("MyWorker") do

Calling the worker three times during the 5 second sleep period yields 3 MyWorker result logs, from each job, sequentially, which is exactly what we expected:

> MyWorker.perform_async
 => "991624dca5f5c748faf0ed54"
> MyWorker.perform_async
 => "7213cedf4c8353e8cf2743f7"
> MyWorker.perform_async
 => "b1a523be56815b68425c5814"
2022-01-17T10:34:41.764Z pid=14379 tid=o73 class=MyWorker jid=991624dca5f5c748faf0ed54 INFO: start
2022-01-17T10:34:42.197Z pid=14379 tid=o5z class=MyWorker jid=7213cedf4c8353e8cf2743f7 INFO: start
2022-01-17T10:34:42.714Z pid=14379 tid=o6j class=MyWorker jid=b1a523be56815b68425c5814 INFO: start
2022-01-17T10:34:46.845Z pid=14379 tid=o73 class=MyWorker jid=991624dca5f5c748faf0ed54 DEBUG: MyWorker result
2022-01-17T10:34:46.845Z pid=14379 tid=o73 class=MyWorker jid=991624dca5f5c748faf0ed54 elapsed=5.081 INFO: done
2022-01-17T10:34:51.903Z pid=14379 tid=o5z class=MyWorker jid=7213cedf4c8353e8cf2743f7 DEBUG: MyWorker result
2022-01-17T10:34:51.905Z pid=14379 tid=o5z class=MyWorker jid=7213cedf4c8353e8cf2743f7 elapsed=9.707 INFO: done
2022-01-17T10:34:56.927Z pid=14379 tid=o6j class=MyWorker jid=b1a523be56815b68425c5814 DEBUG: MyWorker result
2022-01-17T10:34:56.928Z pid=14379 tid=o6j class=MyWorker jid=b1a523be56815b68425c5814 elapsed=14.213 INFO: done

Summary

Not only did we implement unique Sidekiq jobs in Ruby from scratch, but we also added a concurrency-control layer for running jobs sequentially where that is needed. Additionally, thanks to the adapter approach, we could extend the above to database advisory locks with minimal code changes. That can be a topic for a follow-up article.