Active Record 连接池
用于管理 Active Record 数据库连接的连接池基类。
简介
连接池同步线程对有限数量的数据库连接的访问。基本思想是每个线程从池中检出一个数据库连接,使用该连接,然后将连接检入。 ConnectionPool
是完全线程安全的,并且将确保一个连接不能被两个线程同时使用,只要正确遵循 ConnectionPool 的约定。它还将处理线程数量超过连接数量的情况:如果所有连接都被检出,而线程尝试检出一个连接,那么 ConnectionPool
将等待直到其他线程检入一个连接,或者 checkout_timeout
超时。
获取(检出)连接
可以通过几种方式从连接池获取和使用连接
-
只需使用 ActiveRecord::Base.lease_connection。完成对连接的使用,并希望将其返回到池中时,您可以调用 ActiveRecord::Base.connection_handler.clear_active_connections!。这是在与 Action Pack 的请求处理周期结合使用时,Active Record 的默认行为。
-
使用 ActiveRecord::Base.connection_pool.checkout 从池中手动检出一个连接。您有责任在完成时通过调用 ActiveRecord::Base.connection_pool.checkin(connection) 将此连接返回到池中。
-
使用 ActiveRecord::Base.connection_pool.with_connection(&block),它获取一个连接,将其作为唯一参数传递给块,并在块完成后将其返回到池中。
池中的连接实际上是 AbstractAdapter
对象(或与 AbstractAdapter 接口兼容的对象)。
当线程使用上述三种方法之一从池中检出一个连接时,该连接将自动成为在该线程上执行的 ActiveRecord
查询使用的连接。例如,不需要显式地将检出的连接传递给 Rails 模型或查询。
选项
您可以在数据库连接配置中添加一些与连接池相关的选项
-
pool
:池可以管理的最大连接数(默认为 5)。 -
idle_timeout
:连接在池中保持不使用的时间(秒),在超过此时间后,连接将自动断开(默认为 300 秒)。将此值设置为零表示永远保持连接。 -
checkout_timeout
:在放弃并引发超时错误之前,等待连接可用(秒)(默认为 5 秒)。
- 类 ActiveRecord::ConnectionAdapters::ConnectionPool::Queue
- 类 ActiveRecord::ConnectionAdapters::ConnectionPool::Reaper
- A
- C
- D
- F
- I
- L
- N
- R
- S
- W
- MonitorMixin
属性
[R] | async_executor | |
[RW] | automatic_reconnect | |
[RW] | checkout_timeout | |
[R] | db_config | |
[R] | pool_config | |
[R] | reaper | |
[R] | role | |
[R] | shard | |
[R] | size |
类公共方法
install_executor_hooks(executor = ActiveSupport::Executor) 链接
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 206 def install_executor_hooks(executor = ActiveSupport::Executor) executor.register_hook(ExecutorHooks) end
new(pool_config) 链接
创建一个新的 ConnectionPool
对象。pool_config
是一个 PoolConfig 对象,它描述了数据库连接信息(例如适配器、主机名、用户名、密码等),以及此 ConnectionPool
的最大大小。
默认的 ConnectionPool
最大大小为 5。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 226 def initialize(pool_config) super() @pool_config = pool_config @db_config = pool_config.db_config @role = pool_config.role @shard = pool_config.shard @checkout_timeout = db_config.checkout_timeout @idle_timeout = db_config.idle_timeout @size = db_config.pool # This variable tracks the cache of threads mapped to reserved connections, with the # sole purpose of speeding up the +connection+ method. It is not the authoritative # registry of which thread owns which connection. Connection ownership is tracked by # the +connection.owner+ attr on each +connection+ instance. # The invariant works like this: if there is mapping of <tt>thread => conn</tt>, # then that +thread+ does indeed own that +conn+. However, an absence of such # mapping does not mean that the +thread+ doesn't own the said connection. In # that case +conn.owner+ attr should be consulted. # Access and modification of <tt>@leases</tt> does not require # synchronization. @leases = LeaseRegistry.new @connections = [] @automatic_reconnect = true # Connection pool allows for concurrent (outside the main +synchronize+ section) # establishment of new connections. This variable tracks the number of threads # currently in the process of independently establishing connections to the DB. @now_connecting = 0 @threads_blocking_new_connections = 0 @available = ConnectionLeasingQueue.new self @pinned_connection = nil @pinned_connections_depth = 0 @async_executor = build_async_executor @schema_cache = nil @reaper = Reaper.new(self, db_config.reaping_frequency) @reaper.run end
实例公共方法
active_connection?() 链接
如果当前线程正在使用一个打开的连接,则返回 true。
此方法仅适用于通过 lease_connection
或 with_connection
方法获取的连接。通过 checkout
获取的连接不会被 active_connection?
检测到
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 370 def active_connection? connection_lease.connection end
checkin(conn) 链接
将数据库连接检入池中,表示您不再需要此连接。
conn
:一个 AbstractAdapter
对象,它通过先前调用此池的 checkout
获取。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 564 def checkin(conn) return if @pinned_connection.equal?(conn) conn.lock.synchronize do synchronize do connection_lease.clear(conn) conn._run_checkin_callbacks do conn.expire end @available.add conn end end end
checkout(checkout_timeout = @checkout_timeout) 链接
从池中检出一个数据库连接,表示您要使用它。您应该在不再需要它时调用 checkin
。
这可以通过返回和租赁现有连接,或通过创建新的连接并租赁它来完成。
如果所有连接都被租赁并且池已达到容量(意味着当前租赁的连接数量大于或等于设置的大小限制),将引发 ActiveRecord::ConnectionTimeoutError
异常。
返回:一个 AbstractAdapter
对象。
引发
-
ActiveRecord::ConnectionTimeoutError
无法从池中获取连接。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 541 def checkout(checkout_timeout = @checkout_timeout) if @pinned_connection @pinned_connection.lock.synchronize do synchronize do @pinned_connection.verify! # Any leased connection must be in @connections otherwise # some methods like #connected? won't behave correctly unless @connections.include?(@pinned_connection) @connections << @pinned_connection end end end @pinned_connection else checkout_and_verify(acquire_connection(checkout_timeout)) end end
clear_reloadable_connections(raise_on_acquisition_timeout = true) 链接
清除将类映射到重新连接的连接的缓存。
引发
-
ActiveRecord::ExclusiveConnectionTimeoutError
如果无法在超时时间内获得池中所有连接的所有权(默认持续时间为spec.db_config.checkout_timeout * 2
秒)。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 499 def clear_reloadable_connections(raise_on_acquisition_timeout = true) with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do synchronize do @connections.each do |conn| if conn.in_use? conn.steal! checkin conn end conn.disconnect! if conn.requires_reloading? end @connections.delete_if(&:requires_reloading?) @available.clear end end end
clear_reloadable_connections!() 链接
清除将类映射到重新连接的连接的缓存。
池首先尝试获得所有连接的所有权。如果在超时时间内(默认持续时间为 spec.db_config.checkout_timeout * 2
秒)无法做到,则池将强制清除缓存并重新加载连接,而不考虑其他连接拥有线程。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 523 def clear_reloadable_connections! clear_reloadable_connections(false) end
connected?() 链接
如果连接已打开,则返回 true。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 421 def connected? synchronize { @connections.any?(&:connected?) } end
connections() 链接
返回一个包含当前池中连接的数组。访问数组不需要对池进行同步,因为数组是新创建的,并且不会被池保留。
但是;此方法绕过了 ConnectionPool 的线程安全连接访问模式。返回的连接可能被另一个线程拥有,没有拥有,或者偶然地被调用线程拥有。
在没有所有权的情况下调用连接上的方法会受到底层方法的线程安全保证的影响。连接适配器类上的许多方法本质上是多线程不安全的。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 436 def connections synchronize { @connections.dup } end
disconnect(raise_on_acquisition_timeout = true) 链接
断开池中的所有连接,并清除池。
引发
-
ActiveRecord::ExclusiveConnectionTimeoutError
如果无法在超时时间内获得池中所有连接的所有权(默认持续时间为spec.db_config.checkout_timeout * 2
秒)。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 446 def disconnect(raise_on_acquisition_timeout = true) with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do synchronize do @connections.each do |conn| if conn.in_use? conn.steal! checkin conn end conn.disconnect! end @connections = [] @leases.clear @available.clear end end end
disconnect!() 链接
断开池中的所有连接,并清除池。
池首先尝试获得所有连接的所有权。如果在超时时间内(默认持续时间为 spec.db_config.checkout_timeout * 2
秒)无法做到,则池将强制断开连接,而不考虑其他连接拥有线程。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 469 def disconnect! disconnect(false) end
flush(minimum_idle = @idle_timeout) 链接
断开所有处于空闲状态至少 minimum_idle
秒的连接。当前已检出的连接,或在 minimum_idle
秒前已检入的连接,不会受到影响。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 637 def flush(minimum_idle = @idle_timeout) return if minimum_idle.nil? idle_connections = synchronize do return if self.discarded? @connections.select do |conn| !conn.in_use? && conn.seconds_idle >= minimum_idle end.each do |conn| conn.lease @available.delete conn @connections.delete conn end end idle_connections.each do |conn| conn.disconnect! end end
flush!() 链接
断开所有当前处于空闲状态的连接。当前已检出的连接不会受到影响。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 659 def flush! reap flush(-1) end
lease_connection() 链接
检索与当前线程关联的连接,或调用 checkout
以在必要时获取一个连接。
lease_connection
可以被调用任意次数;连接被保存在一个以线程为键的缓存中。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 309 def lease_connection lease = connection_lease lease.sticky = true lease.connection ||= checkout end
reap() 链接
恢复池中丢失的连接。如果程序员忘记在线程结束时检入连接,或线程意外死亡,则可能发生连接丢失。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 614 def reap stale_connections = synchronize do return if self.discarded? @connections.select do |conn| conn.in_use? && !conn.owner.alive? end.each do |conn| conn.steal! end end stale_connections.each do |conn| if conn.active? conn.reset! checkin conn else remove conn end end end
release_connection(existing_lease = nil) 链接
表示线程已完成对当前连接的使用。release_connection
释放连接-线程关联,并将连接返回到池中。
此方法仅适用于通过 lease_connection
或 with_connection
方法获取的连接,通过 checkout
获取的连接不会自动释放。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 382 def release_connection(existing_lease = nil) if conn = connection_lease.release checkin conn return true end false end
remove(conn) 链接
从连接池中删除一个连接。连接将保持打开并处于活动状态,但不再由此池管理。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 582 def remove(conn) needs_new_connection = false synchronize do remove_connection_from_thread_cache conn @connections.delete conn @available.delete conn # @available.any_waiting? => true means that prior to removing this # conn, the pool was at its max size (@connections.size == @size). # This would mean that any threads stuck waiting in the queue wouldn't # know they could checkout_new_connection, so let's do it for them. # Because condition-wait loop is encapsulated in the Queue class # (that in turn is oblivious to ConnectionPool implementation), threads # that are "stuck" there are helpless. They have no way of creating # new connections and are completely reliant on us feeding available # connections into the Queue. needs_new_connection = @available.any_waiting? end # This is intentionally done outside of the synchronized section as we # would like not to hold the main mutex while checking out new connections. # Thus there is some chance that needs_new_connection information is now # stale, we can live with that (bulk_make_new_connections will make # sure not to exceed the pool's @size limit). bulk_make_new_connections(1) if needs_new_connection end
schema_cache() 链接
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 279 def schema_cache @schema_cache ||= BoundSchemaReflection.new(schema_reflection, self) end
schema_reflection=(schema_reflection) 链接
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 283 def schema_reflection=(schema_reflection) pool_config.schema_reflection = schema_reflection @schema_cache = nil end
stat() 链接
返回连接池的使用统计信息。
ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 671 def stat synchronize do { size: size, connections: @connections.size, busy: @connections.count { |c| c.in_use? && c.owner.alive? }, dead: @connections.count { |c| c.in_use? && !c.owner.alive? }, idle: @connections.count { |c| !c.in_use? }, waiting: num_waiting_in_queue, checkout_timeout: checkout_timeout } end end
with_connection(prevent_permanent_checkout: false) 链接
将连接池中的连接 yield 给块。如果当前线程尚未检出任何连接,则会从池中检出一个连接,yield 给块,并在块完成时将其返回到池中。如果当前线程已检出一个连接,例如通过 lease_connection
或 with_connection
,则将 yield 该现有连接,并且在块结束时不会将其自动返回到池中;预计此类现有连接将由检出它的代码正确地返回到池中。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 399 def with_connection(prevent_permanent_checkout: false) lease = connection_lease sticky_was = lease.sticky lease.sticky = false if prevent_permanent_checkout if lease.connection begin yield lease.connection ensure lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was end else begin yield lease.connection = checkout ensure lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was release_connection(lease) unless lease.sticky end end end