How do I use ActiveJob to track the progress of long-running remote tasks?

Yesterday I wrote a post about initializing an AWS Elemental MediaConvert job. It's a common situation: you initiate a task and have to wait an unspecified amount of time for the result. All you have is the job ID from the external system.

In such cases, I use a very simple pattern based on ActiveJob's retry_on and discard_on methods.

retry_on reschedules the job for re-execution after a specified delay, for a specified number of attempts. What I like about it is that this mechanism catches the exception before it reaches your underlying queuing system (Sidekiq, for instance). In practical terms, that means you can use a different retry interval when you are just waiting for the task to finish than when, for example, a timeout error occurs.
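To get a feeling for how the delay and the number of attempts relate to the overall time you are willing to wait, here is a small back-of-the-envelope sketch in plain Ruby. The constant and method names are made up for illustration, and it assumes that the first execution counts as one of the attempts, which is how I understand retry_on to work.

```ruby
# Plain Ruby, no Rails needed. The numbers are only illustrative.
POLL_INTERVAL_SECONDS = 20
TIME_LIMIT_SECONDS = 60 * 60

# How many executions fit into the time limit at a fixed interval.
def attempts_for(time_limit, interval)
  time_limit / interval
end

# Assumption: the first execution counts as an attempt, so there is one
# wait fewer than there are attempts.
def total_wait_for(attempts, interval)
  (attempts - 1) * interval
end

attempts = attempts_for(TIME_LIMIT_SECONDS, POLL_INTERVAL_SECONDS)
puts "attempts: #{attempts}"
puts "waiting time: #{total_wait_for(attempts, POLL_INTERVAL_SECONDS)} seconds"
```

So the waiting time ends up slightly below the nominal limit, which is usually close enough for a polling job.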

You can also pass a block that will be invoked once the retry attempts are exhausted, instead of letting the exception bubble up. It's a good place to inform developers that the task didn't finish in a reasonable time, or at least to log it.

discard_on specifies an exception that causes the job to be discarded. In my example, I don't want to keep checking progress if the task gets canceled in the external system. Again, this is a case that should not go unnoticed, so all you have to do is pass a block that will be invoked when the job is discarded.
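The control flow described so far can be modelled in a few lines of plain Ruby. This is only a sketch of the idea, not how ActiveJob implements it: PollingModel, NotFinishedYet and Canceled are hypothetical names, and the delay between runs is left out so the example finishes instantly.

```ruby
# A plain-Ruby model of the retry/discard flow. NOT ActiveJob's
# implementation; all class and method names here are hypothetical.
class NotFinishedYet < StandardError; end
class Canceled < StandardError; end

class PollingModel
  attr_reader :log

  def initialize(attempts:)
    @attempts = attempts
    @log = []
  end

  # Runs the block up to `attempts` times. A real queue would wait
  # between runs; the delay is omitted here.
  def run
    executions = 0
    begin
      executions += 1
      result = yield(executions)
      @log << [:done, executions]
      result
    rescue NotFinishedYet
      retry if executions < @attempts
      @log << [:gave_up, executions]   # where a retry_on block would run
      nil
    rescue Canceled
      @log << [:discarded, executions] # where a discard_on block would run
      nil
    end
  end
end

# Usage: the remote task reports COMPLETE on the third check.
statuses = ["PROGRESSING", "PROGRESSING", "COMPLETE"]
model = PollingModel.new(attempts: 5)
model.run do |execution|
  status = statuses[execution - 1]
  raise NotFinishedYet if status == "PROGRESSING"
  status
end
p model.log
```

The point of the model is that "not finished yet" and "canceled" are both signalled with exceptions, but only the first one leads to another run.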

The following example references the implementation of MediaConverterJobInspector, which you can find in the previous post.

module Converting
  class TrackConvertJobOnConvertJobInitialized < ActiveJob::Base
    prepend RailsEventStore::AsyncHandler

    class TrackConvertJobError < StandardError
      attr_reader :s3_key, :media_convert_job_id

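      def initialize(s3_key:, media_convert_job_id:)
        # Pass an explicit message; a bare `super` would forward the keyword
        # arguments and turn the error message into a stringified hash.
        super("MediaConvert job #{media_convert_job_id} (#{s3_key})")
        @s3_key = s3_key
        @media_convert_job_id = media_convert_job_id
      end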
    end

    ConvertJobCanceled = Class.new(TrackConvertJobError)
    ConvertJobNotFinishedYet = Class.new(TrackConvertJobError)

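    # Check the remote job every 20 seconds and give up after about an hour.
    TIME_INTERVAL = 20.seconds
    TIME_LIMIT = 1.hour

    # Once the attempts are exhausted, publish an event instead of re-raising.
    retry_on(ConvertJobNotFinishedYet, wait: TIME_INTERVAL, attempts: TIME_LIMIT / TIME_INTERVAL) do |_job, error|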
      Rails.configuration.event_store.publish(
        ContentConvertJobTimeLimitExceeded.new(
          data: {
            media_convert_job_id: error.media_convert_job_id,
            s3_key: error.s3_key
          }
        )
      )
    end

    # A canceled remote job will never finish, so stop polling and publish an event.
    discard_on ConvertJobCanceled do |_job, error|
      Rails.configuration.event_store.publish(
        ContentConvertJobCanceled.new(
          data: {
            media_convert_job_id: error.media_convert_job_id,
            s3_key: error.s3_key
          }
        )
      )
    end

    def perform(event)
      s3_key = event.data[:s3_key]
      media_convert_job_id = event.data[:media_convert_job_id]
      inspector = job_inspector_class.new(media_convert_job_id)
      inspector.call
      case inspector.status
      when "COMPLETE"
        event_store.publish ContentConvertJobCompleted.new(data: { media_convert_job_id: media_convert_job_id, s3_key: s3_key })
      when "ERROR"
        event_store.publish ContentConvertJobFailed.new(data: { media_convert_job_id: media_convert_job_id, s3_key: s3_key })
      when "CANCELED"
        raise ConvertJobCanceled.new(s3_key: s3_key, media_convert_job_id: media_convert_job_id)
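      else
        # Any other status (e.g. SUBMITTED or PROGRESSING) means the job is
        # still running; raising here triggers the retry_on handler above.
        raise ConvertJobNotFinishedYet.new(s3_key: s3_key, media_convert_job_id: media_convert_job_id)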
      end
    end

    private

    def event_store
      Rails.configuration.event_store
    end

    def job_inspector_class
      Rails.env.test? ? FakeConvertJobInspector : MediaConverterJobInspector
    end
  end
end
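The job swaps in FakeConvertJobInspector in the test environment, but that class isn't shown here. A minimal sketch of what such a fake could look like follows; it is my assumption, built only on the interface the job relies on (new with a job id, call, status). The class-level statuses hash and reset! are hypothetical helpers for setting things up in a test.

```ruby
# Hypothetical test double. It mirrors only the interface the job uses:
# new(media_convert_job_id), #call and #status.
class FakeConvertJobInspector
  # Canned statuses per job id; unknown ids are reported as still running.
  @statuses = Hash.new("PROGRESSING")

  class << self
    attr_reader :statuses

    # Clears canned statuses between tests.
    def reset!
      @statuses = Hash.new("PROGRESSING")
    end
  end

  attr_reader :status

  def initialize(media_convert_job_id)
    @media_convert_job_id = media_convert_job_id
  end

  # The real inspector would query MediaConvert here; the fake looks up
  # a canned status instead.
  def call
    @status = self.class.statuses[@media_convert_job_id]
    self
  end
end

# Usage in a test: pretend that job "job-1" has finished.
FakeConvertJobInspector.statuses["job-1"] = "COMPLETE"
inspector = FakeConvertJobInspector.new("job-1")
inspector.call
puts inspector.status
```

With a fake like this, a test can walk the job through each branch of the case statement just by changing the canned status.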