12
votes

I have some update triggers which push jobs onto the Sidekiq queue. So in some cases, there can be multiple jobs to process the same object.

There are a couple of uniqueness plugins ("Middleware", Unique Jobs). They're not documented much, but they seem to act more like throttlers that prevent repeat processing; what I want is something that prevents repeated enqueuing of the same job, so that an object is always processed in its freshest state. Is there a plugin or technique for this?


Update: I didn't have time to make a middleware, but I ended up with a related cleanup function to ensure queues are unique: https://gist.github.com/mahemoff/bf419c568c525f0af903
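
The gist's approach, roughly, is to walk a queue and delete any job whose class and arguments have already been seen. A minimal sketch of that idea (illustrative, not the gist verbatim; see the link for the real version):

require 'sidekiq/api'

# Best-effort de-duplication pass over a queue: keep the first
# occurrence of each (class, args) pair and delete later duplicates.
def dedupe_queue(queue_name = 'default')
  seen = {}
  Sidekiq::Queue.new(queue_name).each do |job|
    key = [job.klass, job.args]
    if seen[key]
      job.delete # an identical job is already ahead of this one
    else
      seen[key] = true
    end
  end
end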

4
Not to troll, but one of the assumptions of Sidekiq is that the job is idempotent, which is exactly the problem you're complaining about. – engineerDave
I'm not worried about a repeat job causing some unwanted consequence; I'm worried about performance. Identical jobs mean wasted cycles. E.g. if an object is changed and a job added to the queue, then the object changes again while the job is still on the queue, there's no point in executing both identical jobs. – mahemoff
Is that intuition telling you it's an optimization problem, or benchmarks proving a performance bottleneck? As Sidekiq runs its jobs concurrently and in a non-blocking fashion, the jobs are executed in parallel with little overhead. Doing an operation to find the unique jobs may chew up more cycles, or cause a blocking operation that slows you down more than a few duplicate operations executing in threads. Again, you never know until you have benchmarks. Either way, I wish you luck! – engineerDave
Thanks Dave! When you say "little overhead", you're referring to Sidekiq's effort, but if the job itself requires substantial network activity and grunt work, the savings can be huge. I mean, there's a reason these jobs are being deferred in the first place; some of them can be heavy. – mahemoff
Sorry for any confusion; by "little overhead" I meant low memory profile and non-blocking, in the context that it's a background operation. – engineerDave

4 Answers

8
votes

What about a simple client middleware?

module Sidekiq
  class UniqueMiddleware
    # Client middleware: runs before a job is pushed to Redis. If the
    # job is flagged unique and an identical job (same class and args)
    # is already waiting in the queue, skip the push by returning
    # without yielding. The scan is O(queue size) and not atomic, so
    # treat it as best-effort de-duplication.
    def call(worker_class, msg, queue_name, redis_pool)
      if msg['unique']
        queue = Sidekiq::Queue.new(queue_name)
        queue.each do |job|
          if job.klass == msg['class'] && job.args == msg['args']
            return false # duplicate already enqueued; drop this one
          end
        end
      end

      yield
    end
  end
end

Just register it:

Sidekiq.configure_client do |config|
  config.client_middleware do |chain|
    chain.add Sidekiq::UniqueMiddleware
  end
end

Then, in your worker, just set unique: true in sidekiq_options when needed, for example:
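
Here's a worker opting in (ObjectSyncWorker and its argument are illustrative names; sidekiq_options are merged into the job payload, which is how the middleware sees the flag):

class ObjectSyncWorker
  include Sidekiq::Worker
  # the middleware above checks this flag in the job payload
  sidekiq_options unique: true

  def perform(object_id)
    # process the object in its freshest state
  end
end

# While an identical job sits in the queue, repeat pushes are dropped:
ObjectSyncWorker.perform_async(42)
ObjectSyncWorker.perform_async(42) # skipped by the middleware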

3
votes

My suggestion is to search for prior scheduled jobs based on some select criteria and delete them before scheduling a new one. This has been useful for me when I want a single scheduled job for a particular object and/or one of its methods; a usage example follows the methods below.

Some example methods in this context:

##
# find job(s) scheduled via the delay extension for a particular
# class and method
#
def self.find_jobs_for_object_by_method(klass, method)

  jobs = Sidekiq::ScheduledSet.new

  jobs.select do |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        job_method == method
  end

end

##
# delete job(s) specific to a particular class,method,particular record
# will only remove djs on an object for that method
#
def self.delete_jobs_for_object_by_method(klass, method, id)

  jobs = Sidekiq::ScheduledSet.new
  jobs.select do |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        job_method == method  &&
        args[0] == id
  end.map(&:delete)

end

##
# delete job(s) specific to a particular class and particular record
# will remove any djs on that Object
#
def self.delete_jobs_for_object(klass, id)

  jobs = Sidekiq::ScheduledSet.new
  jobs.select do |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        args[0] == id
  end.map(&:delete)

end
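
Put together, the delete-then-reschedule pattern looks like this (a sketch assuming the methods above live on a JobUtils module, Sidekiq's delay extensions are enabled, and Article.recalculate is a hypothetical class method):

# Drop any pending delayed jobs for this record and method, then
# schedule a fresh one so only the latest state gets processed.
JobUtils.delete_jobs_for_object_by_method(Article, :recalculate, article.id)
Article.delay_for(5.minutes).recalculate(article.id)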
2
votes

Take a look at this: https://github.com/mhenrixon/sidekiq-unique-jobs

It's a gem that adds unique-job support to Sidekiq.
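
With the gem installed, uniqueness is opted into per worker via sidekiq_options. The option names have changed across gem versions (newer releases use lock-based options), so check its README; the classic form looked roughly like:

class ProcessObjectWorker
  include Sidekiq::Worker
  # read by sidekiq-unique-jobs' middleware; verify the exact
  # option names against the gem version you're using
  sidekiq_options unique: true

  def perform(object_id)
    # ...
  end
end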

0
votes

Maybe you could use Queue Classic, which enqueues jobs in a Postgres table. Since it's open source and the storage is plain SQL, it could be extended to check for uniqueness before enqueuing.
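
A rough sketch of such a check, assuming queue_classic's default queue_classic_jobs table and its method/args columns (verify the schema against your queue_classic version):

require 'json'
require 'pg'
require 'queue_classic'

# conn is a PG::Connection to the same database queue_classic uses.
# Note this check-then-insert isn't atomic; a partial unique index
# on (method, args) would enforce it at the database level instead.
def enqueue_unique(conn, method, *args)
  count = conn.exec_params(
    'SELECT COUNT(*) FROM queue_classic_jobs WHERE method = $1 AND args = $2',
    [method, JSON.dump(args)]
  ).getvalue(0, 0).to_i
  QC.enqueue(method, *args) if count.zero?
end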