Class: Rage::Deferred::Backends::Disk

- Inherits: Object
- Defined in: lib/rage/deferred/backends/disk.rb
Overview

Rage::Deferred::Backends implements a storage layer to persist deferred tasks. A storage should implement the following instance methods:

- add - called when a task has to be added to the storage;
- remove - called when a task has to be removed from the storage;
- pending_tasks - the method should iterate over the underlying storage and return a list of tasks to replay (a minimal sketch of a custom backend follows this list).
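As a rough illustration, a non-persistent in-memory backend satisfying this contract might look like the sketch below. It is an assumption-based example and is not shipped with Rage; the task ID format and the tuples returned by pending_tasks are simplified.

# Hypothetical in-memory backend implementing the add/remove/pending_tasks contract.
# Illustrative only: it loses tasks on restart, which defeats the purpose of persistence.
class MemoryBackend
  def initialize
    @tasks = {}
    @counter = 0
  end

  # Store the task under an ID and return the ID, mirroring Disk#add.
  def add(task, publish_at: nil, task_id: nil)
    id = task_id || (@counter += 1).to_s
    @tasks[id] = [task, publish_at]
    id
  end

  # Forget a task once it has been processed.
  def remove(task_id)
    @tasks.delete(task_id)
  end

  # Return the tasks that should be replayed on startup.
  def pending_tasks
    @tasks.map { |id, (task, publish_at)| [id, task, publish_at] }
  end
end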
Constant Summary

- STORAGE_VERSION = "0"
- STORAGE_SIZE_INCREASE_RATIO = 1.5
- DEFAULT_PUBLISH_AT = "0"
- DEFAULT_STORAGE_SIZE_LIMIT = 2_000_000
Instance Method Summary

- #add(task, publish_at: nil, task_id: nil) ⇒ String
  Add a record to the log representing a new task.
- #initialize(path:, prefix:, fsync_frequency:) ⇒ Disk (constructor)
  A new instance of Disk.
- #pending_tasks ⇒ Array<(String, Rage::Deferred::Task, Integer, Integer)>
  Return a list of pending tasks in the storage.
- #remove(task_id) ⇒ Object
  Add a record to the log representing a task removal.
Constructor Details
#initialize(path:, prefix:, fsync_frequency:) ⇒ Disk
Returns a new instance of Disk.
# File 'lib/rage/deferred/backends/disk.rb', line 20

def initialize(path:, prefix:, fsync_frequency:)
  @storage_path = path
  @storage_prefix = "#{prefix}#{STORAGE_VERSION}"
  @fsync_frequency = fsync_frequency

  @storage_path.mkpath

  # try to open and take ownership of all storage files in the storage directory
  storage_files = @storage_path.glob("#{@storage_prefix}-*").filter_map do |file_path|
    file = file_path.open("a+b")
    if file.flock(File::LOCK_EX | File::LOCK_NB)
      sleep 0.01 # reduce contention between workers
      file
    else
      file.close
    end
  end

  # if there are no storage files - create one;
  # otherwise the first one is used as the main storage; the rest will be merged into the main storage
  if storage_files.empty?
    @storage = create_storage
  else
    @storage = storage_files[0]
    @recovered_storages = storage_files[1..] if storage_files.length > 1
  end

  # create seed value for the task IDs
  task_id_seed = Time.now.to_i # TODO: ensure timestamps in the file are not higher
  @task_id_base, @task_id_i = "#{task_id_seed}-#{Process.pid}", 0
  Iodine.run_every(1_000) do
    task_id_seed += 1
    @task_id_base, @task_id_i = "#{task_id_seed}-#{Process.pid}", 0
  end

  @storage_size_limit = DEFAULT_STORAGE_SIZE_LIMIT
  @storage_size = @storage.size
  @fsync_scheduled = false
  @should_rotate = false

  # we use different counters for different tasks:
  # delayed tasks are stored in the hash; for regular tasks we only maintain a counter;
  # this information is only used during storage rotation
  @immediate_tasks_in_queue = 0
  @delayed_tasks = {}

  # ensure data is written to disk
  @storage_has_changes = false
  Iodine.run_every(@fsync_frequency) do
    if @storage_has_changes
      @storage_has_changes = false
      @storage.fsync
    end
  end
end
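For context, the backend could be constructed along the following lines; the argument values are illustrative assumptions, not documented defaults. The path is treated as a Pathname (the constructor calls mkpath and glob on it), and fsync_frequency is presumably expressed in milliseconds, since it is passed straight to Iodine.run_every.

require "pathname"

# Illustrative construction only; in practice Rage wires the backend up itself.
backend = Rage::Deferred::Backends::Disk.new(
  path: Pathname.new("tmp/deferred"), # directory for the log files (assumed value)
  prefix: "tasks-",                   # storage file name prefix (assumed value)
  fsync_frequency: 1_000              # assumed to be milliseconds, per Iodine.run_every
)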
Instance Method Details
#add(task, publish_at: nil, task_id: nil) ⇒ String
Add a record to the log representing a new task.
# File 'lib/rage/deferred/backends/disk.rb', line 81

def add(task, publish_at: nil, task_id: nil)
  serialized_task = Marshal.dump(task).dump
  persisted_task_id = task_id || generate_task_id

  entry = build_add_entry(persisted_task_id, serialized_task, publish_at)
  write_to_storage(entry)

  if publish_at
    @delayed_tasks[persisted_task_id] = [serialized_task, publish_at]
  else
    @immediate_tasks_in_queue += 1
  end

  persisted_task_id
end
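A brief usage sketch, assuming backend is the Disk instance constructed above and the task is any Marshal-serializable object; publish_at is assumed here to be a Unix timestamp, given the integer conversion applied to it in #pending_tasks.

# Hypothetical task class; any object that Marshal can serialize works for this sketch.
SendEmail = Struct.new(:user_id)

task_id    = backend.add(SendEmail.new(42))                                 # replay as soon as possible
delayed_id = backend.add(SendEmail.new(43), publish_at: Time.now.to_i + 60) # replay roughly a minute later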
#pending_tasks ⇒ Array<(String, Rage::Deferred::Task, Integer, Integer)>
Return a list of pending tasks in the storage.
# File 'lib/rage/deferred/backends/disk.rb', line 115

def pending_tasks
  if @recovered_storages
    # `@recovered_storages` will only be present if the server has previously crashed and left
    # some storage files behind, or if the new cluster is started with fewer workers than before;
    # TLDR: this code is expected to execute very rarely
    @recovered_storages.each { |storage| recover_tasks(storage) }
  end

  tasks = {}
  corrupted_tasks_count = 0

  # find pending tasks in the storage
  @storage.tap(&:rewind).each_line(chomp: true) do |entry|
    signature, op, payload = entry[0...8], entry[9...12], entry[9..]
    next if signature&.empty? || payload&.empty? || op&.empty?

    unless signature == Zlib.crc32(payload).to_s(16).rjust(8, "0")
      corrupted_tasks_count += 1
      next
    end

    if op == "add"
      task_id = entry[13...entry.index(":", 13).to_i]
      tasks[task_id] = entry
    elsif op == "rem"
      task_id = entry[13..]
      tasks.delete(task_id)
    end
  end

  if corrupted_tasks_count != 0
    puts "WARNING: Detected #{corrupted_tasks_count} corrupted deferred task(s)"
  end

  tasks.filter_map do |task_id, entry|
    _, _, _, serialized_publish_at, serialized_task = entry.split(":", 5)

    task = Marshal.load(serialized_task.undump)
    publish_at = (serialized_publish_at == DEFAULT_PUBLISH_AT ? nil : serialized_publish_at.to_i)

    if publish_at
      @delayed_tasks[task_id] = [serialized_task, publish_at]
    else
      @immediate_tasks_in_queue += 1
    end

    [task_id, task, publish_at]
  rescue ArgumentError, NameError => e
    puts "ERROR: Can't deserialize the task with id #{task_id}: (#{e.class}) #{e.message}"
    nil
  end
end
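As a sketch, a worker recovering after a restart could replay the log like this, destructuring the task ID, task, and optional publish timestamp from each returned entry; schedule_at and the task's #perform method are hypothetical stand-ins for whatever the caller uses to execute tasks.

backend.pending_tasks.each do |task_id, task, publish_at|
  if publish_at
    # re-schedule the delayed task (schedule_at is a hypothetical scheduler helper)
    schedule_at(publish_at) do
      task.perform
      backend.remove(task_id)
    end
  else
    task.perform            # assumes the task object responds to #perform
    backend.remove(task_id) # acknowledge so the entry is not replayed again
  end
end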
#remove(task_id) ⇒ Object
Add a record to the log representing a task removal.
# File 'lib/rage/deferred/backends/disk.rb', line 100

def remove(task_id)
  write_to_storage(build_remove_entry(task_id))

  if @delayed_tasks.has_key?(task_id)
    @delayed_tasks.delete(task_id)
  else
    @immediate_tasks_in_queue -= 1
  end

  # rotate the storage once the size is over the limit and all non-delayed tasks are processed
  rotate_storage if @should_rotate && @immediate_tasks_in_queue == 0
end