Class: Rage::FiberScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/rage/fiber_scheduler.rb

Constant Summary collapse

MAX_READ =
65536

Instance Method Summary collapse

Constructor Details

#initialize ⇒ FiberScheduler

Returns a new instance of FiberScheduler.



8
9
10
11
# File 'lib/rage/fiber_scheduler.rb', line 8

# Sets up a scheduler bound to the fiber that instantiates it.
#
# The DNS cache starts out empty, and the fiber running this constructor is
# remembered as the root fiber (compared against in #fiber).
def initialize
  @dns_cache = {}
  @root_fiber = Fiber.current
end

Instance Method Details

#address_resolve(hostname) ⇒ Object

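Resolves the hostname to a list of addresses and caches the result; the cached entry is expired by a timer scheduled with a 60_000 delay.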



83
84
85
86
87
88
89
90
91
# File 'lib/rage/fiber_scheduler.rb', line 83

# Resolves +hostname+ to a list of addresses, memoizing the result per hostname.
#
# On a cache miss, an expiry callback is scheduled via +Iodine.run_after(60_000)+
# (presumably milliseconds, i.e. 60 seconds) and the lookup is delegated to
# +Resolv.getaddresses+. Subsequent calls return the cached value until the
# callback evicts it.
#
# @param hostname [String] the host name to resolve
# @return [Array<String>] whatever +Resolv.getaddresses+ returned for the host
def address_resolve(hostname)
  @dns_cache[hostname] ||= begin
    ::Iodine.run_after(60_000) do
      # Drop the entry entirely instead of assigning nil: a nil value would keep
      # the hostname key alive forever and let the cache grow without bound.
      @dns_cache.delete(hostname)
    end

    Resolv.getaddresses(hostname)
  end
end

#block(_blocker, timeout = nil) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/rage/fiber_scheduler.rb', line 93

# Suspends the current fiber until it is woken up through its block channel
# or, when +timeout+ is given, until the timer fires - whichever happens first.
#
# @param _blocker [Object] unused
# @param timeout [Numeric, nil] multiplied by 1000 and truncated before being
#   handed to +Iodine.run_after+; no timer is scheduled when nil
# @return [Object] the value returned by +Fiber.yield+
def block(_blocker, timeout = nil)
  waiting_fiber = Fiber.current
  already_woken = false
  wakeup_channel = Fiber.current.__block_channel(true)

  # Shared by the subscription and the timer; the flag guarantees the fiber
  # is resumed at most once.
  wake_up = proc do
    next if already_woken

    already_woken = true
    ::Iodine.defer { ::Iodine.unsubscribe(wakeup_channel) }
    waiting_fiber.resume
  end

  ::Iodine.subscribe(wakeup_channel, &wake_up)
  ::Iodine.run_after((timeout * 1000).to_i, &wake_up) if timeout

  Fiber.yield
end

#close ⇒ Object



149
150
151
# File 'lib/rage/fiber_scheduler.rb', line 149

# Closes the scheduler by delegating to +Iodine::Scheduler.close+.
#
# @return [Object] whatever +Iodine::Scheduler.close+ returns
def close
  ::Iodine::Scheduler.close
end

#fiber(&block) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/rage/fiber_scheduler.rb', line 116

# Creates a non-blocking fiber that runs +block+, resumes it immediately, and
# returns it.
#
# Two cases are distinguished by comparing the calling fiber with the root fiber
# captured in the constructor:
#
# * called from the root fiber - the new fiber gets an id via +__set_id+ and the
#   block runs inside the +span_core_fiber_dispatch+ telemetry span;
# * called from any other fiber - the parent's +Thread.current[:rage_logger]+ is
#   copied into the new fiber, the block runs inside the +span_core_fiber_spawn+
#   span, and the parent's await channel is notified on completion or failure.
#
# In both cases the block's return value is stored with +__set_result+.
#
# @yield the code to run inside the new fiber
# @return [Fiber] the newly created (and already resumed) fiber
def fiber(&block)
  parent = Fiber.current

  fiber = if parent == @root_fiber
    # the fiber to wrap a request in
    Fiber.new(blocking: false) do
      Fiber.current.__set_id
      Rage::Telemetry.tracer.span_core_fiber_dispatch do
        Fiber.current.__set_result(block.call)
      end
    end
  else
    # the fiber was created in the user code
    # captured here, in the parent fiber, so it can be re-assigned inside the child
    logger = Thread.current[:rage_logger]

    Fiber.new(blocking: false) do
      Thread.current[:rage_logger] = logger
      Rage::Telemetry.tracer.span_core_fiber_spawn(parent:) do
        Fiber.current.__set_result(block.call)
      end
      # send a message for `Fiber.await` to work
      Iodine.publish(parent.__await_channel, "", Iodine::PubSub::PROCESS) if parent.alive?
    rescue Exception => e
      # every exception is recorded on the fiber and reported to the parent
      # through the await channel instead of propagating out of the fiber
      Fiber.current.__set_err(e)
      Iodine.publish(parent.__await_channel, Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.alive?
    end
  end

  # start running the block right away; control comes back here once the fiber
  # yields or finishes
  fiber.resume

  fiber
end

#io_read(io, buffer, length, offset = 0) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rage/fiber_scheduler.rb', line 25

# Reads from +io+ into +buffer+ starting at +offset+.
#
# When +length+ is 0 the per-call read size is the buffer size capped at
# +MAX_READ+; otherwise it is +length+. Reading continues, pausing the fiber
# between chunks, for as long as every chunk comes back completely filled and
# the buffer still has room.
#
# A single request never exceeds the room left in the buffer, and the loop
# stops as soon as the running offset reaches the buffer size. Without this, a
# buffer larger than one chunk could receive a chunk that no longer fits.
#
# @param io [IO] the object whose +fileno+ is read from
# @param buffer [IO::Buffer] destination buffer (must respond to +size+ and +set_string+)
# @param length [Integer] requested read size; 0 means "derive from the buffer size"
# @param offset [Integer] position in +buffer+ at which to start writing
# @return [Integer] the offset reached after the last chunk, or
#   +-Errno::EAGAIN::Errno+ when the underlying read returns an empty string
def io_read(io, buffer, length, offset = 0)
  length_to_read = if length == 0
    buffer.size > MAX_READ ? MAX_READ : buffer.size
  else
    length
  end

  loop do
    # Never ask for more than what still fits into the buffer. A non-positive
    # room (offset already at/after the end) keeps the unclamped request.
    room = buffer.size - offset
    request = (room.positive? && room < length_to_read) ? room : length_to_read

    string = ::Iodine::Scheduler.read(io.fileno, request, offset)

    # nil is treated as end of data: report how far we got
    return offset if string.nil?
    # nothing available right now
    return -Errno::EAGAIN::Errno if string.empty?

    buffer.set_string(string, offset)

    size = string.bytesize
    offset += size
    # stop on a short read or once the buffer is full
    return offset if size < request || offset >= buffer.size

    Fiber.pause
  end
end

#io_wait(io, events, timeout = nil) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/rage/fiber_scheduler.rb', line 13

# Waits for +events+ on the file descriptor of +io+.
#
# The descriptor is attached to +Iodine::Scheduler+ with a callback that resumes
# the calling fiber, which is then suspended through +Fiber.defer+.
#
# @param io [IO] the object whose +fileno+ is watched
# @param events [Integer] event mask passed through to +Iodine::Scheduler.attach+
# @param timeout [Numeric, nil] rounded up with +ceil+ when present
# @return [Integer, false] +events+ on success; otherwise the +false+ or
#   negative value obtained from +Fiber.defer+
def io_wait(io, events, timeout = nil)
  waiter = Fiber.current
  fd = io.fileno
  ::Iodine::Scheduler.attach(fd, events, timeout&.ceil) { |err| waiter.resume(err) }

  outcome = Fiber.defer(fd)
  return outcome if outcome == false
  return outcome if outcome && outcome < 0

  events
end

#io_write(io, buffer, length, offset = 0) ⇒ Object



54
55
56
57
58
59
60
61
# File 'lib/rage/fiber_scheduler.rb', line 54

# Writes the contents of +buffer+ to +io+ through +Iodine::Scheduler.write+.
#
# @param io [IO] the object whose +fileno+ is written to
# @param buffer [IO::Buffer] source buffer; its full contents are passed as a string
# @param length [Integer] number of bytes to write; 0 means the whole buffer size
# @param offset [Integer] forwarded to +Iodine::Scheduler.write+
# @return [Integer] the byte count minus +offset+
def io_write(io, buffer, length, offset = 0)
  bytes_to_write = length == 0 ? buffer.size : length
  payload = buffer.get_string

  ::Iodine::Scheduler.write(io.fileno, payload, bytes_to_write, offset)

  bytes_to_write - offset
end

#kernel_sleep(duration = nil) ⇒ Object



64
65
66
67
# File 'lib/rage/fiber_scheduler.rb', line 64

# Puts the current fiber to sleep by blocking it with a timeout.
#
# A missing duration is treated as a timeout of 0. When the duration is absent
# or shorter than 1, the fiber is additionally paused once after being unblocked.
#
# @param duration [Numeric, nil] sleep time passed to #block as the timeout
# @return [Object, nil] the result of +Fiber.pause+ when it is invoked, nil otherwise
def kernel_sleep(duration = nil)
  timeout = duration || 0
  block(nil, timeout)

  needs_pause = duration.nil? || duration < 1
  Fiber.pause if needs_pause
end

#unblock(_blocker, fiber) ⇒ Object



112
113
114
# File 'lib/rage/fiber_scheduler.rb', line 112

# Wakes up +fiber+ by publishing an empty message to its block channel.
#
# @param _blocker [Object] unused
# @param fiber [Fiber] the fiber whose +__block_channel+ receives the message
# @return [Object] whatever +Iodine.publish+ returns
def unblock(_blocker, fiber)
  ::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS)
end