Class: Rage::FiberScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/rage/fiber_scheduler.rb

Constant Summary collapse

MAX_READ =
65536

Instance Method Summary collapse

Constructor Details

#initializeFiberScheduler

Returns a new instance of FiberScheduler.



8
9
10
11
# File 'lib/rage/fiber_scheduler.rb', line 8

def initialize
  @root_fiber = Fiber.current
  @dns_cache = {}
end

Instance Method Details

#address_resolve(hostname) ⇒ Object

result end



82
83
84
85
86
87
88
89
90
# File 'lib/rage/fiber_scheduler.rb', line 82

def address_resolve(hostname)
  @dns_cache[hostname] ||= begin
    ::Iodine.run_after(60_000) do
      @dns_cache[hostname] = nil
    end

    Resolv.getaddresses(hostname)
  end
end

#block(_blocker, timeout = nil) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/rage/fiber_scheduler.rb', line 92

def block(_blocker, timeout = nil)
  f, fulfilled, channel = Fiber.current, false, Fiber.current.__block_channel(true)

  resume_fiber_block = proc do
    unless fulfilled
      fulfilled = true
      ::Iodine.defer { ::Iodine.unsubscribe(channel) }
      f.resume
    end
  end

  ::Iodine.subscribe(channel, &resume_fiber_block)
  if timeout
    ::Iodine.run_after((timeout * 1000).to_i, &resume_fiber_block)
  end

  Fiber.yield
end

#closeObject



144
145
146
# File 'lib/rage/fiber_scheduler.rb', line 144

def close
  ::Iodine::Scheduler.close
end

#fiber(&block) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/rage/fiber_scheduler.rb', line 115

def fiber(&block)
  parent = Fiber.current

  fiber = if parent == @root_fiber
    # the fiber to wrap a request in
    Fiber.new(blocking: false) do
      Fiber.current.__set_id
      Fiber.current.__set_result(block.call)
    end
  else
    # the fiber was created in the user code
    logger = Thread.current[:rage_logger]

    Fiber.new(blocking: false) do
      Thread.current[:rage_logger] = logger
      Fiber.current.__set_result(block.call)
      # send a message for `Fiber.await` to work
      Iodine.publish("await:#{parent.object_id}", "", Iodine::PubSub::PROCESS) if parent.alive?
    rescue Exception => e
      Fiber.current.__set_err(e)
      Iodine.publish("await:#{parent.object_id}", Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.alive?
    end
  end

  fiber.resume

  fiber
end

#io_read(io, buffer, length, offset = 0) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rage/fiber_scheduler.rb', line 25

def io_read(io, buffer, length, offset = 0)
  length_to_read = if length == 0
    buffer.size > MAX_READ ? MAX_READ : buffer.size
  else
    length
  end

  while true
    string = ::Iodine::Scheduler.read(io.fileno, length_to_read, offset)

    if string.nil?
      return offset
    end

    if string.empty?
      return -Errno::EAGAIN::Errno
    end

    buffer.set_string(string, offset)

    size = string.bytesize
    offset += size
    return offset if size < length_to_read || size >= buffer.size

    Fiber.pause
  end
end

#io_wait(io, events, timeout = nil) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/rage/fiber_scheduler.rb', line 13

def io_wait(io, events, timeout = nil)
  f = Fiber.current
  ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil || 0) { |err| f.resume(err) }

  err = Fiber.defer(io.fileno)
  if err && err < 0
    err
  else
    events
  end
end

#io_write(io, buffer, length, offset = 0) ⇒ Object



54
55
56
57
58
59
60
61
# File 'lib/rage/fiber_scheduler.rb', line 54

def io_write(io, buffer, length, offset = 0)
  bytes_to_write = length
  bytes_to_write = buffer.size if length == 0

  ::Iodine::Scheduler.write(io.fileno, buffer.get_string, bytes_to_write, offset)

  bytes_to_write - offset
end

#kernel_sleep(duration = nil) ⇒ Object



64
65
66
# File 'lib/rage/fiber_scheduler.rb', line 64

def kernel_sleep(duration = nil)
  block(nil, duration || 0)
end

#unblock(_blocker, fiber) ⇒ Object



111
112
113
# File 'lib/rage/fiber_scheduler.rb', line 111

def unblock(_blocker, fiber)
  ::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS)
end