Class: Rage::Deferred::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/rage/deferred/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(backend) ⇒ Queue

Returns a new instance of Queue.



6
7
8
9
10
# File 'lib/rage/deferred/queue.rb', line 6

# Build a queue on top of the given storage backend.
#
# The backlog counter starts at zero, and the backpressure setting is read
# once from `Rage.config.deferred.backpressure`.
#
# @param backend [#add, #remove] storage object the queue calls `add` and `remove` on
def initialize(backend)
  @backend, @backlog_size = backend, 0
  @backpressure = Rage.config.deferred.backpressure
end

Instance Attribute Details

#backlog_size ⇒ Object (readonly)

Returns the value of attribute backlog_size.



4
5
6
# File 'lib/rage/deferred/queue.rb', line 4

# Current value of the backlog counter kept by this queue.
#
# @return [Integer]
def backlog_size = @backlog_size

Instance Method Details

#enqueue(task_metadata, delay: nil, delay_until: nil, task_id: nil) ⇒ Object

Write the task to the storage and schedule it for execution.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/rage/deferred/queue.rb', line 13

# Write the task to the storage and schedule it for execution.
#
# @param task_metadata [Object] task description; passed unchanged to the backend's `add` and to {#schedule}
# @param delay [#to_i, nil] seconds to wait before running the task; treated as
#   "run now" unless it converts to a positive integer
# @param delay_until [#to_i, nil] point in time to run the task at (compared via `to_i`
#   against `Time.now.to_i`); only consulted when `delay` is nil and treated as
#   "run now" unless it lies in the future
# @param task_id [Object, nil] forwarded to the backend's `add`; {#schedule} passes the
#   existing task ID here when it re-enqueues a failed task
# @return [Object] whatever {#schedule} returns
def enqueue(task_metadata, delay: nil, delay_until: nil, task_id: nil)
  apply_backpressure if @backpressure

  # `publish_in` is a relative delay in seconds, `publish_at` the matching absolute
  # timestamp. Both stay nil when the task is not delayed.
  publish_in, publish_at = if delay
    delay_i = delay.to_i
    [delay_i, Time.now.to_i + delay_i] if delay_i > 0
  elsif delay_until
    delay_until_i, current_time_i = delay_until.to_i, Time.now.to_i
    [delay_until_i - current_time_i, delay_until_i] if delay_until_i > current_time_i
  end

  # The ID returned by the backend (not the `task_id` argument) is the one that gets scheduled.
  persisted_task_id = @backend.add(task_metadata, publish_at:, task_id:)
  schedule(persisted_task_id, task_metadata, publish_in:)
end

#schedule(task_id, task_metadata, publish_in: nil) ⇒ Object

Schedule the task for execution.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/rage/deferred/queue.rb', line 29

# Schedule the task for execution.
#
# The task class is resolved from `task_metadata` via `Rage::Deferred::Metadata.get_task`
# and run inside `Fiber.schedule` from an `Iodine.run_after` callback. A task that
# completes is removed from the backend. A task that fails is re-enqueued under the
# same `task_id` if it may be retried, and removed otherwise.
#
# @param task_id [Object] ID the task is stored under in the backend
# @param task_metadata [Object] task description; passed to `Rage::Deferred::Metadata`
#   and to the task's `__perform`
# @param publish_in [Numeric, nil] delay in seconds; nil or non-positive values mean
#   the task is not delayed
# @return [Object] the value returned by `Iodine.run_after`
def schedule(task_id, task_metadata, publish_in: nil)
  # Stays nil for tasks that are not delayed; `Iodine.run_after` is then called with nil.
  publish_in_ms = publish_in.to_i * 1_000 if publish_in && publish_in > 0
  task = Rage::Deferred::Metadata.get_task(task_metadata)

  # Only non-delayed tasks count towards the backlog.
  @backlog_size += 1 unless publish_in_ms

  Iodine.run_after(publish_in_ms) do
    @backlog_size -= 1 unless publish_in_ms

    # While Iodine is stopping the task is not executed; it is also not removed
    # from the backend here.
    unless Iodine.stopping?
      Fiber.schedule do
        # NOTE(review): presumably tracks in-flight tasks; confirm against Iodine.
        Iodine.task_inc!

        is_completed = task.new.__perform(task_metadata)

        if is_completed
          @backend.remove(task_id)
        else
          attempts = Rage::Deferred::Metadata.inc_attempts(task_metadata)
          if task.__should_retry?(attempts)
            # Reuse the existing ID so the retry is stored under the same task.
            enqueue(task_metadata, delay: task.__next_retry_in(attempts), task_id:)
          else
            @backend.remove(task_id)
          end
        end

      ensure
        Iodine.task_dec!
      end
    end
  end
end