How to constrain delayed_job processing to a specific Kubernetes cluster

11/2/2020

I am looking for a way to control which of my review environments processes which jobs.

We are using delayed_job and are running several Kubernetes alias clusters based on a master cluster.

Is this at all possible? I found a simple way to prefix the worker's name, but I can't find a way to pass that prefix on to the actual job.

Any help is appreciated.

The way I figured it should work is something like this.

I'm not sure this is the right way to go. Perhaps the same thing could be achieved with the lifecycle events: add a column, use the lifecycle events to populate it, and then query on it?

Crossposted to collectiveidea/delayed_job/issues/1125

-- mhenrixon
delayed-job
kubernetes

1 Answer

11/3/2020

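I eventually ended up with the following solution: add a varchar column named cluster to the delayed_jobs table, then reopen the ActiveRecord backend so that jobs are stamped with the cluster on save and workers only query jobs for their own cluster. Works like a charm.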

require 'delayed/backend/active_record'

module Delayed
  module Backend
    module ActiveRecord
      class Configuration
        attr_accessor :cluster
      end

      # A job object that is persisted to the database.
      # Contains the work object as a YAML field.
      class Job < ::ActiveRecord::Base
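        # Unlocked jobs (or jobs whose lock has expired) are only ready when they
        # belong to this worker's cluster. Jobs already locked by this worker stay
        # visible, and failed jobs are always excluded.
        READY_SQL = <<~SQL.squish.freeze
          ((cluster = ? AND run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL
        SQL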

        before_save :set_cluster

        def self.ready_to_run(worker_name, max_run_time)
          where(READY_SQL, cluster, db_time_now, db_time_now - max_run_time, worker_name)
        end

        # When a worker is exiting, make sure we don't have any locked jobs.
        def self.clear_locks!(worker_name)
          where(cluster: cluster, locked_by: worker_name)
            .update_all(locked_by: nil, locked_at: nil) # rubocop:disable Rails/SkipsModelValidations
        end

        def self.cluster
          Delayed::Backend::ActiveRecord.configuration.cluster
        end

        def set_cluster
          self.cluster ||= self.class.cluster
        end
      end
    end
  end
end

Delayed::Backend::ActiveRecord.configuration.cluster = ENV['CLUSTER'] if ENV['CLUSTER']
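
To make the READY_SQL condition easier to reason about, here is a minimal plain-Ruby sketch of the same predicate applied to in-memory rows. It is only a model for illustration and not part of delayed_job: JobRow, ready?, the cluster names, and the worker name are all hypothetical, and no database is involved.

```ruby
# Plain-Ruby model of the READY_SQL predicate above (illustration only).
# JobRow and ready? are hypothetical names, not delayed_job APIs.
JobRow = Struct.new(:cluster, :run_at, :locked_at, :locked_by, :failed_at,
                    keyword_init: true)

def ready?(job, cluster:, worker_name:, now:, max_run_time:)
  # "AND failed_at IS NULL": failed jobs are never ready.
  return false unless job.failed_at.nil?
  # "OR locked_by = ?": a job this worker already holds stays visible.
  return true if job.locked_by == worker_name

  # "locked_at IS NULL OR locked_at < ?": unlocked, or the lock has expired.
  lock_free = job.locked_at.nil? || job.locked_at < now - max_run_time
  # "cluster = ?" is never true in SQL when either side is NULL,
  # so a nil cluster must not match here either.
  !cluster.nil? && job.cluster == cluster && job.run_at <= now && lock_free
end

now = Time.utc(2020, 11, 3, 12, 0, 0)
jobs = [
  JobRow.new(cluster: "review-1", run_at: now - 60), # due, same cluster
  JobRow.new(cluster: "review-2", run_at: now - 60), # due, other cluster
  JobRow.new(cluster: "review-1", run_at: now + 60)  # same cluster, not due yet
]

picked = jobs.select do |job|
  ready?(job, cluster: "review-1", worker_name: "review-1 host:a pid:1",
              now: now, max_run_time: 4 * 60 * 60)
end
puts picked.length # prints 1
```

One consequence worth noting: if CLUSTER is not set, the configured cluster stays nil, the query compares cluster against NULL, and that comparison matches no rows. A worker started without CLUSTER would therefore only see jobs it has already locked.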
-- mhenrixon
Source: StackOverflow