跳至内容 跳至搜索

Action Cable Connection Base

对于 Action Cable 服务器接受的每个 WebSocket 连接,都会实例化一个 Connection 对象。此实例成为从那里创建的所有频道订阅的父级。传入的消息然后根据 Action Cable 消费者发送的标识符路由到这些频道订阅。该 Connection 本身不处理任何特定于应用程序的逻辑,除了身份验证和授权。

这是一个基本示例

module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :current_user

    def connect
      self.current_user = find_verified_user
      logger.add_tags current_user.name
    end

    def disconnect
      # Any cleanup work needed when the cable connection is cut.
    end

    private
      def find_verified_user
        User.find_by_identity(cookies.encrypted[:identity_id]) ||
          reject_unauthorized_connection
      end
  end
end

首先,我们声明此连接可以通过其 current_user 来标识。这使我们能够稍后找到为该 current_user 建立的所有连接(并可能断开它们的连接)。您可以声明任意数量的标识索引。声明标识意味着会为该键自动设置一个 attr_accessor。

其次,我们依赖于以下事实:WebSocket 连接是使用来自所发送域的 Cookie 建立的。这使得使用在通过 Web 界面登录时设置的已签署的 Cookie 来授权 WebSocket 连接变得容易。

最后,我们使用当前用户的名称向连接特定的记录器添加一个标签,以便轻松区分日志中的消息。

很简单吧?

方法
B
C
H
N
R
S
包含的模块

属性

[R] env
[R] logger
[R] protocol
[R] server
[R] subscriptions
[R] worker_pool

类公共方法

new(server, env, coder: ActiveSupport::JSON)

# File actioncable/lib/action_cable/connection/base.rb, line 67
def initialize(server, env, coder: ActiveSupport::JSON)
  @server, @env, @coder = server, env, coder

  @worker_pool = server.worker_pool
  @logger = new_tagged_logger

  @websocket      = ActionCable::Connection::WebSocket.new(env, self, event_loop)
  @subscriptions  = ActionCable::Connection::Subscriptions.new(self)
  @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

  @_internal_subscriptions = nil
  @started_at = Time.now
end

实例公共方法

beat()

# File actioncable/lib/action_cable/connection/base.rb, line 147
def beat
  transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
end

close(reason: nil, reconnect: true)

关闭 WebSocket 连接。

# File actioncable/lib/action_cable/connection/base.rb, line 120
def close(reason: nil, reconnect: true)
  transmit(
    type: ActionCable::INTERNAL[:message_types][:disconnect],
    reason: reason,
    reconnect: reconnect
  )
  websocket.close
end

handle_channel_command(payload)

# File actioncable/lib/action_cable/connection/base.rb, line 109
def handle_channel_command(payload)
  run_callbacks :command do
    subscriptions.execute_command payload
  end
end

send_async(method, *arguments)

通过线程工作池异步调用连接上的方法。

# File actioncable/lib/action_cable/connection/base.rb, line 131
def send_async(method, *arguments)
  worker_pool.async_invoke(self, method, *arguments)
end

statistics()

返回连接的基本统计信息哈希,其键为 identifierstarted_atsubscriptionsrequest_id。这可以由针对连接的健康检查返回。

# File actioncable/lib/action_cable/connection/base.rb, line 138
def statistics
  {
    identifier: connection_identifier,
    started_at: @started_at,
    subscriptions: subscriptions.identifiers,
    request_id: @env["action_dispatch.request_id"]
  }
end

实例私有方法

cookies()

发起 WebSocket 连接的请求的 Cookie。用于执行授权检查。

# File actioncable/lib/action_cable/connection/base.rb, line 187
def cookies # :doc:
  request.cookie_jar
end

request()

发起 WebSocket 连接的请求在此处可用。这允许访问环境、Cookie 等。

# File actioncable/lib/action_cable/connection/base.rb, line 178
def request # :doc:
  @request ||= begin
    environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
    ActionDispatch::Request.new(environment || env)
  end
end