跳至内容 跳至搜索

Active Record 连接池

用于管理 Active Record 数据库连接的连接池基类。

介绍

连接池同步线程对有限数量的数据库连接的访问。基本思想是每个线程从池中检出一个数据库连接,使用该连接,然后将连接检入。 ConnectionPool 是完全线程安全的,只要遵循 ConnectionPool 的约定,它将确保连接不会被两个线程同时使用。它还将处理线程数超过连接数的情况:如果所有连接都被检出,并且一个线程仍然尝试检出一个连接,那么 ConnectionPool 将等待直到其他线程检入一个连接,或者 checkout_timeout 超时。

获取(检出)连接

可以通过多种方式从连接池中获取和使用连接

  1. 只需使用 ActiveRecord::Base.connection。当你完成连接并希望将其返回到池中时,你可以调用 ActiveRecord::Base.connection_handler.clear_active_connections!。这是 Active Record 与 Action Pack 的请求处理周期结合使用时的默认行为。

  2. 使用 ActiveRecord::Base.connection_pool.checkout 手动从池中检出一个连接。你负责在完成时通过调用 ActiveRecord::Base.connection_pool.checkin(connection) 将此连接返回到池中。

  3. 使用 ActiveRecord::Base.connection_pool.with_connection(&block),它获取一个连接,将其作为唯一参数传递给块,并在块完成之后将其返回到池中。

池中的连接实际上是 AbstractAdapter 对象(或与 AbstractAdapter 接口兼容的对象)。

当一个线程使用上述三种方法之一从连接池中检出一个连接时,该连接将自动成为该线程上执行的 ActiveRecord 查询使用的连接。例如,不需要显式地将检出的连接传递给 Rails 模型或查询。

选项

您可以向数据库连接配置添加几个与连接池相关的选项。

  • pool: 连接池可以管理的最大连接数(默认值为 5)。

  • idle_timeout: 连接在池中保持未使用状态的秒数,超过此时间将自动断开连接(默认值为 300 秒)。将其设置为零以永远保留连接。

  • checkout_timeout: 在放弃并引发超时错误之前等待连接变为可用的秒数(默认值为 5 秒)。

命名空间
方法
A
C
D
F
L
N
R
S
W
包含的模块

属性

[R] async_executor
[RW] automatic_reconnect
[RW] checkout_timeout
[R] db_config
[R] pool_config
[R] reaper
[R] role
[R] shard
[R] size

类公共方法

new(pool_config)

创建一个新的 ConnectionPool 对象。pool_config 是一个 PoolConfig 对象,它描述了数据库连接信息(例如适配器、主机名、用户名、密码等),以及此 ConnectionPool 的最大大小。

默认的 ConnectionPool 最大大小为 5。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 120
def initialize(pool_config)
  super()

  @pool_config = pool_config
  @db_config = pool_config.db_config
  @role = pool_config.role
  @shard = pool_config.shard

  @checkout_timeout = db_config.checkout_timeout
  @idle_timeout = db_config.idle_timeout
  @size = db_config.pool

  # This variable tracks the cache of threads mapped to reserved connections, with the
  # sole purpose of speeding up the +connection+ method. It is not the authoritative
  # registry of which thread owns which connection. Connection ownership is tracked by
  # the +connection.owner+ attr on each +connection+ instance.
  # The invariant works like this: if there is mapping of <tt>thread => conn</tt>,
  # then that +thread+ does indeed own that +conn+. However, an absence of such
  # mapping does not mean that the +thread+ doesn't own the said connection. In
  # that case +conn.owner+ attr should be consulted.
  # Access and modification of <tt>@thread_cached_conns</tt> does not require
  # synchronization.
  @thread_cached_conns = Concurrent::Map.new(initial_capacity: @size)

  @connections         = []
  @automatic_reconnect = true

  # Connection pool allows for concurrent (outside the main +synchronize+ section)
  # establishment of new connections. This variable tracks the number of threads
  # currently in the process of independently establishing connections to the DB.
  @now_connecting = 0

  @threads_blocking_new_connections = 0

  @available = ConnectionLeasingQueue.new self

  @lock_thread = false

  @async_executor = build_async_executor

  @reaper = Reaper.new(self, db_config.reaping_frequency)
  @reaper.run
end

实例公共方法

active_connection?()

如果当前线程正在使用打开的连接,则返回 true。

此方法仅适用于通过 connectionwith_connection 方法获取的连接。通过 checkout 获取的连接不会被 active_connection? 检测到。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 196
def active_connection?
  @thread_cached_conns[connection_cache_key(current_thread)]
end

checkin(conn)

将数据库连接检入连接池,表示您不再需要此连接。

conn:一个 AbstractAdapter 对象,该对象是通过先前调用此池的 checkout 获取的。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 363
def checkin(conn)
  conn.lock.synchronize do
    synchronize do
      remove_connection_from_thread_cache conn

      conn._run_checkin_callbacks do
        conn.expire
      end

      conn.lock_thread = nil
      @available.add conn
    end
  end
end

checkout(checkout_timeout = @checkout_timeout)

从连接池中检出一个数据库连接,表示您要使用它。您应该在不再需要它时调用 checkin

这是通过返回和租赁现有连接,或创建新连接并租赁它来完成的。

如果所有连接都被租赁,并且连接池已达到容量(这意味着当前租赁的连接数量大于或等于设置的大小限制),则会引发 ActiveRecord::ConnectionTimeoutError 异常。

返回:一个 AbstractAdapter 对象。

引发

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 352
def checkout(checkout_timeout = @checkout_timeout)
  connection = checkout_and_verify(acquire_connection(checkout_timeout))
  connection.lock_thread = @lock_thread
  connection
end

clear_reloadable_connections(raise_on_acquisition_timeout = true)

清除映射类的缓存,并重新连接需要重新加载的连接。

引发

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 310
def clear_reloadable_connections(raise_on_acquisition_timeout = true)
  with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
    synchronize do
      @connections.each do |conn|
        if conn.in_use?
          conn.steal!
          checkin conn
        end
        conn.disconnect! if conn.requires_reloading?
      end
      @connections.delete_if(&:requires_reloading?)
      @available.clear
    end
  end
end

clear_reloadable_connections!()

清除映射类的缓存,并重新连接需要重新加载的连接。

池首先尝试获得所有连接的所有权。如果在超时时间间隔内(默认持续时间为 spec.db_config.checkout_timeout * 2 秒)无法做到这一点,则池会强制清除缓存并重新加载连接,而不会考虑其他拥有连接的线程。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 334
def clear_reloadable_connections!
  clear_reloadable_connections(false)
end

connected?()

如果连接已打开,则返回 true。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 233
def connected?
  synchronize { @connections.any? }
end

connection()

检索与当前线程关联的连接,或者在必要时调用 checkout 来获取一个连接。

connection 可以被调用任意次数;连接保存在一个由线程作为键的缓存中。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 181
def connection
  @thread_cached_conns[connection_cache_key(current_thread)] ||= checkout
end

connections()

返回一个包含当前连接池中所有连接的数组。由于该数组是新创建的,并且不会被连接池保留,因此访问该数组不需要对连接池进行同步。

但是,此方法绕过了 ConnectionPool 的线程安全连接访问模式。返回的连接可能被另一个线程拥有,没有被拥有,或者碰巧被调用线程拥有。

在没有所有权的情况下调用连接上的方法将受到底层方法的线程安全保证的约束。连接适配器类中的许多方法本质上是多线程不安全的。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 248
def connections
  synchronize { @connections.dup }
end

disconnect(raise_on_acquisition_timeout = true)

断开连接池中的所有连接,并清空连接池。

引发

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 258
def disconnect(raise_on_acquisition_timeout = true)
  with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
    synchronize do
      @connections.each do |conn|
        if conn.in_use?
          conn.steal!
          checkin conn
        end
        conn.disconnect!
      end
      @connections = []
      @available.clear
    end
  end
end

disconnect!()

断开连接池中的所有连接,并清空连接池。

连接池首先尝试获取所有连接的所有权。如果在超时时间间隔内(默认持续时间为 spec.db_config.checkout_timeout * 2 秒)无法做到这一点,则连接池将强制断开连接,而不考虑其他拥有连接的线程。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 280
def disconnect!
  disconnect(false)
end

flush(minimum_idle = @idle_timeout)

断开所有处于空闲状态至少 minimum_idle 秒的连接。当前签出的连接,或在 minimum_idle 秒前签入的连接不受影响。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 435
def flush(minimum_idle = @idle_timeout)
  return if minimum_idle.nil?

  idle_connections = synchronize do
    return if self.discarded?
    @connections.select do |conn|
      !conn.in_use? && conn.seconds_idle >= minimum_idle
    end.each do |conn|
      conn.lease

      @available.delete conn
      @connections.delete conn
    end
  end

  idle_connections.each do |conn|
    conn.disconnect!
  end
end

flush!()

断开所有当前处于空闲状态的连接。当前签出的连接不受影响。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 457
def flush!
  reap
  flush(-1)
end

lock_thread=(lock_thread)

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 164
def lock_thread=(lock_thread)
  if lock_thread
    @lock_thread = ActiveSupport::IsolatedExecutionState.context
  else
    @lock_thread = nil
  end

  if (active_connection = @thread_cached_conns[connection_cache_key(current_thread)])
    active_connection.lock_thread = @lock_thread
  end
end

reap()

恢复连接池中丢失的连接。如果程序员忘记在线程结束时检入连接,或者线程意外死亡,则可能会发生连接丢失。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 412
def reap
  stale_connections = synchronize do
    return if self.discarded?
    @connections.select do |conn|
      conn.in_use? && !conn.owner.alive?
    end.each do |conn|
      conn.steal!
    end
  end

  stale_connections.each do |conn|
    if conn.active?
      conn.reset!
      checkin conn
    else
      remove conn
    end
  end
end

release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context)

表示线程已完成当前连接。 release_connection 释放连接-线程关联并将连接返回到连接池。

此方法仅适用于通过 connectionwith_connection 方法获得的连接,通过 checkout 获得的连接不会自动释放。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 207
def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context)
  if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
    checkin conn
  end
end

remove(conn)

从连接池中删除连接。该连接将保持打开和活动状态,但将不再由该池管理。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 380
def remove(conn)
  needs_new_connection = false

  synchronize do
    remove_connection_from_thread_cache conn

    @connections.delete conn
    @available.delete conn

    # @available.any_waiting? => true means that prior to removing this
    # conn, the pool was at its max size (@connections.size == @size).
    # This would mean that any threads stuck waiting in the queue wouldn't
    # know they could checkout_new_connection, so let's do it for them.
    # Because condition-wait loop is encapsulated in the Queue class
    # (that in turn is oblivious to ConnectionPool implementation), threads
    # that are "stuck" there are helpless. They have no way of creating
    # new connections and are completely reliant on us feeding available
    # connections into the Queue.
    needs_new_connection = @available.any_waiting?
  end

  # This is intentionally done outside of the synchronized section as we
  # would like not to hold the main mutex while checking out new connections.
  # Thus there is some chance that needs_new_connection information is now
  # stale, we can live with that (bulk_make_new_connections will make
  # sure not to exceed the pool's @size limit).
  bulk_make_new_connections(1) if needs_new_connection
end

stat()

返回连接池的使用统计信息。

ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 469
def stat
  synchronize do
    {
      size: size,
      connections: @connections.size,
      busy: @connections.count { |c| c.in_use? && c.owner.alive? },
      dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
      idle: @connections.count { |c| !c.in_use? },
      waiting: num_waiting_in_queue,
      checkout_timeout: checkout_timeout
    }
  end
end

with_connection()

将连接池中的连接传递给块。如果当前线程尚未检出任何连接,则将从池中检出一个连接,传递给块,并在块完成后返回到池中。如果当前线程已检出连接,例如通过 connectionwith_connection,则该现有连接将是传递的连接,并且它不会在块结束时自动返回到池中;预计检出该现有连接的代码将适当地将其返回到池中。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 222
def with_connection
  unless conn = @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
    conn = connection
    fresh_connection = true
  end
  yield conn
ensure
  release_connection if fresh_connection
end