Active Record 连接池
用于管理 Active Record 数据库连接的连接池基类。
介绍
连接池同步线程对有限数量的数据库连接的访问。基本思想是每个线程从池中检出一个数据库连接,使用该连接,然后将连接检入。 ConnectionPool
是完全线程安全的,只要遵循 ConnectionPool 的约定,它将确保连接不会被两个线程同时使用。它还将处理线程数超过连接数的情况:如果所有连接都被检出,并且一个线程仍然尝试检出一个连接,那么 ConnectionPool
将等待直到其他线程检入一个连接,或者 checkout_timeout
超时。
获取(检出)连接
可以通过多种方式从连接池中获取和使用连接
-
只需使用 ActiveRecord::Base.connection。当你完成连接并希望将其返回到池中时,你可以调用 ActiveRecord::Base.connection_handler.clear_active_connections!。这是 Active Record 与 Action Pack 的请求处理周期结合使用时的默认行为。
-
使用 ActiveRecord::Base.connection_pool.checkout 手动从池中检出一个连接。你负责在完成时通过调用 ActiveRecord::Base.connection_pool.checkin(connection) 将此连接返回到池中。
-
使用 ActiveRecord::Base.connection_pool.with_connection(&block),它获取一个连接,将其作为唯一参数传递给块,并在块完成之后将其返回到池中。
池中的连接实际上是 AbstractAdapter
对象(或与 AbstractAdapter 接口兼容的对象)。
当一个线程使用上述三种方法之一从连接池中检出一个连接时,该连接将自动成为该线程上执行的 ActiveRecord
查询使用的连接。例如,不需要显式地将检出的连接传递给 Rails 模型或查询。
选项
您可以向数据库连接配置添加几个与连接池相关的选项。
-
pool
: 连接池可以管理的最大连接数(默认值为 5)。 -
idle_timeout
: 连接在池中保持未使用状态的秒数,超过此时间将自动断开连接(默认值为 300 秒)。将其设置为零以永远保留连接。 -
checkout_timeout
: 在放弃并引发超时错误之前等待连接变为可用的秒数(默认值为 5 秒)。
- 类 ActiveRecord::ConnectionAdapters::ConnectionPool::Queue
- 类 ActiveRecord::ConnectionAdapters::ConnectionPool::Reaper
- A
- C
- D
- F
- L
- N
- R
- S
- W
属性
[R] | async_executor | |
[RW] | automatic_reconnect | |
[RW] | checkout_timeout | |
[R] | db_config | |
[R] | pool_config | |
[R] | reaper | |
[R] | role | |
[R] | shard | |
[R] | size |
类公共方法
new(pool_config) 链接
创建一个新的 ConnectionPool
对象。pool_config
是一个 PoolConfig 对象,它描述了数据库连接信息(例如适配器、主机名、用户名、密码等),以及此 ConnectionPool
的最大大小。
默认的 ConnectionPool
最大大小为 5。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 120 def initialize(pool_config) super() @pool_config = pool_config @db_config = pool_config.db_config @role = pool_config.role @shard = pool_config.shard @checkout_timeout = db_config.checkout_timeout @idle_timeout = db_config.idle_timeout @size = db_config.pool # This variable tracks the cache of threads mapped to reserved connections, with the # sole purpose of speeding up the +connection+ method. It is not the authoritative # registry of which thread owns which connection. Connection ownership is tracked by # the +connection.owner+ attr on each +connection+ instance. # The invariant works like this: if there is mapping of <tt>thread => conn</tt>, # then that +thread+ does indeed own that +conn+. However, an absence of such # mapping does not mean that the +thread+ doesn't own the said connection. In # that case +conn.owner+ attr should be consulted. # Access and modification of <tt>@thread_cached_conns</tt> does not require # synchronization. @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size) @connections = [] @automatic_reconnect = true # Connection pool allows for concurrent (outside the main +synchronize+ section) # establishment of new connections. This variable tracks the number of threads # currently in the process of independently establishing connections to the DB. @now_connecting = 0 @threads_blocking_new_connections = 0 @available = ConnectionLeasingQueue.new self @lock_thread = false @async_executor = build_async_executor @reaper = Reaper.new(self, db_config.reaping_frequency) @reaper.run end
实例公共方法
active_connection?() 链接
如果当前线程正在使用打开的连接,则返回 true。
此方法仅适用于通过 connection
或 with_connection
方法获取的连接。通过 checkout
获取的连接不会被 active_connection?
检测到。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 196 def active_connection? @thread_cached_conns[connection_cache_key(current_thread)] end
checkin(conn) 链接
将数据库连接检入连接池,表示您不再需要此连接。
conn
:一个 AbstractAdapter
对象,该对象是通过先前调用此池的 checkout
获取的。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 363 def checkin(conn) conn.lock.synchronize do synchronize do remove_connection_from_thread_cache conn conn._run_checkin_callbacks do conn.expire end conn.lock_thread = nil @available.add conn end end end
checkout(checkout_timeout = @checkout_timeout) 链接
从连接池中检出一个数据库连接,表示您要使用它。您应该在不再需要它时调用 checkin
。
这是通过返回和租赁现有连接,或创建新连接并租赁它来完成的。
如果所有连接都被租赁,并且连接池已达到容量(这意味着当前租赁的连接数量大于或等于设置的大小限制),则会引发 ActiveRecord::ConnectionTimeoutError
异常。
返回:一个 AbstractAdapter
对象。
引发
-
ActiveRecord::ConnectionTimeoutError
无法从连接池中获取连接。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 352 def checkout(checkout_timeout = @checkout_timeout) connection = checkout_and_verify(acquire_connection(checkout_timeout)) connection.lock_thread = @lock_thread connection end
clear_reloadable_connections(raise_on_acquisition_timeout = true) 链接
清除映射类的缓存,并重新连接需要重新加载的连接。
引发
-
如果在超时时间间隔内(默认持续时间为
spec.db_config.checkout_timeout * 2
秒)无法获得池中所有连接的所有权,则会引发ActiveRecord::ExclusiveConnectionTimeoutError
。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 310 def clear_reloadable_connections(raise_on_acquisition_timeout = true) with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do synchronize do @connections.each do |conn| if conn.in_use? conn.steal! checkin conn end conn.disconnect! if conn.requires_reloading? end @connections.delete_if(&:requires_reloading?) @available.clear end end end
clear_reloadable_connections!() 链接
清除映射类的缓存,并重新连接需要重新加载的连接。
池首先尝试获得所有连接的所有权。如果在超时时间间隔内(默认持续时间为 spec.db_config.checkout_timeout * 2
秒)无法做到这一点,则池会强制清除缓存并重新加载连接,而不会考虑其他拥有连接的线程。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 334 def clear_reloadable_connections! clear_reloadable_connections(false) end
connected?() 链接
如果连接已打开,则返回 true。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 233 def connected? synchronize { @connections.any? } end
connection() 链接
检索与当前线程关联的连接,或者在必要时调用 checkout
来获取一个连接。
connection
可以被调用任意次数;连接保存在一个由线程作为键的缓存中。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 181 def connection @thread_cached_conns[connection_cache_key(current_thread)] ||= checkout end
connections() 链接
返回一个包含当前连接池中所有连接的数组。由于该数组是新创建的,并且不会被连接池保留,因此访问该数组不需要对连接池进行同步。
但是,此方法绕过了 ConnectionPool 的线程安全连接访问模式。返回的连接可能被另一个线程拥有,没有被拥有,或者碰巧被调用线程拥有。
在没有所有权的情况下调用连接上的方法将受到底层方法的线程安全保证的约束。连接适配器类中的许多方法本质上是多线程不安全的。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 248 def connections synchronize { @connections.dup } end
disconnect(raise_on_acquisition_timeout = true) 链接
断开连接池中的所有连接,并清空连接池。
引发
-
如果在超时时间间隔内(默认持续时间为
spec.db_config.checkout_timeout * 2
秒)无法获得池中所有连接的所有权,则会引发ActiveRecord::ExclusiveConnectionTimeoutError
。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 258 def disconnect(raise_on_acquisition_timeout = true) with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do synchronize do @connections.each do |conn| if conn.in_use? conn.steal! checkin conn end conn.disconnect! end @connections = [] @available.clear end end end
disconnect!() 链接
断开连接池中的所有连接,并清空连接池。
连接池首先尝试获取所有连接的所有权。如果在超时时间间隔内(默认持续时间为 spec.db_config.checkout_timeout * 2
秒)无法做到这一点,则连接池将强制断开连接,而不考虑其他拥有连接的线程。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 280 def disconnect! disconnect(false) end
flush(minimum_idle = @idle_timeout) 链接
断开所有处于空闲状态至少 minimum_idle
秒的连接。当前签出的连接,或在 minimum_idle
秒前签入的连接不受影响。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 435 def flush(minimum_idle = @idle_timeout) return if minimum_idle.nil? idle_connections = synchronize do return if self.discarded? @connections.select do |conn| !conn.in_use? && conn.seconds_idle >= minimum_idle end.each do |conn| conn.lease @available.delete conn @connections.delete conn end end idle_connections.each do |conn| conn.disconnect! end end
flush!() 链接
断开所有当前处于空闲状态的连接。当前签出的连接不受影响。
来源:显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 457 def flush! reap flush(-1) end
lock_thread=(lock_thread) 链接
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 164 def lock_thread=(lock_thread) if lock_thread @lock_thread = ActiveSupport::IsolatedExecutionState.context else @lock_thread = nil end if (active_connection = @thread_cached_conns[connection_cache_key(current_thread)]) active_connection.lock_thread = @lock_thread end end
reap() 链接
恢复连接池中丢失的连接。如果程序员忘记在线程结束时检入连接,或者线程意外死亡,则可能会发生连接丢失。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 412 def reap stale_connections = synchronize do return if self.discarded? @connections.select do |conn| conn.in_use? && !conn.owner.alive? end.each do |conn| conn.steal! end end stale_connections.each do |conn| if conn.active? conn.reset! checkin conn else remove conn end end end
release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context) 链接
表示线程已完成当前连接。 release_connection
释放连接-线程关联并将连接返回到连接池。
此方法仅适用于通过 connection
或 with_connection
方法获得的连接,通过 checkout
获得的连接不会自动释放。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 207 def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context) if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread)) checkin conn end end
remove(conn) 链接
从连接池中删除连接。该连接将保持打开和活动状态,但将不再由该池管理。
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 380 def remove(conn) needs_new_connection = false synchronize do remove_connection_from_thread_cache conn @connections.delete conn @available.delete conn # @available.any_waiting? => true means that prior to removing this # conn, the pool was at its max size (@connections.size == @size). # This would mean that any threads stuck waiting in the queue wouldn't # know they could checkout_new_connection, so let's do it for them. # Because condition-wait loop is encapsulated in the Queue class # (that in turn is oblivious to ConnectionPool implementation), threads # that are "stuck" there are helpless. They have no way of creating # new connections and are completely reliant on us feeding available # connections into the Queue. needs_new_connection = @available.any_waiting? end # This is intentionally done outside of the synchronized section as we # would like not to hold the main mutex while checking out new connections. # Thus there is some chance that needs_new_connection information is now # stale, we can live with that (bulk_make_new_connections will make # sure not to exceed the pool's @size limit). bulk_make_new_connections(1) if needs_new_connection end
stat() 链接
返回连接池的使用统计信息。
ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
来源: 显示 | 在 GitHub 上
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 469 def stat synchronize do { size: size, connections: @connections.size, busy: @connections.count { |c| c.in_use? && c.owner.alive? }, dead: @connections.count { |c| c.in_use? && !c.owner.alive? }, idle: @connections.count { |c| !c.in_use? }, waiting: num_waiting_in_queue, checkout_timeout: checkout_timeout } end end
with_connection() 链接
将连接池中的连接传递给块。如果当前线程尚未检出任何连接,则将从池中检出一个连接,传递给块,并在块完成后返回到池中。如果当前线程已检出连接,例如通过 connection
或 with_connection
,则该现有连接将是传递的连接,并且它不会在块结束时自动返回到池中;预计检出该现有连接的代码将适当地将其返回到池中。
来源:显示 | 在 GitHub 上查看
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 222 def with_connection unless conn = @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] conn = connection fresh_connection = true end yield conn ensure release_connection if fresh_connection end