Zero Downtime Delayed Jobs

Here is a script I wrote for zero-downtime Delayed Job restarts. It spins up a new set of Delayed Job workers, then signals the currently running workers to shut down once they have finished the job they are working on. The stock script/delayed_job restart waits only up to a timeout value and then kills the worker, potentially losing whatever work was in progress. I store this in a file called delayed_job_restart.sh. Make sure you make it executable:

chmod +x delayed_job_restart.sh

Feedback is welcome!

#!/bin/bash

##############
# CONFIGURATION
##############

# Set the rails environment.
RAILS_ENV=development

# This is the location of your rails application. (Rails.root)
APP_DIR=/path/to/app

# Whatever queues you want to handle.
QUEUES=queue1,queue2

# Number of delayed job instances.
NUMJOBS=6

########################
# DON'T EDIT BELOW HERE
# UNLESS YOU UNDERSTAND IT
########################
# This is the file that will hold the location of
# the new round of delayed job pids.
NEW_PID_DIR_FILE=$APP_DIR/pids/delayed_job_pid_dir.new

# This is the directory where the new delayed job pids will live. We
# create a random directory.
NEW_PID_DIR=$APP_DIR/pids/delayed_job/`cat /dev/urandom | env LC_CTYPE=C tr -cd 'a-f0-9' | head -c 32`

# Make the directory and create the
# pid file.
mkdir -p $NEW_PID_DIR
touch $NEW_PID_DIR_FILE

# Put the pid dir into the pid file.
echo $NEW_PID_DIR > $NEW_PID_DIR_FILE

echo "Starting new delayed job processes..."
cd $APP_DIR
RAILS_ENV=$RAILS_ENV bundle exec script/delayed_job start --queues=$QUEUES --pid-dir=`cat $NEW_PID_DIR_FILE` -n $NUMJOBS
echo "Done."

# This is the current pid dir.
PID_DIR_FILE=$APP_DIR/pids/delayed_job_pid_dir

# If the pid dir file exists, tell the old workers
# to shut down when they are done.
if [ -f "$PID_DIR_FILE" ]; then
  echo "Sending signal to stop old delayed job processes..."
  PID_DIR=`cat "$PID_DIR_FILE"`
  for f in "$PID_DIR"/*.pid
  do
    # Skip the unexpanded glob when no pid files exist.
    [ -f "$f" ] || continue
    PID=`cat "$f"`
    echo " TERM to $PID"
    kill -TERM "$PID"
  done
  echo "Done."
  echo "Removing old PID directory."
  rm -rf "$PID_DIR"
fi

# Since we have a set of new workers running,
# let's make the new pid dir file the "current" one.
mv $NEW_PID_DIR_FILE $PID_DIR_FILE
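
The restart above depends on each old worker finishing its current job before it exits on TERM. Here is a minimal Ruby sketch of that pattern, assuming a worker that checks a stop flag between jobs. SketchWorker and its jobs are hypothetical names made up for illustration; this is not Delayed::Job's actual code.

```ruby
# Hypothetical stand-in for a worker loop; not Delayed::Job's actual code.
class SketchWorker
  attr_reader :completed, :remaining

  def initialize(jobs)
    @remaining = jobs.dup
    @completed = []
    @stop = false
  end

  def run
    # The trap only sets a flag; the loop checks it between jobs,
    # so a job that is already running is never interrupted.
    previous = Signal.trap("TERM") { @stop = true }
    until @stop || @remaining.empty?
      job = @remaining.shift
      job.call
      @completed << job
    end
  ensure
    # Restore whatever handler was installed before the sketch ran.
    Signal.trap("TERM", previous || "DEFAULT")
  end
end

# The first job signals its own process mid-run, the way the restart
# script signals an old worker, and then keeps working for a moment.
jobs = [
  -> { Process.kill("TERM", Process.pid); sleep 0.1 },
  -> { sleep 0.1 },
  -> { sleep 0.1 }
]

worker = SketchWorker.new(jobs)
worker.run
puts "completed: #{worker.completed.size}, left in queue: #{worker.remaining.size}"
```

The job that was running when TERM arrived still completes, and the jobs left in the queue are the ones the freshly started workers would pick up.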

How to use Delayed Job to handle your Carrierwave processing

This tutorial builds on my previous post about how to add FFMPEG processing to Carrierwave. Here I will show you my attempt at using Delayed::Job to do the heavy lifting of processing when uploading files with Carrierwave. It could probably use some improvement, but it is a good starting point. So let's begin. The first thing you will need to do is add Delayed::Job to your application:

# Gemfile
gem "delayed_job"

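Next you need to create the migration and migrate the database (with delayed_job 3.0 and later, the ActiveRecord backend lives in the separate delayed_job_active_record gem and the generator is rails generate delayed_job:active_record):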

rails generate delayed_job
rake db:migrate

Now we get to the good part. Let's create a module to mix into a Carrierwave uploader that holds off on processing until Delayed::Job gets around to it:

# lib/carrier_wave/delayed_job.rb
module CarrierWave
  module Delayed
    module Job
      module ActiveRecordInterface
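        # Delay processing by default; an explicit false disables the delay.
        def delay_carrierwave
          @delay_carrierwave = true if @delay_carrierwave.nil?
          @delay_carrierwave
        end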

        def delay_carrierwave=(delay)
          @delay_carrierwave = delay
        end

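        # Called by Delayed::Job in the worker: run the processing that was
        # skipped at upload time, for each version of the first mounted uploader.
        def perform
          asset_name = self.class.uploader_options.keys.first
          self.send(asset_name).versions.each_pair do |_name, version|
            version.process_without_delay!
          end
        end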

        private

        def enqueue
          ::Delayed::Job.enqueue self
        end
      end

      def self.included(base)
        base.extend ClassMethods
      end

      module ClassMethods
        def self.extended(base)
          base.send(:include, InstanceMethods)
          base.alias_method_chain :process!, :delay
          ::ActiveRecord::Base.send(
            :include,
            CarrierWave::Delayed::Job::ActiveRecordInterface
          )
        end

        module InstanceMethods
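          # Skip processing while the model asks for a delay. new_file is
          # optional so the wrapper can be called without arguments, as
          # perform calls process_without_delay!.
          def process_with_delay!(new_file = nil)
            process_without_delay!(new_file) unless model.delay_carrierwave
          end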
        end
      end
    end
  end
end
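
The module relies on alias_method_chain, which was deprecated in Rails 5.0 and removed in 5.1. To show what that call does, here is a minimal sketch that wires up the same two aliases by hand with plain alias_method (newer code often uses Module#prepend instead). FakeModel and FakeUploader are hypothetical stand-ins made up for illustration; they are not CarrierWave classes.

```ruby
# Hypothetical stand-ins; the names are illustrative, not CarrierWave's API.
FakeModel = Struct.new(:delay_carrierwave)

class FakeUploader
  attr_reader :model, :processed

  def initialize(model)
    @model = model
    @processed = []
  end

  # Stands in for the expensive processing step.
  def process!(new_file = nil)
    @processed << new_file
  end

  # The wrapper: only process right away when the model does not ask for a delay.
  def process_with_delay!(new_file = nil)
    process_without_delay!(new_file) unless model.delay_carrierwave
  end

  # What alias_method_chain :process!, :delay sets up:
  alias_method :process_without_delay!, :process!  # keep the original reachable
  alias_method :process!, :process_with_delay!     # route callers to the wrapper
end

# With the delay on, process! is a no-op until the job calls the original.
delayed = FakeUploader.new(FakeModel.new(true))
delayed.process!("song.mp3")
skipped = delayed.processed.dup
delayed.process_without_delay!("song.mp3")

# With the delay off, process! runs immediately.
immediate = FakeUploader.new(FakeModel.new(false))
immediate.process!("song.mp3")

puts "delayed at upload: #{skipped.inspect}"
puts "delayed after job: #{delayed.processed.inspect}"
puts "immediate:         #{immediate.processed.inspect}"
```

The order of the two alias_method calls matters: the original has to be saved under the _without_ name before process! is pointed at the wrapper.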

Awesome! Now we need to tie this into our Uploader:

# app/uploaders/asset_uploader.rb
require File.join(Rails.root, "lib", "carrier_wave", "ffmpeg")
require File.join(Rails.root, "lib", "carrier_wave", "delayed_job") # New

class AssetUploader < CarrierWave::Uploader::Base
  include CarrierWave::Delayed::Job # New
  include CarrierWave::FFMPEG

  # Choose what kind of storage to use for this uploader:
  storage :file

  # Override the directory where uploaded files will be stored.
  # This is a sensible default for uploaders that are meant to be mounted:
  def store_dir
    "#{Rails.root}/uploads/#{model.class.to_s.underscore}/#{mounted_as}/#{model.id}"
  end

  # Add a version, utilizing our processor
  version :bitrate_128k do
    process :resample => "128k"
  end
end

The last thing we have to do is update our model to enqueue a Delayed::Job after each save:

# app/models/asset.rb
class Asset < ActiveRecord::Base
  mount_uploader :asset, AssetUploader
  
  after_save :enqueue # New
end

There you have it. Now when you create a new Asset, associate a file, and save it, it shouldn't run the processors right away, but instead create a Delayed::Job record. A Delayed::Job worker should then pick up that record and run the processors. This may not be perfect, but at least it's a start! Thanks for reading!
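
To make the save, enqueue, perform flow concrete, here is a minimal sketch that swaps the database-backed queue for an in-memory array. SketchQueue and SketchAsset are hypothetical names made up for illustration, and the :bitrate_128k symbol only echoes the version name from the uploader; none of this is Delayed::Job's or Carrierwave's real code.

```ruby
# Hypothetical in-memory queue; real Delayed::Job stores jobs in the database.
class SketchQueue
  def initialize
    @jobs = []
  end

  # Accept any object that responds to #perform.
  def enqueue(job)
    raise ArgumentError, "job must respond to #perform" unless job.respond_to?(:perform)
    @jobs << job
    job
  end

  def size
    @jobs.size
  end

  # Plays the role of a worker: run every queued job once.
  def work_off
    count = 0
    while (job = @jobs.shift)
      job.perform
      count += 1
    end
    count
  end
end

# Hypothetical model: saving only enqueues, processing happens in #perform.
class SketchAsset
  attr_reader :processed_versions

  def initialize(queue)
    @queue = queue
    @processed_versions = []
  end

  def save
    @queue.enqueue(self) # plays the role of after_save :enqueue
    true
  end

  def perform
    @processed_versions << :bitrate_128k
  end
end

queue = SketchQueue.new
asset = SketchAsset.new(queue)
asset.save
before_worker = asset.processed_versions.dup
jobs_run = queue.work_off

puts "after save:   #{before_worker.inspect}"
puts "after worker: #{asset.processed_versions.inspect} (#{jobs_run} job run)"
```

Saving returns immediately with nothing processed; the work only happens once the worker calls perform.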