跳至内容 跳至搜索

Action Cable Channel Streams

Streams 允许频道将广播路由到订阅者。正如其他地方所述,广播是一个发布/订阅队列,任何放入其中的数据都会自动发送到此时连接的客户端。不过,它纯粹是一个在线队列。如果您没有在广播发送更新的同一时刻流式传输广播,即使您在发送后连接,您也不会收到该更新。

最常见的是,流式传输的广播直接发送到客户端的订阅者。该频道只是充当两个参与者(广播者和频道订阅者)之间的连接器。以下是一个允许订阅者获取给定页面上的所有新评论的频道的示例

class CommentsChannel < ApplicationCable::Channel
  def follow(data)
    stream_from "comments_for_#{data['recording_id']}"
  end

  def unfollow
    stop_all_streams
  end
end

根据上面的示例,此频道的订阅者将获得任何放入,例如,comments_for_45 广播中的数据,只要它被放入那里。

此频道的示例广播如下所示

ActionCable.server.broadcast "comments_for_45", { author: 'DHH', content: 'Rails is just swell' }

如果您有一个与模型相关的流,那么使用的广播可以从模型和频道生成。以下示例将订阅类似于 comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE 的广播。

class CommentsChannel < ApplicationCable::Channel
  def subscribed
    post = Post.find(params[:id])
    stream_for post
  end
end

然后,您可以使用以下命令广播到此频道

CommentsChannel.broadcast_to(@post, @comment)

如果您不希望将广播直接传递给订阅者,还可以提供一个回调,让您更改发送出去的内容。以下示例显示了如何在过程中使用它来提供性能自省

class ChatChannel < ApplicationCable::Channel
  def subscribed
    @room = Chat::Room[params[:room_number]]

    stream_for @room, coder: ActiveSupport::JSON do |message|
      if message['originated_at'].present?
        elapsed_time = (Time.now.to_f - message['originated_at']).round(2)

        ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
        logger.info "Message took #{elapsed_time}s to arrive"
      end

      transmit message
    end
  end
end

您可以通过调用 stop_all_streams 来停止从所有广播流式传输。

方法
S

实例公有方法

stop_all_streams()

取消订阅与该频道关联的所有流,来自发布/订阅队列。

# File actioncable/lib/action_cable/channel/streams.rb, line 135
def stop_all_streams
  streams.each do |broadcasting, callback|
    pubsub.unsubscribe broadcasting, callback
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end.clear
end

stop_stream_for(model)

取消订阅 model 的流。

# File actioncable/lib/action_cable/channel/streams.rb, line 130
def stop_stream_for(model)
  stop_stream_from(broadcasting_for(model))
end

stop_stream_from(broadcasting)

取消订阅来自命名 broadcasting 的流。

# File actioncable/lib/action_cable/channel/streams.rb, line 121
def stop_stream_from(broadcasting)
  callback = streams.delete(broadcasting)
  if callback
    pubsub.unsubscribe(broadcasting, callback)
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end
end

stream_for(model, callback = nil, coder: nil, &block)

在此频道中开始流式传输 model 的发布/订阅队列。您可以选择传递一个 callback,它将用于代替默认的直接将更新传输到订阅者。

传递 coder: ActiveSupport::JSON 将消息解码为 JSON,然后传递给回调。默认为 coder: nil,不进行解码,传递原始消息。

# File actioncable/lib/action_cable/channel/streams.rb, line 116
def stream_for(model, callback = nil, coder: nil, &block)
  stream_from(broadcasting_for(model), callback || block, coder: coder)
end

stream_from(broadcasting, callback = nil, coder: nil, &block)

开始从名为 broadcasting 的发布/订阅队列流式传输。您可以选择传递一个 callback,它将用于代替默认的直接将更新传输到订阅者。传递 coder: ActiveSupport::JSON 将消息解码为 JSON,然后传递给回调。默认为 coder: nil,不进行解码,传递原始消息。

# File actioncable/lib/action_cable/channel/streams.rb, line 90
def stream_from(broadcasting, callback = nil, coder: nil, &block)
  broadcasting = String(broadcasting)

  # Don't send the confirmation until pubsub#subscribe is successful
  defer_subscription_confirmation!

  # Build a stream handler by wrapping the user-provided callback with a decoder
  # or defaulting to a JSON-decoding retransmitter.
  handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
  streams[broadcasting] = handler

  connection.server.event_loop.post do
    pubsub.subscribe(broadcasting, handler, lambda do
      ensure_confirmation_sent
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end)
  end
end

stream_or_reject_for(model)

如果 model 存在,则调用 stream_for 来开始流式传输,否则拒绝订阅。

# File actioncable/lib/action_cable/channel/streams.rb, line 144
def stream_or_reject_for(model)
  if model
    stream_for model
  else
    reject
  end
end