Class: Rage::Deferred::Backends::Disk

Inherits:
Object
  • Object
show all
Defined in:
lib/rage/deferred/backends/disk.rb

Overview

Rage::Deferred::Backends implements a storage layer to persist deferred tasks. A storage should implement the following instance methods:

  • add - called when a task has to be added to the storage;

  • remove - called when a task has to be removed from the storage;

  • pending_tasks - the method should iterate over the underlying storage and return a list of tasks to replay;

Constant Summary collapse

STORAGE_VERSION =
"0"
STORAGE_SIZE_INCREASE_RATIO =
1.5
DEFAULT_PUBLISH_AT =
"0"
DEFAULT_STORAGE_SIZE_LIMIT =
2_000_000

Instance Method Summary collapse

Constructor Details

#initialize(path:, prefix:, fsync_frequency:) ⇒ Disk

Returns a new instance of Disk.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rage/deferred/backends/disk.rb', line 20

# Prepares the disk-backed storage for the current process: takes ownership of
# the storage files it can lock, picks (or creates) the main storage file,
# seeds task ID generation and schedules the periodic fsync.
#
# @param path [Pathname] directory with the storage files; it has to respond to
#   `mkpath` and `glob` - presumably a Pathname, TODO confirm against the caller
# @param prefix [String] storage file name prefix; `STORAGE_VERSION` is appended to it
# @param fsync_frequency [Integer] interval handed to `Iodine.run_every` for the
#   fsync timer - presumably milliseconds, TODO confirm
def initialize(path:, prefix:, fsync_frequency:)
  @storage_path = path
  @storage_prefix = "#{prefix}#{STORAGE_VERSION}"
  @fsync_frequency = fsync_frequency

  @storage_path.mkpath

  # claim every existing storage file we manage to lock;
  # files whose lock can't be acquired without blocking are closed and skipped
  glob_pattern = "#{@storage_prefix}-*"
  owned_storages = @storage_path.glob(glob_pattern).filter_map do |candidate|
    handle = candidate.open("a+b")

    unless handle.flock(File::LOCK_EX | File::LOCK_NB)
      handle.close
      next
    end

    sleep 0.01 # reduce contention between workers
    handle
  end

  # the first owned file becomes the main storage and the remaining ones are kept
  # for a later merge; with no owned files a brand new storage is created
  main_storage, *leftover_storages = owned_storages
  if main_storage
    @storage = main_storage
    @recovered_storages = leftover_storages unless leftover_storages.empty?
  else
    @storage = create_storage
  end

  # seed for the task IDs; the timer below bumps it and resets the per-seed counter
  task_id_seed = Time.now.to_i # TODO: ensure timestamps in the file are not higher
  @task_id_base = "#{task_id_seed}-#{Process.pid}"
  @task_id_i = 0
  Iodine.run_every(1_000) do
    task_id_seed += 1
    @task_id_base, @task_id_i = "#{task_id_seed}-#{Process.pid}", 0
  end

  # size tracking and rotation state
  @storage_size_limit = DEFAULT_STORAGE_SIZE_LIMIT
  @storage_size = @storage.size
  @fsync_scheduled = false
  @should_rotate = false

  # bookkeeping that is only needed during storage rotation:
  # delayed tasks are remembered in a hash, regular tasks are merely counted
  @immediate_tasks_in_queue = 0
  @delayed_tasks = {}

  # periodically make sure written data reaches the disk, but only if something changed
  @storage_has_changes = false
  Iodine.run_every(@fsync_frequency) do
    if @storage_has_changes
      @storage_has_changes = false
      @storage.fsync
    end
  end
end

Instance Method Details

#add(task, publish_at: nil, task_id: nil) ⇒ String

Add a record to the log representing a new task.

Parameters:

  • task (Rage::Deferred::Task)
  • publish_at (Integer, nil) (defaults to: nil)
  • task_id (String, nil) (defaults to: nil)

Returns:

  • (String)


81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/rage/deferred/backends/disk.rb', line 81

# Appends an "add" record for the task to the storage log and updates the
# in-memory counters used during storage rotation.
#
# @param task [Rage::Deferred::Task] the task to persist
# @param publish_at [Integer, nil] when given, the task is tracked as a delayed task;
#   otherwise it is only counted as an immediate task
# @param task_id [String, nil] ID to store the task under; a new ID is generated when omitted
# @return [String] the ID the task was persisted with
def add(task, publish_at: nil, task_id: nil)
  # `String#dump` escapes the binary Marshal output into a printable string
  dumped_task = Marshal.dump(task).dump
  id = task_id || generate_task_id

  write_to_storage(build_add_entry(id, dumped_task, publish_at))

  if publish_at
    @delayed_tasks[id] = [dumped_task, publish_at]
  else
    @immediate_tasks_in_queue += 1
  end

  id
end

#pending_tasks ⇒ Array<(String, Rage::Deferred::Task, Integer, Integer)>

Return a list of pending tasks in the storage.

Returns:

  • (Array<(String, Rage::Deferred::Task, Integer, Integer)>)

    Array<(String, Rage::Deferred::Task, Integer, Integer)>



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/rage/deferred/backends/disk.rb', line 115

# Replays the storage log and returns the tasks that were added but never removed.
#
# Each log line is expected to start with an 8-character hex CRC32 signature,
# followed by a separator and a 3-character operation (`add` or `rem`); the
# signature is verified against everything after the separator.
#
# As a side effect, the rotation bookkeeping (`@delayed_tasks` and
# `@immediate_tasks_in_queue`) is updated for every task returned.
#
# @return [Array<Array>] one `[task_id, task, publish_at]` triple per pending task;
#   `publish_at` is nil when the stored value equals `DEFAULT_PUBLISH_AT`
def pending_tasks
  # `@recovered_storages` will only be present if the server has previously crashed and left
  # some storage files behind, or if the new cluster is started with fewer workers than before;
  # TLDR: this code is expected to execute very rarely
  @recovered_storages.each { |storage| recover_tasks(storage) } if @recovered_storages

  live_entries = {}
  corrupted = 0

  # walk the log from the beginning, keeping only the `add` entries without a matching `rem`
  @storage.rewind
  @storage.each_line(chomp: true) do |line|
    checksum = line[0...8]
    operation = line[9...12]
    signed_part = line[9..]
    next if checksum&.empty? || signed_part&.empty? || operation&.empty?

    if checksum != Zlib.crc32(signed_part).to_s(16).rjust(8, "0")
      corrupted += 1
      next
    end

    case operation
    when "add"
      # the task ID starts at offset 13 and runs up to the next separator
      id_end = line.index(":", 13).to_i
      live_entries[line[13...id_end]] = line
    when "rem"
      live_entries.delete(line[13..])
    end
  end

  unless corrupted == 0
    puts "WARNING: Detected #{corrupted} corrupted deferred task(s)"
  end

  live_entries.filter_map do |task_id, line|
    fields = line.split(":", 5)
    raw_publish_at = fields[3]
    dumped_task = fields[4]

    # NOTE(review): Marshal.load trusts the storage file content; the CRC32 check
    # above detects accidental corruption only
    task = Marshal.load(dumped_task.undump)

    publish_at = raw_publish_at == DEFAULT_PUBLISH_AT ? nil : raw_publish_at.to_i

    if publish_at
      @delayed_tasks[task_id] = [dumped_task, publish_at]
    else
      @immediate_tasks_in_queue += 1
    end

    [task_id, task, publish_at]
  rescue ArgumentError, NameError => e
    # tasks that can't be deserialized are reported and left out of the result
    puts "ERROR: Can't deserialize the task with id #{task_id}: (#{e.class}) #{e.message}"
    nil
  end
end

#remove(task_id) ⇒ Object

Add a record to the log representing a task removal.

Parameters:

  • task_id (String)


100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/rage/deferred/backends/disk.rb', line 100

# Appends a "remove" record for the task to the storage log, updates the
# in-memory bookkeeping and triggers a storage rotation when one is due.
#
# @param task_id [String] ID of the task to remove
# @return [Object, nil] whatever `rotate_storage` returns if a rotation ran, nil otherwise
def remove(task_id)
  removal_entry = build_remove_entry(task_id)
  write_to_storage(removal_entry)

  # a task is either tracked individually as delayed or only counted as immediate
  tracked_as_delayed = @delayed_tasks.key?(task_id)
  @delayed_tasks.delete(task_id) if tracked_as_delayed
  @immediate_tasks_in_queue -= 1 unless tracked_as_delayed

  # rotate the storage once the size is over the limit and all non-delayed tasks are processed
  return unless @should_rotate && @immediate_tasks_in_queue == 0

  rotate_storage
end