跳至内容 跳至搜索

Active Record 连接池

用于管理 Active Record 数据库连接的连接池基类。

简介

连接池同步线程对有限数量的数据库连接的访问。基本思想是每个线程从池中检出一个数据库连接,使用该连接,然后将连接检入。 ConnectionPool 是完全线程安全的,并且将确保一个连接不能被两个线程同时使用,只要正确遵循 ConnectionPool 的约定。它还将处理线程数量超过连接数量的情况:如果所有连接都被检出,而线程尝试检出一个连接,那么 ConnectionPool 将等待直到其他线程检入一个连接,或者 checkout_timeout 超时。

获取(检出)连接

可以通过几种方式从连接池获取和使用连接

  1. 只需使用 ActiveRecord::Base.lease_connection。完成对连接的使用,并希望将其返回到池中时,您可以调用 ActiveRecord::Base.connection_handler.clear_active_connections!。这是在与 Action Pack 的请求处理周期结合使用时,Active Record 的默认行为。

  2. 使用 ActiveRecord::Base.connection_pool.checkout 从池中手动检出一个连接。您有责任在完成时通过调用 ActiveRecord::Base.connection_pool.checkin(connection) 将此连接返回到池中。

  3. 使用 ActiveRecord::Base.connection_pool.with_connection(&block),它获取一个连接,将其作为唯一参数传递给块,并在块完成后将其返回到池中。

池中的连接实际上是 AbstractAdapter 对象(或与 AbstractAdapter 接口兼容的对象)。

当线程使用上述三种方法之一从池中检出一个连接时,该连接将自动成为在该线程上执行的 ActiveRecord 查询使用的连接。例如,不需要显式地将检出的连接传递给 Rails 模型或查询。

选项

您可以在数据库连接配置中添加一些与连接池相关的选项

  • pool:池可以管理的最大连接数(默认为 5)。

  • idle_timeout:连接在池中保持不使用的时间(秒),在超过此时间后,连接将自动断开(默认为 300 秒)。将此值设置为零表示永远保持连接。

  • checkout_timeout:在放弃并引发超时错误之前,等待连接可用(秒)(默认为 5 秒)。

命名空间
方法
A
C
D
F
I
L
N
R
S
W
包含模块

属性

[R] async_executor
[RW] automatic_reconnect
[RW] checkout_timeout
[R] db_config
[R] pool_config
[R] reaper
[R] role
[R] shard
[R] size

类公共方法

install_executor_hooks(executor = ActiveSupport::Executor)

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 206
def install_executor_hooks(executor = ActiveSupport::Executor)
  executor.register_hook(ExecutorHooks)
end

new(pool_config)

创建一个新的 ConnectionPool 对象。pool_config 是一个 PoolConfig 对象,它描述了数据库连接信息(例如适配器、主机名、用户名、密码等),以及此 ConnectionPool 的最大大小。

默认的 ConnectionPool 最大大小为 5。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 226
def initialize(pool_config)
  super()

  @pool_config = pool_config
  @db_config = pool_config.db_config
  @role = pool_config.role
  @shard = pool_config.shard

  @checkout_timeout = db_config.checkout_timeout
  @idle_timeout = db_config.idle_timeout
  @size = db_config.pool

  # This variable tracks the cache of threads mapped to reserved connections, with the
  # sole purpose of speeding up the +connection+ method. It is not the authoritative
  # registry of which thread owns which connection. Connection ownership is tracked by
  # the +connection.owner+ attr on each +connection+ instance.
  # The invariant works like this: if there is mapping of <tt>thread => conn</tt>,
  # then that +thread+ does indeed own that +conn+. However, an absence of such
  # mapping does not mean that the +thread+ doesn't own the said connection. In
  # that case +conn.owner+ attr should be consulted.
  # Access and modification of <tt>@leases</tt> does not require
  # synchronization.
  @leases = LeaseRegistry.new

  @connections         = []
  @automatic_reconnect = true

  # Connection pool allows for concurrent (outside the main +synchronize+ section)
  # establishment of new connections. This variable tracks the number of threads
  # currently in the process of independently establishing connections to the DB.
  @now_connecting = 0

  @threads_blocking_new_connections = 0

  @available = ConnectionLeasingQueue.new self
  @pinned_connection = nil
  @pinned_connections_depth = 0

  @async_executor = build_async_executor

  @schema_cache = nil

  @reaper = Reaper.new(self, db_config.reaping_frequency)
  @reaper.run
end

实例公共方法

active_connection?()

如果当前线程正在使用一个打开的连接,则返回 true。

此方法仅适用于通过 lease_connectionwith_connection 方法获取的连接。通过 checkout 获取的连接不会被 active_connection? 检测到

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 370
def active_connection?
  connection_lease.connection
end

checkin(conn)

将数据库连接检入池中,表示您不再需要此连接。

conn:一个 AbstractAdapter 对象,它通过先前调用此池的 checkout 获取。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 564
def checkin(conn)
  return if @pinned_connection.equal?(conn)

  conn.lock.synchronize do
    synchronize do
      connection_lease.clear(conn)

      conn._run_checkin_callbacks do
        conn.expire
      end

      @available.add conn
    end
  end
end

checkout(checkout_timeout = @checkout_timeout)

从池中检出一个数据库连接,表示您要使用它。您应该在不再需要它时调用 checkin

这可以通过返回和租赁现有连接,或通过创建新的连接并租赁它来完成。

如果所有连接都被租赁并且池已达到容量(意味着当前租赁的连接数量大于或等于设置的大小限制),将引发 ActiveRecord::ConnectionTimeoutError 异常。

返回:一个 AbstractAdapter 对象。

引发

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 541
def checkout(checkout_timeout = @checkout_timeout)
  if @pinned_connection
    @pinned_connection.lock.synchronize do
      synchronize do
        @pinned_connection.verify!
        # Any leased connection must be in @connections otherwise
        # some methods like #connected? won't behave correctly
        unless @connections.include?(@pinned_connection)
          @connections << @pinned_connection
        end
      end
    end
    @pinned_connection
  else
    checkout_and_verify(acquire_connection(checkout_timeout))
  end
end

clear_reloadable_connections(raise_on_acquisition_timeout = true)

清除将类映射到重新连接的连接的缓存。

引发

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 499
def clear_reloadable_connections(raise_on_acquisition_timeout = true)
  with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
    synchronize do
      @connections.each do |conn|
        if conn.in_use?
          conn.steal!
          checkin conn
        end
        conn.disconnect! if conn.requires_reloading?
      end
      @connections.delete_if(&:requires_reloading?)
      @available.clear
    end
  end
end

clear_reloadable_connections!()

清除将类映射到重新连接的连接的缓存。

池首先尝试获得所有连接的所有权。如果在超时时间内(默认持续时间为 spec.db_config.checkout_timeout * 2 秒)无法做到,则池将强制清除缓存并重新加载连接,而不考虑其他连接拥有线程。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 523
def clear_reloadable_connections!
  clear_reloadable_connections(false)
end

connected?()

如果连接已打开,则返回 true。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 421
def connected?
  synchronize { @connections.any?(&:connected?) }
end

connections()

返回一个包含当前池中连接的数组。访问数组不需要对池进行同步,因为数组是新创建的,并且不会被池保留。

但是;此方法绕过了 ConnectionPool 的线程安全连接访问模式。返回的连接可能被另一个线程拥有,没有拥有,或者偶然地被调用线程拥有。

在没有所有权的情况下调用连接上的方法会受到底层方法的线程安全保证的影响。连接适配器类上的许多方法本质上是多线程不安全的。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 436
def connections
  synchronize { @connections.dup }
end

disconnect(raise_on_acquisition_timeout = true)

断开池中的所有连接,并清除池。

引发

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 446
def disconnect(raise_on_acquisition_timeout = true)
  with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do
    synchronize do
      @connections.each do |conn|
        if conn.in_use?
          conn.steal!
          checkin conn
        end
        conn.disconnect!
      end
      @connections = []
      @leases.clear
      @available.clear
    end
  end
end

disconnect!()

断开池中的所有连接,并清除池。

池首先尝试获得所有连接的所有权。如果在超时时间内(默认持续时间为 spec.db_config.checkout_timeout * 2 秒)无法做到,则池将强制断开连接,而不考虑其他连接拥有线程。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 469
def disconnect!
  disconnect(false)
end

flush(minimum_idle = @idle_timeout)

断开所有处于空闲状态至少 minimum_idle 秒的连接。当前已检出的连接,或在 minimum_idle 秒前已检入的连接,不会受到影响。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 637
def flush(minimum_idle = @idle_timeout)
  return if minimum_idle.nil?

  idle_connections = synchronize do
    return if self.discarded?
    @connections.select do |conn|
      !conn.in_use? && conn.seconds_idle >= minimum_idle
    end.each do |conn|
      conn.lease

      @available.delete conn
      @connections.delete conn
    end
  end

  idle_connections.each do |conn|
    conn.disconnect!
  end
end

flush!()

断开所有当前处于空闲状态的连接。当前已检出的连接不会受到影响。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 659
def flush!
  reap
  flush(-1)
end

lease_connection()

检索与当前线程关联的连接,或调用 checkout 以在必要时获取一个连接。

lease_connection 可以被调用任意次数;连接被保存在一个以线程为键的缓存中。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 309
def lease_connection
  lease = connection_lease
  lease.sticky = true
  lease.connection ||= checkout
end

reap()

恢复池中丢失的连接。如果程序员忘记在线程结束时检入连接,或线程意外死亡,则可能发生连接丢失。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 614
def reap
  stale_connections = synchronize do
    return if self.discarded?
    @connections.select do |conn|
      conn.in_use? && !conn.owner.alive?
    end.each do |conn|
      conn.steal!
    end
  end

  stale_connections.each do |conn|
    if conn.active?
      conn.reset!
      checkin conn
    else
      remove conn
    end
  end
end

release_connection(existing_lease = nil)

表示线程已完成对当前连接的使用。release_connection 释放连接-线程关联,并将连接返回到池中。

此方法仅适用于通过 lease_connectionwith_connection 方法获取的连接,通过 checkout 获取的连接不会自动释放。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 382
def release_connection(existing_lease = nil)
  if conn = connection_lease.release
    checkin conn
    return true
  end
  false
end

remove(conn)

从连接池中删除一个连接。连接将保持打开并处于活动状态,但不再由此池管理。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 582
def remove(conn)
  needs_new_connection = false

  synchronize do
    remove_connection_from_thread_cache conn

    @connections.delete conn
    @available.delete conn

    # @available.any_waiting? => true means that prior to removing this
    # conn, the pool was at its max size (@connections.size == @size).
    # This would mean that any threads stuck waiting in the queue wouldn't
    # know they could checkout_new_connection, so let's do it for them.
    # Because condition-wait loop is encapsulated in the Queue class
    # (that in turn is oblivious to ConnectionPool implementation), threads
    # that are "stuck" there are helpless. They have no way of creating
    # new connections and are completely reliant on us feeding available
    # connections into the Queue.
    needs_new_connection = @available.any_waiting?
  end

  # This is intentionally done outside of the synchronized section as we
  # would like not to hold the main mutex while checking out new connections.
  # Thus there is some chance that needs_new_connection information is now
  # stale, we can live with that (bulk_make_new_connections will make
  # sure not to exceed the pool's @size limit).
  bulk_make_new_connections(1) if needs_new_connection
end

schema_cache()

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 279
def schema_cache
  @schema_cache ||= BoundSchemaReflection.new(schema_reflection, self)
end

schema_reflection=(schema_reflection)

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 283
def schema_reflection=(schema_reflection)
  pool_config.schema_reflection = schema_reflection
  @schema_cache = nil
end

stat()

返回连接池的使用统计信息。

ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 }
# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 671
def stat
  synchronize do
    {
      size: size,
      connections: @connections.size,
      busy: @connections.count { |c| c.in_use? && c.owner.alive? },
      dead: @connections.count { |c| c.in_use? && !c.owner.alive? },
      idle: @connections.count { |c| !c.in_use? },
      waiting: num_waiting_in_queue,
      checkout_timeout: checkout_timeout
    }
  end
end

with_connection(prevent_permanent_checkout: false)

将连接池中的连接 yield 给块。如果当前线程尚未检出任何连接,则会从池中检出一个连接,yield 给块,并在块完成时将其返回到池中。如果当前线程已检出一个连接,例如通过 lease_connectionwith_connection,则将 yield 该现有连接,并且在块结束时不会将其自动返回到池中;预计此类现有连接将由检出它的代码正确地返回到池中。

# File activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb, line 399
def with_connection(prevent_permanent_checkout: false)
  lease = connection_lease
  sticky_was = lease.sticky
  lease.sticky = false if prevent_permanent_checkout

  if lease.connection
    begin
      yield lease.connection
    ensure
      lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was
    end
  else
    begin
      yield lease.connection = checkout
    ensure
      lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was
      release_connection(lease) unless lease.sticky
    end
  end
end