Parent

Namespace

Delayed::Job

Public Class Methods

clear_locks!() click to toggle source
# File lib/delayed/job.rb, line 36
# Releases every lock held by the current worker: sets locked_by and
# locked_at to null on all rows whose locked_by equals worker_name.
# Returns whatever update_all returns.
def self.clear_locks!
  released_columns    = "locked_by = null, locked_at = null"
  held_by_this_worker = ["locked_by = ?", worker_name]

  update_all(released_columns, held_by_this_worker)
end
enqueue(*args, &block) click to toggle source
# File lib/delayed/job.rb, line 79
# Creates a Job for the given work item.
#
# Call as enqueue(object, priority = 0, run_at = nil), or with a block, in
# which case the block is wrapped in an EvaledJob and the positional
# arguments are (priority, run_at).
#
# Raises ArgumentError when no block is given and the object does not
# respond to #perform. Returns the result of Job.create.
def self.enqueue(*args, &block)
  object =
    if block_given?
      EvaledJob.new(&block)
    else
      args.shift
    end

  if !object.respond_to?(:perform) && !block_given?
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  # Whatever remains in args is (priority, run_at); both are optional.
  priority, run_at = args
  priority ||= 0

  Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end
find_available(limit = 5, max_run_time = MAX_RUN_TIME) click to toggle source
# File lib/delayed/job.rb, line 92
# Finds up to +limit+ candidate jobs and returns them in random order.
#
# The query starts from NextTaskSQL, bound to the current db time, the
# lock-expiry cutoff (now - max_run_time) and this worker's name. Optional
# min_priority / max_priority bounds are appended when they are set.
# Results are fetched in NextTaskOrder with ActiveRecord logging silenced.
#
# limit        - maximum number of records to fetch (default 5)
# max_run_time - how long a lock is honoured before it counts as expired
#
# Returns an Array of records, shuffled so that concurrent workers do not
# all try to lock the same first record.
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
  time_now = db_time_now

  sql        = NextTaskSQL.dup
  conditions = [time_now, time_now - max_run_time, worker_name]

  if self.min_priority
    sql << ' AND (priority >= ?)'
    conditions << min_priority
  end

  if self.max_priority
    sql << ' AND (priority <= ?)'
    conditions << max_priority
  end

  conditions.unshift(sql)

  records = ActiveRecord::Base.silence do
    find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
  end

  # sort_by with a random key is a real shuffle. The previous
  # `sort { rand() }` handed sort a comparator that is always positive
  # (rand is a Float in [0, 1)), so the resulting order never varied.
  records.sort_by { rand }
end
invoke_job(job, &block) click to toggle source

Moved into its own method so that new_relic can trace it.

# File lib/delayed/job.rb, line 199
# Calls +block+ with +job+ and returns the block's result.
# Kept as its own method so that tracing tools such as new_relic can hook it.
def self.invoke_job(job, &block)
  block.call(job)
end
log_exception(job, error) click to toggle source

This is a good hook if you need to report job processing errors in additional or different ways

# File lib/delayed/job.rb, line 173
# Writes two error-level log entries for a failed job: a one-line summary
# (job name, error class and message, attempt count) followed by the error
# object itself. Override this to report failures some other way.
def self.log_exception(job, error)
  summary = "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts"

  logger.error summary
  logger.error(error)
end
reserve(max_run_time = MAX_RUN_TIME, &block) click to toggle source

Run the next job we can get an exclusive lock on, passing its payload to the given block, and return that job. If no jobs are left we return nil.

# File lib/delayed/job.rb, line 121
# Locks and runs the next available job.
#
# Each candidate from find_available is tried in turn: it is locked for
# this worker, its payload is passed to the block via invoke_job, and the
# job is destroyed once the block returns.
#
# max_run_time - passed to find_available and lock_exclusively!
# block        - receives the job's payload object
#
# Returns the job that was run (successfully or not), or nil when no
# candidate could be locked. A job whose run raises StandardError is
# rescheduled and logged via log_exception, then returned.
def self.reserve(max_run_time = MAX_RUN_TIME, &block)

  # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
  # This leads to a more even distribution of jobs across the worker processes.
  find_available(5, max_run_time).each do |job|
    begin
      logger.info "* [JOB] aquiring lock on #{job.name}"
      job.lock_exclusively!(max_run_time, worker_name)
      runtime = Benchmark.realtime do
        invoke_job(job.payload_object, &block)
        job.destroy
      end
      # Format only the duration. Passing the whole message to String#%
      # treated the job name as part of the format string, so a name
      # containing '%' raised ArgumentError *after* the job had run and been
      # destroyed, and the rescue below then rescheduled a finished job.
      logger.info "* [JOB] #{job.name} completed after #{'%.4f' % runtime}"

      return job
    rescue LockError
      # We did not get the lock, some other worker process must have
      logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
    rescue StandardError => e
      job.reschedule e.message, e.backtrace
      log_exception(job, e)
      return job
    end
  end

  nil
end
work_off(num = 100) click to toggle source
# File lib/delayed/job.rb, line 178
# Runs up to +num+ jobs through reserve, calling #perform on each payload.
# Stops early as soon as reserve returns nil.
#
# Returns [success_count, failure_count]. A failure is counted and then
# re-raised so that reserve can handle the error itself.
def self.work_off(num = 100)
  success = 0
  failure = 0

  num.times do
    reserved = reserve do |payload|
      begin
        payload.perform
        success += 1
      rescue
        failure += 1
        raise
      end
    end

    break if reserved.nil?
  end

  [success, failure]
end

Public Instance Methods

failed() click to toggle source
Alias for: failed?
failed?() click to toggle source
# File lib/delayed/job.rb, line 40
# Reports whether this job has been marked as failed.
# Returns the failed_at value itself rather than a strict boolean, so the
# result is truthy exactly when failed_at has been set.
def failed?
  failed_at
end
Also aliased as: failed
lock_exclusively!(max_run_time, worker = worker_name) click to toggle source

This method is used internally by the reserve method to ensure exclusive access to the given job. It will raise a LockError if it cannot get this lock.

# File lib/delayed/job.rb, line 151
# Takes (or refreshes) the exclusive lock on this job for +worker+.
#
# max_run_time - a lock older than this (relative to db time) counts as expired
# worker       - name recorded in locked_by (defaults to worker_name)
#
# Raises LockError unless exactly one row was updated. On success the
# in-memory locked_at / locked_by are updated to match.
def lock_exclusively!(max_run_time, worker = worker_name)
  now = self.class.db_time_now

  affected_rows =
    if locked_by == worker
      # We already own this job, this may happen if the job queue crashes.
      # Simply resume and update the locked_at
      self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
    else
      # We don't own this job, so claim it: only succeeds when the row is
      # unlocked or its existing lock has expired.
      lock_expiry = now - max_run_time.to_i
      self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, lock_expiry])
    end

  if affected_rows != 1
    raise LockError.new("Attempted to aquire exclusive lock failed")
  end

  self.locked_at = now
  self.locked_by = worker
end
name() click to toggle source
# File lib/delayed/job.rb, line 49
# Human-readable name for this job, memoized in @name.
# Uses the payload's display_name when it responds to that; otherwise
# falls back to the payload's class name.
def name
  return @name if @name

  target = payload_object
  @name = target.respond_to?(:display_name) ? target.display_name : target.class.name
end
payload_object() click to toggle source
# File lib/delayed/job.rb, line 45
# Returns the object stored in the 'handler' attribute, deserializing it
# on first access and caching the result in @payload_object.
def payload_object
  return @payload_object if @payload_object

  @payload_object = deserialize(self['handler'])
end
payload_object=(object) click to toggle source
# File lib/delayed/job.rb, line 60
# Serializes +object+ with to_yaml and stores the result in the
# 'handler' attribute.
def payload_object=(object)
  serialized = object.to_yaml
  self['handler'] = serialized
end
reschedule(message, backtrace = [], time = nil) click to toggle source
# File lib/delayed/job.rb, line 64
# Handles a failed run of this job.
#
# While attempts is below MAX_ATTEMPTS the job is retried: attempts is
# incremented, run_at is set to +time+ (default: db time now plus
# attempts**4 + 5, using the pre-increment attempt count), last_error
# records the message and backtrace, the lock is released and the record
# is saved.
#
# Once MAX_ATTEMPTS is reached the job is either destroyed or stamped with
# failed_at, depending on destroy_failed_jobs.
def reschedule(message, backtrace = [], time = nil)
  if attempts >= MAX_ATTEMPTS
    logger.info "* [JOB] PERMANENTLY removing #{name} because of #{attempts} consequetive failures."
    if destroy_failed_jobs
      destroy
    else
      update_attribute(:failed_at, Time.now)
    end
  else
    # Compute the retry time before bumping attempts so the backoff uses
    # the number of failures seen so far.
    time ||= Job.db_time_now + (attempts ** 4) + 5

    self.attempts  += 1
    self.run_at     = time
    self.last_error = message + "\n" + backtrace.join("\n")
    unlock
    save!
  end
end
unlock() click to toggle source
# File lib/delayed/job.rb, line 167
# Clears the in-memory lock fields (locked_at and locked_by) on this job.
# Nothing is saved here; the caller decides when to save.
def unlock
  self.locked_at = self.locked_by = nil
end

Protected Instance Methods

before_save() click to toggle source
# File lib/delayed/job.rb, line 237
# Callback: defaults run_at to the current database time when it has not
# been set. An existing run_at is left untouched.
def before_save
  run_at || (self.run_at = self.class.db_time_now)
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.