How do I use ActiveJob to track the progress of long-running remote tasks?
Yesterday I wrote a post about initializing an AWS Elemental MediaConvert job. It's a common case: you initiate a task and have to wait an unspecified amount of time for the result. All you have is the job id from the external system.
In such cases, I use a very simple pattern based on ActiveJob's retry_on and discard_on methods.
retry_on reschedules the job for re-execution after a specified delay, for a specified number of attempts. What I like about it is that this mechanism catches the exception before it reaches your underlying queuing system (Sidekiq, for instance).
In practical terms, that means you can use one retry interval when you are simply waiting for the task to finish and a different one when, for example, a timeout error occurs.
You can also pass a block that will be invoked once all retry attempts have failed, instead of letting the exception bubble up. It's a good place to inform developers that the task didn't finish in a reasonable time, or at least to log it.
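To make the attempt counting concrete, here is a rough, framework-free model of that behavior. It is only a sketch: NotFinishedYet and run_with_retries are hypothetical names, and where ActiveJob would re-enqueue the job with a delay, this model simply loops.

```ruby
# Simplified model of retry bookkeeping; not ActiveJob's actual implementation.
class NotFinishedYet < StandardError; end

# Runs the block up to `attempts` times in total. If the last allowed
# execution still raises NotFinishedYet, the handler is called instead
# of letting the exception bubble up.
def run_with_retries(attempts:, on_exhausted:)
  executions = 0
  begin
    executions += 1
    yield executions
  rescue NotFinishedYet => error
    # ActiveJob would schedule the job again after `wait`; here we just loop.
    retry if executions < attempts
    on_exhausted.call(error, executions)
  end
end

gave_up = []
on_exhausted = ->(error, executions) { gave_up << "#{error.message} after #{executions} executions" }

# Finishes on the second execution, so the handler is never called.
finished = run_with_retries(attempts: 3, on_exhausted: on_exhausted) do |execution|
  raise NotFinishedYet, "still processing" if execution < 2
  :complete
end

# Never finishes within the budget, so the handler runs once after the third execution.
run_with_retries(attempts: 3, on_exhausted: on_exhausted) do |_execution|
  raise NotFinishedYet, "still processing"
end
```

Note that in this model `attempts` counts total executions, not retries on top of the first run.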
discard_on specifies an exception that causes the job to be discarded. In my example, I don't want to keep checking progress if the task gets canceled in the external system. Again, this is a case that should not go unnoticed.
So all you have to do is pass a block that will be invoked when the job is discarded.
The following example references the implementation of MediaConverterJobInspector, which you can find in the previous post.
module Converting
  class TrackConvertJobOnConvertJobInitialized < ActiveJob::Base
    prepend RailsEventStore::AsyncHandler

    class TrackConvertJobError < StandardError
      attr_reader :s3_key, :media_convert_job_id

      def initialize(s3_key:, media_convert_job_id:)
        # Pass an explicit message; a bare `super` would forward the keyword
        # arguments to StandardError and use them as the message.
        super("MediaConvert job #{media_convert_job_id} (#{s3_key})")
        @s3_key = s3_key
        @media_convert_job_id = media_convert_job_id
      end
    end

    ConvertJobCanceled = Class.new(TrackConvertJobError)
    ConvertJobNotFinishedYet = Class.new(TrackConvertJobError)

    TIME_INTERVAL = 20.seconds
    TIME_LIMIT = 1.hour

    # Check again every TIME_INTERVAL until TIME_LIMIT is used up
    # (3600 s / 20 s = 180 attempts), then publish an event instead of
    # letting the exception bubble up to the queuing system.
    retry_on(ConvertJobNotFinishedYet, wait: TIME_INTERVAL, attempts: TIME_LIMIT / TIME_INTERVAL) do |_job, error|
      Rails.configuration.event_store.publish(
        ContentConvertJobTimeLimitExceeded.new(
          data: {
            media_convert_job_id: error.media_convert_job_id,
            s3_key: error.s3_key
          }
        )
      )
    end

    # A canceled job will never finish, so stop checking and publish an event.
    discard_on ConvertJobCanceled do |_job, error|
      Rails.configuration.event_store.publish(
        ContentConvertJobCanceled.new(
          data: {
            media_convert_job_id: error.media_convert_job_id,
            s3_key: error.s3_key
          }
        )
      )
    end

    def perform(event)
      s3_key = event.data[:s3_key]
      media_convert_job_id = event.data[:media_convert_job_id]

      inspector = job_inspector_class.new(media_convert_job_id)
      inspector.call

      case inspector.status
      when "COMPLETE"
        event_store.publish ContentConvertJobCompleted.new(data: { media_convert_job_id: media_convert_job_id, s3_key: s3_key })
      when "ERROR"
        event_store.publish ContentConvertJobFailed.new(data: { media_convert_job_id: media_convert_job_id, s3_key: s3_key })
      when "CANCELED"
        raise ConvertJobCanceled.new(s3_key: s3_key, media_convert_job_id: media_convert_job_id)
      else
        # Any other status means the job is still in progress: check again later.
        raise ConvertJobNotFinishedYet.new(s3_key: s3_key, media_convert_job_id: media_convert_job_id)
      end
    end

    private

    def event_store
      Rails.configuration.event_store
    end

    def job_inspector_class
      Rails.env.test? ? FakeConvertJobInspector : MediaConverterJobInspector
    end
  end
end
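
The job above uses FakeConvertJobInspector in the test environment, but its implementation isn't shown here. Below is one possible sketch of such a test double, assuming only the interface the job relies on: new(media_convert_job_id), call and status. The script class method and the way statuses are stored are my own assumptions for illustration, not the actual implementation.

```ruby
# Hypothetical test double; only .new(id), #call and #status are taken
# from how the job uses the inspector. Everything else is an assumption.
class FakeConvertJobInspector
  # Statuses to report per job id. Unknown ids report "COMPLETE".
  @scripts = Hash.new { |hash, id| hash[id] = ["COMPLETE"] }

  class << self
    attr_reader :scripts

    # Declares the sequence of statuses successive checks should see.
    def script(media_convert_job_id, *statuses)
      scripts[media_convert_job_id] = statuses
    end
  end

  attr_reader :status

  def initialize(media_convert_job_id)
    @media_convert_job_id = media_convert_job_id
  end

  # Consumes the next scripted status; the last one repeats forever.
  def call
    remaining = self.class.scripts[@media_convert_job_id]
    @status = remaining.size > 1 ? remaining.shift : remaining.first
    self
  end
end

FakeConvertJobInspector.script("job-1", "PROGRESSING", "PROGRESSING", "COMPLETE")

# Each check builds a fresh inspector, just like each execution of the job does.
statuses = 4.times.map do
  inspector = FakeConvertJobInspector.new("job-1")
  inspector.call
  inspector.status
end
```

With a double like this, a test can script a job that stays in progress for a few checks and then completes, gets canceled, or never finishes at all.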