Class: Rage::FiberScheduler
- Inherits:
-
Object
- Object
- Rage::FiberScheduler
- Defined in:
- lib/rage/fiber_scheduler.rb
Constant Summary collapse
- MAX_READ =
65536
Instance Method Summary collapse
-
#address_resolve(hostname) ⇒ Object
Resolves a hostname to its addresses and caches the result for 60 seconds.
- #block(_blocker, timeout = nil) ⇒ Object
- #close ⇒ Object
- #fiber(&block) ⇒ Object
-
#initialize ⇒ FiberScheduler
constructor
A new instance of FiberScheduler.
- #io_read(io, buffer, length, offset = 0) ⇒ Object
- #io_wait(io, events, timeout = nil) ⇒ Object
- #io_write(io, buffer, length, offset = 0) ⇒ Object
- #kernel_sleep(duration = nil) ⇒ Object
- #unblock(_blocker, fiber) ⇒ Object
Constructor Details
#initialize ⇒ FiberScheduler
Returns a new instance of FiberScheduler.
8 9 10 11 |
# File 'lib/rage/fiber_scheduler.rb', line 8
# Prepares the state the scheduler keeps between calls.
#
# @return [FiberScheduler] a new instance of FiberScheduler
def initialize
  # Hostname => resolved addresses; filled lazily by #address_resolve.
  @dns_cache = {}
  # The fiber the scheduler was created in; #fiber compares the calling
  # fiber against it to pick which kind of child fiber to build.
  @root_fiber = Fiber.current
end
Instance Method Details
#address_resolve(hostname) ⇒ Object
Resolves a hostname to its addresses and caches the result for 60 seconds.
82 83 84 85 86 87 88 89 90 |
# File 'lib/rage/fiber_scheduler.rb', line 82
# Resolves +hostname+ to its addresses, memoizing the answer per hostname.
#
# On a cache miss an Iodine timer is armed (60_000, presumably milliseconds,
# matching how #block converts seconds before calling +run_after+) that
# evicts the entry, and the lookup itself is delegated to
# +Resolv.getaddresses+.
#
# @param hostname [String] the name to resolve
# @return [Object] whatever +Resolv.getaddresses+ returned for +hostname+
def address_resolve(hostname)
  @dns_cache[hostname] ||= begin
    ::Iodine.run_after(60_000) do
      # Drop the key instead of storing nil under it: a nil placeholder
      # still misses on `||=`, but it would keep every hostname ever
      # resolved in the hash for the lifetime of the scheduler.
      @dns_cache.delete(hostname)
    end

    Resolv.getaddresses(hostname)
  end
end
#block(_blocker, timeout = nil) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/rage/fiber_scheduler.rb', line 92
# Suspends the calling fiber until a message arrives on its block channel
# or, when +timeout+ is given, until the timer fires - whichever is first.
#
# @param _blocker [Object] unused
# @param timeout [Numeric, nil] multiplied by 1000 and truncated before
#   being handed to +Iodine.run_after+ (so presumably seconds)
# @return [Object] the value +Fiber.yield+ returns once the fiber is resumed
def block(_blocker, timeout = nil)
  waiting_fiber = Fiber.current
  woken = false
  channel = Fiber.current.__block_channel(true)

  # Shared by the subscription and the timer; the flag makes sure the fiber
  # is resumed only once even if both of them fire.
  wake_up = proc do
    next if woken

    woken = true
    ::Iodine.defer { ::Iodine.unsubscribe(channel) }
    waiting_fiber.resume
  end

  ::Iodine.subscribe(channel, &wake_up)
  ::Iodine.run_after((timeout * 1000).to_i, &wake_up) if timeout

  Fiber.yield
end
#close ⇒ Object
144 145 146 |
# File 'lib/rage/fiber_scheduler.rb', line 144 def close ::Iodine::Scheduler.close end |
#fiber(&block) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/rage/fiber_scheduler.rb', line 115
# Builds a non-blocking fiber around +block+, starts it right away and
# returns it.
#
# Two flavours exist, chosen by who is asking:
# * called from the root fiber - the new fiber wraps a request;
# * called from any other fiber - the new fiber was spawned by user code, so
#   it inherits the caller's logger and reports completion (or failure) to
#   the parent on the "await:<parent object_id>" channel.
#
# @yield the work to run inside the new fiber
# @return [Fiber] the fiber, after its first +resume+ has returned
def fiber(&block)
  parent = Fiber.current

  if parent == @root_fiber
    # the fiber to wrap a request in
    spawned = Fiber.new(blocking: false) do
      current = Fiber.current
      current.__set_id
      current.__set_result(block.call)
    end
  else
    # the fiber was created in the user code
    logger = Thread.current[:rage_logger]

    spawned = Fiber.new(blocking: false) do
      Thread.current[:rage_logger] = logger
      Fiber.current.__set_result(block.call)
      # send a message for `Fiber.await` to work
      if parent.alive?
        Iodine.publish("await:#{parent.object_id}", "", Iodine::PubSub::PROCESS)
      end
    rescue Exception => e
      # Every exception is captured on purpose: it is stored on the fiber and
      # the parent is notified with the error marker instead of a plain wake-up.
      Fiber.current.__set_err(e)
      if parent.alive?
        Iodine.publish("await:#{parent.object_id}", Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS)
      end
    end
  end

  spawned.resume

  spawned
end
#io_read(io, buffer, length, offset = 0) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/rage/fiber_scheduler.rb', line 25
# Reads from +io+ into +buffer+ through +Iodine::Scheduler.read+.
#
# @param io [IO] only its +fileno+ is used
# @param buffer [Object] receives data via +set_string+; its +size+ bounds the read
# @param length [Integer] bytes to request per read; 0 means "up to the
#   buffer size, capped at MAX_READ"
# @param offset [Integer] position in +buffer+ to start writing at
# @return [Integer] the offset just past the last byte stored, or
#   +-Errno::EAGAIN::Errno+ when the read produced an empty string
def io_read(io, buffer, length, offset = 0)
  length_to_read = length == 0 ? [buffer.size, MAX_READ].min : length

  while true
    chunk = ::Iodine::Scheduler.read(io.fileno, length_to_read, offset)

    # nil ends the read with whatever has been stored so far
    return offset if chunk.nil?
    # an empty string is reported to the caller as EAGAIN
    return -Errno::EAGAIN::Errno if chunk.empty?

    buffer.set_string(chunk, offset)

    received = chunk.bytesize
    offset += received
    # keep going only after a full-sized chunk that is smaller than the buffer
    return offset unless received >= length_to_read && received < buffer.size

    # NOTE(review): Fiber.pause is defined outside this view; presumably it
    # lets other fibers run before the next read attempt - confirm.
    Fiber.pause
  end
end
#io_wait(io, events, timeout = nil) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rage/fiber_scheduler.rb', line 13
# Registers interest in +events+ on +io+ with Iodine and waits for the outcome.
#
# @param io [IO] only its +fileno+ is used
# @param events [Integer] passed to +Iodine::Scheduler.attach+ and returned on success
# @param timeout [Numeric, nil] rounded up with +ceil+; 0 is passed when nil
# @return [Integer] the negative value received from +Fiber.defer+, otherwise +events+
def io_wait(io, events, timeout = nil)
  waiter = Fiber.current
  ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil || 0) { |err| waiter.resume(err) }

  # NOTE(review): Fiber.defer is defined outside this view; presumably it
  # suspends this fiber until the callback above resumes it - confirm.
  outcome = Fiber.defer(io.fileno)
  return outcome if outcome && outcome < 0

  events
end
#io_write(io, buffer, length, offset = 0) ⇒ Object
54 55 56 57 58 59 60 61 |
# File 'lib/rage/fiber_scheduler.rb', line 54
# Hands the contents of +buffer+ to +Iodine::Scheduler.write+ for +io+.
#
# @param io [IO] only its +fileno+ is used
# @param buffer [Object] source of the data (+get_string+) and of the
#   default length (+size+)
# @param length [Integer] bytes to write; 0 means the whole buffer
# @param offset [Integer] forwarded to the writer and subtracted from the result
# @return [Integer] the requested byte count minus +offset+
def io_write(io, buffer, length, offset = 0)
  bytes_to_write = length == 0 ? buffer.size : length

  ::Iodine::Scheduler.write(io.fileno, buffer.get_string, bytes_to_write, offset)

  bytes_to_write - offset
end
#kernel_sleep(duration = nil) ⇒ Object
64 65 66 |
# File 'lib/rage/fiber_scheduler.rb', line 64
# Implements sleeping on top of #block with no blocker object.
#
# @param duration [Numeric, nil] how long to sleep; nil is treated as 0
# @return [Object] whatever #block returns
def kernel_sleep(duration = nil)
  # A missing duration becomes a zero timeout rather than "no timeout".
  timeout = duration || 0
  block(nil, timeout)
end
#unblock(_blocker, fiber) ⇒ Object
111 112 113 |
# File 'lib/rage/fiber_scheduler.rb', line 111
# Wakes a fiber suspended in #block by publishing an empty message on that
# fiber's block channel.
#
# @param _blocker [Object] unused
# @param fiber [Fiber] the fiber to wake; must respond to +__block_channel+
# @return [Object] whatever +Iodine.publish+ returns
def unblock(_blocker, fiber)
  channel = fiber.__block_channel
  ::Iodine.publish(channel, "", Iodine::PubSub::PROCESS)
end