Action Cable 频道流
Streams
允许频道将广播路由到订阅者。如在其他地方所讨论的,广播是一个发布/订阅队列,其中放置的任何数据都会自动发送到当时已连接的客户端。不过,它纯粹是一个在线队列。如果您没有在广播发送更新时进行流式传输,则您将不会收到该更新,即使您在发送后才连接也是如此。
最常见的情况是,流式广播直接发送到客户端的订阅者。频道只是充当两方(广播者和频道订阅者)之间的连接器。以下是一个允许订阅者获取给定页面上的所有新评论的频道的示例
class CommentsChannel < ApplicationCable::Channel
def follow(data)
stream_from "comments_for_#{data['recording_id']}"
end
def unfollow
stop_all_streams
end
end
根据上述示例,此频道的订阅者将得到放入广播中的任何数据,比如 comments_for_45
,只要数据被放入其中。
此频道的广播示例如下所示
ActionCable.server.broadcast "comments_for_45", { author: 'DHH', content: 'Rails is just swell' }
如果您有一个与模型相关的流,则可以从模型和频道生成所使用的广播。以下示例将订阅类似 comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE
的广播。
class CommentsChannel < ApplicationCable::Channel
def subscribed
post = Post.find(params[:id])
stream_for post
end
end
然后,您可以使用以下方法向此频道广播
CommentsChannel.broadcast_to(@post, @comment)
如果您不想将广播未经过滤地传递给订阅者,您还可以提供一个回调,让您更改发送的内容。以下示例显示了如何使用此方法在过程中提供性能自省
class ChatChannel < ApplicationCable::Channel
def subscribed
@room = Chat::Room[params[:room_number]]
stream_for @room, coder: ActiveSupport::JSON do |message|
if message['originated_at'].present?
elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
logger.info "Message took #{elapsed_time}s to arrive"
end
transmit message
end
end
end
您可以通过调用 stop_all_streams
停止从所有广播流式传输。
- S
实例公共方法
stop_all_streams() 链接
取消订阅与该频道关联的所有流,使其不再从 pubsub 队列中接收消息。
来源:显示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 122 def stop_all_streams streams.each do |broadcasting, callback| pubsub.unsubscribe broadcasting, callback logger.info "#{self.class.name} stopped streaming from #{broadcasting}" end.clear end
stop_stream_for(model) 链接
取消订阅 model
的流。
来源:显示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 117 def stop_stream_for(model) stop_stream_from(broadcasting_for(model)) end
stop_stream_from(broadcasting) 链接
取消订阅名为 broadcasting
的流。
来源:显示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 108 def stop_stream_from(broadcasting) callback = streams.delete(broadcasting) if callback pubsub.unsubscribe(broadcasting, callback) logger.info "#{self.class.name} stopped streaming from #{broadcasting}" end end
stream_for(model, callback = nil, coder: nil, &block) 链接
开始在这个频道中为 model
流式传输 pubsub 队列。或者,你可以传递一个 callback
,它将代替默认值,即直接将更新传输给订阅者。
传递 coder: ActiveSupport::JSON
以在传递给回调之前将消息解码为 JSON。默认为 coder: nil
,它不进行解码,传递原始消息。
来源:显示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 103 def stream_for(model, callback = nil, coder: nil, &block) stream_from(broadcasting_for(model), callback || block, coder: coder) end
stream_from(broadcasting, callback = nil, coder: nil, &block) 链接
开始从名为 broadcasting
的 pubsub 队列流式传输。或者,你可以传递一个 callback
,它将代替默认值,即直接将更新传输给订阅者。传递 coder: ActiveSupport::JSON
以在传递给回调之前将消息解码为 JSON。默认为 coder: nil
,它不进行解码,传递原始消息。
来源:显示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 78 def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! # Build a stream handler by wrapping the user-provided callback with # a decoder or defaulting to a JSON-decoding retransmitter. handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) streams[broadcasting] = handler connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end end
stream_or_reject_for(model) 链接
如果存在,则使用给定的 model
调用 stream_for
以开始流式传输,否则拒绝订阅。
来源:显示 | 在 GitHub 上
# File actioncable/lib/action_cable/channel/streams.rb, line 131 def stream_or_reject_for(model) if model stream_for model else reject end end