ActiveRecord::Base
# File lib/delayed/job.rb, line 36
# Clears the lock columns on every row whose locked_by equals worker_name:
# both locked_by and locked_at are set to NULL. Returns whatever update_all
# returns.
def self.clear_locks!
  release_sql = "locked_by = null, locked_at = null"
  held_by_worker = ["locked_by = ?", worker_name]
  update_all(release_sql, held_by_worker)
end
# File lib/delayed/job.rb, line 79
# Creates a Job record for a unit of work.
#
# The work item is an EvaledJob wrapping the block when a block is given,
# otherwise the first positional argument. The remaining positional arguments
# are, in order, the priority (0 when omitted, coerced with to_i) and run_at.
#
# Raises ArgumentError when no block is given and the work item does not
# respond to perform.
def self.enqueue(*args, &block)
  object = if block_given?
    EvaledJob.new(&block)
  else
    args.shift
  end

  if !object.respond_to?(:perform) && !block_given?
    raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  end

  # Whatever is left after the optional shift is [priority, run_at].
  priority, run_at = args
  priority ||= 0

  Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end
# File lib/delayed/job.rb, line 92
# Finds up to +limit+ candidate jobs and returns them in random order.
#
# The query starts from NextTaskSQL, bound to the current db time, the db time
# minus +max_run_time+, and worker_name. When min_priority / max_priority are
# set, a matching priority bound is appended. Rows are ordered by NextTaskOrder
# in the database and then shuffled in memory.
#
# limit        - maximum number of records to fetch (default 5).
# max_run_time - subtracted from the current db time for the second bind value.
#
# Returns an Array of the records found, shuffled.
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
  time_now = db_time_now

  # dup so the appended priority clauses never mutate the shared constant.
  sql = NextTaskSQL.dup
  conditions = [time_now, time_now - max_run_time, worker_name]

  if self.min_priority
    sql << ' AND (priority >= ?)'
    conditions << min_priority
  end

  if self.max_priority
    sql << ' AND (priority <= ?)'
    conditions << max_priority
  end

  # ActiveRecord expects the SQL fragment first, followed by its bind values.
  conditions.unshift(sql)

  # NOTE(review): silence presumably suppresses query logging for this poll - confirm.
  records = ActiveRecord::Base.silence do
    find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
  end

  # Shuffle by sorting on a random key. The previous `sort { rand() }` used the
  # random number as the comparator result, which is (almost) always positive,
  # so every comparison said "greater" and the order was never randomised.
  records.sort_by { rand }
end
Moved into its own method so that new_relic can trace it.
# File lib/delayed/job.rb, line 199
# Calls the given block with +job+ and returns the block's result.
#
# job   - the object handed to the block.
# block - the callable that does the work; it is invoked with block.call, so
#         calling this method without a block fails on nil.
def self.invoke_job(job, &block)
  block.call(job)
end
This is a good hook if you need to report job processing errors in additional or different ways
# File lib/delayed/job.rb, line 173
# Writes two error-level log entries for a failed job: a one-line summary
# (job name, error class, error message, attempt count) followed by the error
# object itself.
#
# job   - must respond to name and attempts.
# error - the exception that was raised.
def self.log_exception(job, error)
  summary = "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts"
  logger.error summary
  logger.error(error)
end
Run the next job we can get an exclusive lock on and return that job. If no jobs are left we return nil.
# File lib/delayed/job.rb, line 121
# Locks and runs the first available job this worker can get exclusively.
#
# max_run_time - passed to find_available and to lock_exclusively!.
# block        - receives the job's payload_object (via invoke_job).
#
# Returns the job that was attempted - whether it completed (and was
# destroyed) or failed (and was rescheduled) - or nil when no candidate could
# be locked.
def self.reserve(max_run_time = MAX_RUN_TIME, &block)
  # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
  # This leads to a more even distribution of jobs across the worker processes.
  find_available(5, max_run_time).each do |job|
    begin
      logger.info "* [JOB] aquiring lock on #{job.name}"
      job.lock_exclusively!(max_run_time, worker_name)
      runtime = Benchmark.realtime do
        invoke_job(job.payload_object, &block)
        job.destroy
      end
      # Format only the number. Using the whole message as the format template
      # would raise ArgumentError for a job name containing '%', and that error
      # would be rescued below and reschedule a job that already ran and was
      # destroyed.
      logger.info "* [JOB] #{job.name} completed after #{'%.4f' % runtime}"

      return job
    rescue LockError
      # We did not get the lock, some other worker process must have.
      # Fall through and try the next candidate.
      logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
    rescue StandardError => e
      job.reschedule e.message, e.backtrace
      log_exception(job, e)
      return job
    end
  end

  nil
end
# File lib/delayed/job.rb, line 40
# Truthy when this job has a failed_at value. The value of failed_at itself is
# returned (nil when unset), not a strict boolean.
def failed?
  self.failed_at
end
This method is used internally by the reserve method to ensure exclusive access to the given job. It will raise a LockError if it cannot get this lock.
# File lib/delayed/job.rb, line 151
# Claims this job for +worker+ with a single conditional UPDATE.
#
# max_run_time - a lock whose locked_at is older than now minus this value
#                (coerced with to_i) may be taken over.
# worker       - name to record in locked_by (defaults to worker_name).
#
# On success the in-memory locked_at / locked_by are updated to match.
# Raises LockError unless exactly one row was updated.
def lock_exclusively!(max_run_time, worker = worker_name)
  now = self.class.db_time_now

  if locked_by == worker
    # We already own this job, this may happen if the job queue crashes.
    # Simply resume and update the locked_at.
    assignments = ["locked_at = ?", now]
    scope = ["id = ? and locked_by = ?", id, worker]
  else
    # We don't own this job so we will update the locked_by name and the
    # locked_at, but only if it is unlocked or its lock has expired.
    assignments = ["locked_at = ?, locked_by = ?", now, worker]
    scope = ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)]
  end

  affected_rows = self.class.update_all(assignments, scope)
  raise LockError, "Attempted to aquire exclusive lock failed" unless affected_rows == 1

  self.locked_at = now
  self.locked_by = worker
end
# File lib/delayed/job.rb, line 49
# Display name for this job, memoised in @name.
#
# Uses the payload's display_name when the payload responds to it, otherwise
# the payload's class name.
def name
  return @name if @name

  target = payload_object
  @name = target.respond_to?(:display_name) ? target.display_name : target.class.name
end
# File lib/delayed/job.rb, line 45
# The object stored in the 'handler' attribute, passed through deserialize.
# The result is memoised in @payload_object, so deserialize runs again only
# while the cached value is nil or false.
def payload_object
  return @payload_object if @payload_object

  @payload_object = deserialize(self['handler'])
end
# File lib/delayed/job.rb, line 60
# Stores +object+ in the 'handler' attribute as YAML (object.to_yaml).
#
# payload_object and name memoise values derived from the handler in
# @payload_object and @name. Both are cleared here so that, after a new
# payload is assigned, they are recomputed from the new handler instead of
# returning the previous payload's values.
def payload_object=(object)
  @payload_object = nil
  @name = nil
  self['handler'] = object.to_yaml
end
# File lib/delayed/job.rb, line 64
# Records a failed attempt and either schedules a retry or gives up.
#
# message   - error text; stored in last_error followed by the backtrace.
# backtrace - Array of backtrace lines (default []).
# time      - when to retry; defaults to Job.db_time_now + attempts**4 + 5.
#
# While attempts is below MAX_ATTEMPTS the attempt counter is incremented,
# run_at and last_error are set, the job is unlocked and saved with save!.
# Otherwise the job is destroyed when destroy_failed_jobs is truthy, or
# marked by setting failed_at.
def reschedule(message, backtrace = [], time = nil)
  if self.attempts < MAX_ATTEMPTS
    # Back-off grows with the fourth power of the attempts made so far.
    time ||= Job.db_time_now + (attempts ** 4) + 5

    self.attempts += 1
    self.run_at = time
    self.last_error = message + "\n" + backtrace.join("\n")
    self.unlock
    save!
  else
    logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
    # Stamp failed_at with the same clock used for run_at above, rather than
    # Time.now, so all job timestamps come from Job.db_time_now.
    destroy_failed_jobs ? destroy : update_attribute(:failed_at, Job.db_time_now)
  end
end
Generated with the Darkfish Rdoc Generator 2.