Class: Rage::Deferred::Queue
- Inherits:
- Object
- Inheritance chain: Object → Rage::Deferred::Queue
- Defined in:
- lib/rage/deferred/queue.rb
Instance Attribute Summary collapse
-
#backlog_size ⇒ Object
readonly
Returns the value of attribute backlog_size.
Instance Method Summary collapse
-
#enqueue(task_metadata, delay: nil, delay_until: nil, task_id: nil) ⇒ Object
Write the task to the storage and schedule it for execution.
-
#initialize(backend) ⇒ Queue
constructor
A new instance of Queue.
-
#schedule(task_id, task_metadata, publish_in: nil) ⇒ Object
Schedule the task for execution.
Constructor Details
Instance Attribute Details
#backlog_size ⇒ Object (readonly)
Returns the value of attribute backlog_size.
4 5 6 |
# File 'lib/rage/deferred/queue.rb', line 4

# Returns the value of attribute backlog_size.
#
# @return [Object] the current value of the `@backlog_size` instance variable
def backlog_size
  @backlog_size
end
Instance Method Details
#enqueue(task_metadata, delay: nil, delay_until: nil, task_id: nil) ⇒ Object
Write the task to the storage and schedule it for execution.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/rage/deferred/queue.rb', line 13

# Write the task to the storage and schedule it for execution.
#
# A positive `delay` takes precedence over `delay_until`. When neither yields
# a point in the future, the task is stored without a publish time and
# scheduled without a delay.
#
# @param task_metadata [Object] task description, passed as-is to the backend and to {#schedule}
# @param delay [#to_i, nil] number of seconds to wait before executing the task
# @param delay_until [#to_i, nil] Unix timestamp (seconds) at which to execute the task
# @param task_id [Object, nil] forwarded to the backend's `add` call
#   (used by {#schedule} when re-enqueueing a task for a retry)
# @return [Object] whatever {#schedule} returns
def enqueue(task_metadata, delay: nil, delay_until: nil, task_id: nil)
  apply_backpressure if @backpressure

  # `publish_in` is relative (seconds from now); `publish_at` is absolute (Unix time).
  # Both stay nil when the requested time is not in the future.
  publish_in, publish_at = if delay
    delay_i = delay.to_i
    [delay_i, Time.now.to_i + delay_i] if delay_i > 0
  elsif delay_until
    delay_until_i, current_time_i = delay_until.to_i, Time.now.to_i
    [delay_until_i - current_time_i, delay_until_i] if delay_until_i > current_time_i
  end

  persisted_task_id = @backend.add(task_metadata, publish_at:, task_id:)
  schedule(persisted_task_id, task_metadata, publish_in:)
end
#schedule(task_id, task_metadata, publish_in: nil) ⇒ Object
Schedule the task for execution.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/rage/deferred/queue.rb', line 29

# Schedule the task for execution.
#
# The task class is resolved from the metadata and executed inside a
# `Fiber.schedule` block once the `Iodine.run_after` timer fires. A completed
# task is removed from the backend; a failed one is either re-enqueued with
# the delay returned by the task class or removed once retries are exhausted.
#
# @param task_id [Object] identifier of the task in the backend
# @param task_metadata [Object] task description understood by `Rage::Deferred::Metadata`
# @param publish_in [Numeric, nil] number of seconds to wait before executing the task;
#   `nil` or a non-positive value means no delay
# @return [Object] whatever `Iodine.run_after` returns
def schedule(task_id, task_metadata, publish_in: nil)
  # Stays nil for immediate tasks; the timer is given milliseconds.
  publish_in_ms = publish_in.to_i * 1_000 if publish_in && publish_in > 0
  task = Rage::Deferred::Metadata.get_task(task_metadata)

  # Only immediate tasks are counted in the backlog, from now until their callback fires.
  @backlog_size += 1 unless publish_in_ms

  Iodine.run_after(publish_in_ms) do
    @backlog_size -= 1 unless publish_in_ms

    # When the server is stopping the task is not executed and is not removed from the backend.
    unless Iodine.stopping?
      Fiber.schedule do
        # NOTE(review): presumably tracks in-flight tasks; paired with `task_dec!` in `ensure` - confirm
        Iodine.task_inc!

        is_completed = task.new.__perform(task_metadata)

        if is_completed
          @backend.remove(task_id)
        else
          attempts = Rage::Deferred::Metadata.inc_attempts(task_metadata)
          if task.__should_retry?(attempts)
            # Re-enqueue under the same task id.
            enqueue(task_metadata, delay: task.__next_retry_in(attempts), task_id:)
          else
            @backend.remove(task_id)
          end
        end
      ensure
        Iodine.task_dec!
      end
    end
  end
end