Module: Rage::Ext::ActiveRecord::ConnectionPool

Includes:
ConnectionWithVerify
Defined in:
lib/rage/ext/active_record/connection_pool.rb

Defined Under Namespace

Modules: ConnectionWithVerify Classes: BlackHoleList

Instance Method Summary collapse

Instance Method Details

#__init_rage_extension ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/rage/ext/active_record/connection_pool.rb', line 53

# Initializes the fiber-aware pool state and registers the Iodine timers and
# subscriptions that operate on it:
#
# * a periodic worker that times out fibers waiting too long for a connection;
# * an optional periodic health check of idle connections;
# * a subscription that resumes one blocked fiber per "connection released" message;
# * a shutdown hook that removes that subscription.
#
# @return [Object] whatever `Iodine.on_state` returns
def __init_rage_extension
  # a map of fibers that are currently waiting for a
  # connection in the format of { Fiber => timestamp }
  @__blocked = {}

  # a map of fibers that are currently holding connections
  # in the format of { Fiber => Connection }
  @__in_use = {}

  # a list of all DB connections that are currently idle
  @__connections = build_new_connections

  # how long a fiber can wait for a connection to become available
  @__checkout_timeout = checkout_timeout

  # how long a connection can be idle for before disconnecting
  @__idle_timeout = respond_to?(:db_config) ? db_config.idle_timeout : @idle_timeout

  # how often should we check for fibers that wait for a connection for too long
  @__timeout_worker_frequency = 0.5

  # reject fibers that wait for a connection for more than `@__checkout_timeout`
  Iodine.run_every((@__timeout_worker_frequency * 1_000).to_i) do
    unless @__blocked.empty?
      current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      # Snapshot the expired waiters before touching any of them: `Fiber#raise`
      # runs the waiter immediately, and a waiter that asks for a connection
      # again re-registers itself in `@__blocked`. Ruby refuses to add a key to
      # a hash while it is being iterated, so we must not be inside
      # `@__blocked.each` at that point.
      expired = @__blocked.select do |_, blocked_since|
        (current_time - blocked_since) > @__checkout_timeout
      end.keys

      expired.each do |fiber|
        # skip fibers that stopped waiting while an earlier waiter was running
        next unless @__blocked.delete(fiber)

        fiber.raise(ActiveRecord::ConnectionTimeoutError, "could not obtain a connection from the pool within #{@__checkout_timeout} seconds; all pooled connections were in use")
      end
    end
  end

  # monitor connections health
  if Rage.config.internal.should_manually_restore_ar_connections?
    Iodine.run_every(1_000) do
      # the idle list is only accessed through `length` and `[]` here, exactly
      # as before, so any list object installed in `@__connections` keeps working
      i = 0
      while i < @__connections.length
        conn = @__connections[i]

        unless conn.__needs_reconnect
          # a failing liveness probe counts as "needs reconnect"
          needs_reconnect =
            begin
              !conn.active?
            rescue StandardError
              true
            end

          if needs_reconnect
            conn.__needs_reconnect = true
            conn.disconnect!
          end
        end

        i += 1
      end
    end
  end

  @release_connection_channel = "ext:ar-connection-released:#{object_id}"

  # resume blocked fibers once connections become available
  Iodine.subscribe(@release_connection_channel) do
    if @__blocked.length > 0 && @__connections.length > 0
      f, _ = @__blocked.shift
      f.resume
    end
  end

  # unsubscribe on shutdown
  Iodine.on_state(:on_finish) do
    Iodine.unsubscribe(@release_connection_channel)
  end
end

#active_connection? ⇒ Boolean

Returns true if there is an open connection being used for the current fiber.

Returns:

  • (Boolean)


124
125
126
# File 'lib/rage/ext/active_record/connection_pool.rb', line 124

# Looks up the connection leased to the calling fiber.
#
# @return [Object, nil] the connection held by the current fiber (truthy),
#   or nil when the fiber holds none
def active_connection?
  owner = Fiber.current
  @__in_use[owner]
end

#checkin(conn) ⇒ Object

Check in a database connection back into the pool, indicating that you no longer need this connection.



292
293
294
295
# File 'lib/rage/ext/active_record/connection_pool.rb', line 292

# Returns +conn+ to the pool by releasing it on behalf of whichever fiber
# currently holds it.
#
# @param conn [Object] the connection to give back
# @return [Object, nil] the result of `release_connection`
def checkin(conn)
  # reverse lookup: find the owner of this connection (nil when it is not leased)
  release_connection(@__in_use.key(conn))
end

#checkout(_ = nil) ⇒ Object

Check out a database connection from the pool, indicating that you want to use it. You should call #checkin when you no longer need this.



283
284
285
# File 'lib/rage/ext/active_record/connection_pool.rb', line 283

# Checks a connection out of the pool for the calling fiber.
# The optional positional argument is accepted and ignored.
#
# @return [Object] the result of `connection`
def checkout(_ = nil)
  # plain delegation to `connection`
  connection
end

#clear_reloadable_connections(raise_on_acquisition_timeout = true) ⇒ Object



305
306
307
# File 'lib/rage/ext/active_record/connection_pool.rb', line 305

# Delegates to `disconnect`, forwarding the timeout flag unchanged.
#
# @param raise_on_acquisition_timeout [Boolean] passed through to `disconnect`
# @return [Object] the result of `disconnect`
def clear_reloadable_connections(raise_on_acquisition_timeout = true)
  # plain delegation; no extra attempts argument is supplied
  disconnect(raise_on_acquisition_timeout)
end

#clear_reloadable_connections! ⇒ Object



309
310
311
# File 'lib/rage/ext/active_record/connection_pool.rb', line 309

# Delegates to `disconnect` with the timeout flag set to false.
#
# @return [Object] the result of `disconnect`
def clear_reloadable_connections!
  # false => `disconnect` is asked not to raise on acquisition timeout
  disconnect(false)
end

#connected? ⇒ Boolean

Returns true if a connection has already been opened.

Returns:

  • (Boolean)


216
217
218
# File 'lib/rage/ext/active_record/connection_pool.rb', line 216

# Always reports the pool as connected.
#
# @return [Boolean] unconditionally true
def connected?
  # constant answer; no pool state is consulted
  true
end

#connection ⇒ Object

Retrieve the connection associated with the current fiber, or obtain one if necessary.



129
130
131
132
133
134
135
136
137
# File 'lib/rage/ext/active_record/connection_pool.rb', line 129

# Returns the connection leased to the calling fiber, leasing one first if
# the fiber does not hold any.
#
# When no idle connection is available the fiber registers itself in
# `@__blocked` with the current monotonic timestamp and yields; once it is
# resumed it takes whatever is at the head of the idle list.
#
# @return [Object, nil] the leased connection (nil if the idle list yielded nothing)
def connection
  owner = Fiber.current

  already_leased = @__in_use[owner]
  return already_leased if already_leased

  conn = @__connections.shift
  unless conn
    # nothing idle: park this fiber until it is resumed
    @__blocked[owner] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    Fiber.yield

    conn = @__connections.shift
  end

  @__in_use[owner] = conn
end

#connections ⇒ Object

Returns an array containing the connections currently in the pool.



211
212
213
# File 'lib/rage/ext/active_record/connection_pool.rb', line 211

# Exposes the idle connection list as an array.
#
# @return [Array] the result of calling `to_a` on the idle list
def connections
  # only idle connections are included; leased ones live in `@__in_use`
  idle = @__connections
  idle.to_a
end

#discard! ⇒ Object

Discards all connections in the pool (even if they’re currently in use!), along with the pool itself. Any further interaction with the pool is undefined.



319
320
321
322
# File 'lib/rage/ext/active_record/connection_pool.rb', line 319

# Marks the pool as discarded and calls `discard!` on every connection,
# idle ones first and then the ones currently leased to fibers.
#
# @return [Array] the connections that were discarded
def discard!
  @__discarded = true

  every_connection = @__connections + @__in_use.values
  every_connection.each(&:discard!)
end

#discarded? ⇒ Boolean

Returns:

  • (Boolean)


324
325
326
# File 'lib/rage/ext/active_record/connection_pool.rb', line 324

# Reports whether `discard!` has been called on this pool.
#
# @return [Boolean] true once the discarded flag is set, false otherwise
def discarded?
  # coerce the flag (nil until `discard!` runs) to a strict boolean
  @__discarded ? true : false
end

#disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) ⇒ Object

Disconnects all connections in the pool, and clears the pool. Raises ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all connections in the pool within a timeout interval (default duration is checkout_timeout * 2 seconds).



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/rage/ext/active_record/connection_pool.rb', line 236

# Disconnects every connection in the pool and replaces the pool with freshly
# built connections.
#
# While connections are still leased, the method reschedules itself through
# `Iodine.run_after(500)` and returns immediately; it keeps doing so as long
# as `disconnect_attempts <= @__checkout_timeout * 4`.
#
# @param raise_on_acquisition_timeout [Boolean] raise instead of forcefully
#   taking over connections that are still leased once the retries run out
# @param disconnect_attempts [Integer] retry counter used by the rescheduled calls
# @raise [ActiveRecord::ExclusiveConnectionTimeoutError] when connections are
#   still leased after the retries and +raise_on_acquisition_timeout+ is truthy
# @return [Integer, nil] nil when the call was rescheduled, otherwise the
#   number of release notifications published
def disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0)
  # first attempt only: swap the idle list for a BlackHoleList so fibers can
  # still hand connections back but cannot lease new ones
  @__connections = BlackHoleList.new(@__connections) if disconnect_attempts == 0

  # give fibers that still hold connections a chance to finish
  if !@__in_use.empty? && disconnect_attempts <= @__checkout_timeout * 4
    Iodine.run_after(500) { disconnect(raise_on_acquisition_timeout, disconnect_attempts + 1) }
    return
  end

  doomed = @__connections.to_a

  # retries exhausted but some connections are still leased
  unless @__in_use.empty?
    raise(ActiveRecord::ExclusiveConnectionTimeoutError, "could not obtain ownership of all database connections") if raise_on_acquisition_timeout
    doomed += @__in_use.values
    @__in_use.clear
  end

  # close each connection and drop it from the pool's bookkeeping
  doomed.each do |conn|
    conn.disconnect!
    __remove__(conn)
  end

  # start over with a brand new set of connections
  self.automatic_reconnect = true
  @__connections = build_new_connections

  # one notification per blocked fiber that can actually be served
  wakeups = [@__blocked.length, @__connections.length].min
  wakeups.times do
    Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS)
  end
end

#disconnect! ⇒ Object

Disconnects all connections in the pool, and clears the pool. The pool first tries to gain ownership of all connections. If unable to do so within a timeout interval (default duration is checkout_timeout * 2 seconds), then the pool is forcefully disconnected without any regard for other connection owning fibers.



277
278
279
# File 'lib/rage/ext/active_record/connection_pool.rb', line 277

# Delegates to `disconnect` with the timeout flag set to false, so leased
# connections are taken over instead of raising once the retries run out.
#
# @return [Object] the result of `disconnect`
def disconnect!
  # false => do not raise on acquisition timeout
  disconnect(false)
end

#flush(minimum_idle = @__idle_timeout) ⇒ Object

Disconnect all connections that have been idle for at least minimum_idle seconds. Connections currently checked out, or that were checked in less than minimum_idle seconds ago, are unaffected.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/rage/ext/active_record/connection_pool.rb', line 178

# Disconnects idle connections that have been idle for at least
# +minimum_idle+ seconds and flags them as needing a reconnect.
# Connections without an idle timestamp are left alone.
#
# @param minimum_idle [Numeric, nil] idle threshold in seconds; nil disables flushing
# @return [nil]
def flush(minimum_idle = @__idle_timeout)
  return if minimum_idle.nil?
  return if @__connections.length == 0

  now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  index = 0

  # the list is walked by index and its length is re-read on every pass
  until index >= @__connections.length
    conn = @__connections[index]
    index += 1

    idle_since = conn.__idle_since
    next unless idle_since && now - idle_since >= minimum_idle

    conn.__idle_since = nil
    conn.__needs_reconnect = true
    conn.disconnect!
  end
end

#flush! ⇒ Object

Disconnect all currently idle connections. Connections currently checked out are unaffected.



194
195
196
197
# File 'lib/rage/ext/active_record/connection_pool.rb', line 194

# Runs `reap` and then `flush` with a threshold of -1.
#
# @return [Object] the result of `flush`
def flush!
  # recover connections from dead fibers first
  reap

  # a negative threshold is satisfied by any non-negative idle time
  flush(-1)
end

#lease_connection ⇒ Object



287
288
289
# File 'lib/rage/ext/active_record/connection_pool.rb', line 287

# Leases a connection for the calling fiber; same behaviour as `connection`.
#
# @return [Object] the result of `connection`
def lease_connection
  # plain delegation to `connection`
  connection
end

#num_waiting_in_queue ⇒ Object



313
314
315
# File 'lib/rage/ext/active_record/connection_pool.rb', line 313

# Counts the fibers currently registered as waiting for a connection.
#
# @return [Integer] number of entries in the blocked-fibers map
def num_waiting_in_queue
  @__blocked.size
end

#reap ⇒ Object

Recover lost connections for the pool.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/rage/ext/active_record/connection_pool.rb', line 151

# Recovers connections that are still registered to fibers which are no
# longer alive.
#
# * A connection that is still active is reset and released back to the pool
#   after the scan.
# * A connection that is not active is dropped, disconnected, removed, and
#   replaced with one newly built connection; a release notification is
#   published when fibers are waiting.
#
# @return [Array<Fiber>, nil] the dead fibers whose connections were reset
#   and released, or nil when there were none
def reap
  recoverable = []

  @__in_use.each do |fiber, conn|
    next if fiber.alive?

    if conn.active?
      conn.reset!
      # released after the scan, since releasing removes the entry from `@__in_use`
      recoverable << fiber
      next
    end

    # the connection itself is gone: replace it with a fresh one
    @__in_use.delete(fiber)
    conn.disconnect!
    __remove__(conn)
    self.automatic_reconnect = true
    @__connections += build_new_connections(1)
    Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) unless @__blocked.empty?
  end

  recoverable.each { |fiber| release_connection(fiber) } unless recoverable.empty?
end

#release_connection(owner = Fiber.current) ⇒ Object

Signal that the fiber is finished with the current connection and it can be returned to the pool.



140
141
142
143
144
145
146
147
148
# File 'lib/rage/ext/active_record/connection_pool.rb', line 140

# Hands the connection leased to +owner+ back to the idle list, stamps it
# with the current monotonic time, and publishes a release notification when
# fibers are waiting.
#
# @param owner [Fiber] the fiber whose lease is released; defaults to the current fiber
# @return [Object, nil] the released connection, or nil when +owner+ held none
def release_connection(owner = Fiber.current)
  conn = @__in_use.delete(owner)
  return conn unless conn

  conn.__idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @__connections << conn

  # nobody to wake up when the blocked map is empty
  Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) unless @__blocked.empty?

  conn
end

#remove(conn) ⇒ Object

Remove a connection from the connection pool. The connection will remain open and active but will no longer be managed by this pool.



299
300
301
302
303
# File 'lib/rage/ext/active_record/connection_pool.rb', line 299

# Stops tracking +conn+ in this pool: calls `__remove__`, drops any lease
# that points at it, and deletes it from the idle list.
#
# @param conn [Object] the connection to stop managing
# @return [Object, nil] the result of deleting +conn+ from the idle list
def remove(conn)
  __remove__(conn)

  # forget every lease that refers to this connection
  @__in_use.reject! { |_, leased| leased == conn }

  @__connections.delete(conn)
end

#stat ⇒ Object

Return connection pool’s usage statistic.



221
222
223
224
225
226
227
228
229
230
231
# File 'lib/rage/ext/active_record/connection_pool.rb', line 221

# Builds a snapshot of the pool's usage counters.
#
# @return [Hash] with the keys :size, :connections, :busy, :dead, :idle,
#   :waiting and :checkout_timeout
def stat
  # leases held by live fibers count as busy, those of dead fibers as dead
  live_owners, dead_owners = @__in_use.keys.partition(&:alive?)

  {
    size: size,
    connections: size,
    busy: live_owners.length,
    dead: dead_owners.length,
    idle: @__connections.length,
    waiting: @__blocked.length,
    checkout_timeout: @__checkout_timeout
  }
end

#with_connection(_ = nil) ⇒ Object

Yields a connection from the connection pool to the block.



200
201
202
203
204
205
206
207
208
# File 'lib/rage/ext/active_record/connection_pool.rb', line 200

# Yields the calling fiber's connection to the block. If the fiber holds no
# connection, one is leased for the duration of the block and released
# afterwards, even when the block raises. A connection the fiber already
# held is left leased. The optional positional argument is accepted and ignored.
#
# @yieldparam conn [Object] the connection
# @return [Object] the block's return value
def with_connection(_ = nil)
  leased_here = false
  conn = @__in_use[Fiber.current]

  unless conn
    conn = connection
    # set only after the lease succeeded, so a failed lease triggers no release
    leased_here = true
  end

  yield conn
ensure
  release_connection if leased_here
end