Class: Rage::Cable::Protocols::Base
- Inherits: Object
  - Object
  - Rage::Cable::Protocols::Base
- Defined in: lib/rage/cable/protocols/base.rb
Overview
A protocol defines the structure, rules and semantics for exchanging data between the client and the server. A protocol class should inherit from Base and implement the following methods:
- on_open
- on_message
- serialize
The optional methods are:
- protocol_definition
- on_shutdown
- on_close
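As an illustration of the required methods, here is a minimal sketch of a protocol class. It is not taken from Rage: `Sketch::Base` is a stand-in so the example runs on its own, `PlainJson` and `FakeConnection` are hypothetical names, and the `on_open(connection)` and `on_message(connection, raw_data)` signatures are assumptions. Only `serialize(params, data)` is confirmed by the call made in `.broadcast`.

```ruby
require "json"

module Sketch
  # Stand-in for Rage::Cable::Protocols::Base so the sketch runs without Rage.
  class Base
  end

  # Hypothetical protocol exchanging plain JSON frames.
  class PlainJson < Base
    class << self
      # Assumed signature: called once a client has connected.
      def on_open(connection)
        connection.write({ type: "welcome" }.to_json)
      end

      # Assumed signature: called for every frame received from the client.
      def on_message(connection, raw_data)
        msg = JSON.parse(raw_data)
        connection.write({ type: "ack", stream: msg["stream"] }.to_json)
      end

      # Matches the serialize(params, data) call made by Base.broadcast.
      def serialize(params, data)
        { identifier: params, message: data }.to_json
      end
    end
  end
end

# Hypothetical connection double that records every frame written to it.
FakeConnection = Struct.new(:frames) do
  def write(frame)
    frames << frame
  end
end

conn = FakeConnection.new([])
Sketch::PlainJson.on_open(conn)
Sketch::PlainJson.on_message(conn, '{"stream":"chat"}')
payload = Sketch::PlainJson.serialize({ "room" => 1 }, "hello")
```

In a real application the class would inherit from `Rage::Cable::Protocols::Base` instead of the stand-in, which also provides the class methods documented below.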
Direct Known Subclasses
Class Method Summary
- .broadcast(name, data) ⇒ Object
  Broadcast data to all clients connected to a stream.
- .init(router) ⇒ Object
- .protocol_definition ⇒ Object
- .subscribe(connection, name, params) ⇒ Object
  Subscribe to a stream.
- .supports_rpc? ⇒ Boolean
  Whether the protocol allows remote procedure calls.
Class Method Details
.broadcast(name, data) ⇒ Object
Broadcast data to all clients connected to a stream.
    # File 'lib/rage/cable/protocols/base.rb', line 69

    def broadcast(name, data)
      @subscription_identifiers[name].each do |params|
        ::Iodine.publish("cable:#{name}:#{stream_id(params)}", serialize(params, data))
      end
    end
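The fan-out in `.broadcast` can be sketched without Iodine. The listing does not show `stream_id`, so the digest-based version here is an assumption, as is the `serialize` helper. Publications are collected in an array instead of being sent through `Iodine.publish`.

```ruby
require "digest"
require "json"
require "set"

# Assumption: the source does not show stream_id; a digest of the params is used here.
def stream_id(params)
  Digest::MD5.hexdigest(params.to_s)
end

# Hypothetical serializer; a real protocol supplies its own.
def serialize(params, data)
  { identifier: params, message: data }.to_json
end

# Same shape as @subscription_identifiers: stream name => Set of subscription params.
subscription_identifiers = Hash.new { |hash, key| hash[key] = Set.new }
subscription_identifiers["chat"] << { "room" => 1 }
subscription_identifiers["chat"] << { "room" => 2 }

# Record [channel, payload] pairs instead of calling Iodine.publish.
published = []
subscription_identifiers["chat"].each do |params|
  published << ["cable:chat:#{stream_id(params)}", serialize(params, "hello")]
end
```

Each distinct set of subscription params gets its own channel and its own serialized payload, so one broadcast to a stream results in one publication per known params hash.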
.init(router) ⇒ Object
    # File 'lib/rage/cable/protocols/base.rb', line 25

    def init(router)
      @router = router

      # Hash<String(stream name) => Set<Hash>(subscription params)>
      @subscription_identifiers = Hash.new { |hash, key| hash[key] = Set.new }

      Iodine.on_state(:pre_start) do
        # this is a fallback to synchronize subscription identifiers across different worker processes;
        # we expect connections to be distributed among all workers, so this code will almost never be called;
        # we also synchronize subscriptions with the master process so that the forks that are spun up instead
        # of the crashed ones also had access to the identifiers;
        Iodine.subscribe("cable:synchronize") do |_, subscription_msg|
          stream_name, params = Rage::ParamsParser.json_parse(subscription_msg)
          @subscription_identifiers[stream_name] << params
        end
      end

      Iodine.on_state(:on_finish) do
        Iodine.unsubscribe("cable:synchronize")
      end
    end
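The synchronization handler boils down to decoding a `[stream_name, params]` pair and adding the params to a per-stream `Set`. The sketch below reproduces that round trip in isolation. `JSON.parse` is used as a stand-in for `Rage::ParamsParser.json_parse`, which is an assumption about its behavior, and the message is built by hand rather than received through Iodine.

```ruby
require "json"
require "set"

# Same construction as in init: a missing stream name gets an empty Set on first access.
subscription_identifiers = Hash.new { |hash, key| hash[key] = Set.new }

# The shape of the message that subscribe publishes: [name, params].to_json.
subscription_msg = ["chat", { "room" => 1 }].to_json

# Stand-in for Rage::ParamsParser.json_parse (assumption).
stream_name, params = JSON.parse(subscription_msg)
subscription_identifiers[stream_name] << params

# Receiving the same message again changes nothing because a Set ignores duplicates.
stream_name, params = JSON.parse(subscription_msg)
subscription_identifiers[stream_name] << params
```

Because the store is a `Set`, a worker that already knows a subscription can safely process its own synchronization message.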
.protocol_definition ⇒ Object
    # File 'lib/rage/cable/protocols/base.rb', line 47

    def protocol_definition
      HANDSHAKE_HEADERS
    end
.subscribe(connection, name, params) ⇒ Object
Subscribe to a stream.
    # File 'lib/rage/cable/protocols/base.rb', line 56

    def subscribe(connection, name, params)
      connection.subscribe("cable:#{name}:#{stream_id(params)}")

      unless @subscription_identifiers[name].include?(params)
        @subscription_identifiers[name] << params
        ::Iodine.publish("cable:synchronize", [name, params].to_json)
      end
    end
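The bookkeeping in `.subscribe` publishes a synchronization message only the first time a given params hash is seen for a stream. A minimal sketch of that behavior follows. `SketchRegistry` is a hypothetical class, not part of Rage, and it records messages in an array instead of calling `Iodine.publish`.

```ruby
require "json"
require "set"

# Hypothetical registry mirroring the bookkeeping in subscribe; not part of Rage.
class SketchRegistry
  attr_reader :sync_messages

  def initialize
    @subscription_identifiers = Hash.new { |hash, key| hash[key] = Set.new }
    @sync_messages = [] # stands in for Iodine.publish("cable:synchronize", ...)
  end

  def subscribe(name, params)
    # The real method first subscribes the connection to its channel; omitted here.
    return if @subscription_identifiers[name].include?(params)

    @subscription_identifiers[name] << params
    @sync_messages << [name, params].to_json
  end
end

registry = SketchRegistry.new
registry.subscribe("chat", { "room" => 1 })
registry.subscribe("chat", { "room" => 1 }) # same params: no second sync message
registry.subscribe("chat", { "room" => 2 })
```

After these three calls the registry holds two synchronization messages, one per distinct params hash.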
.supports_rpc? ⇒ Boolean
Whether the protocol allows remote procedure calls.
    # File 'lib/rage/cable/protocols/base.rb', line 78

    def supports_rpc?
      true
    end
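Since the default is `true`, a protocol that should not accept remote procedure calls would presumably override the method. The sketch below shows the override pattern only. `SketchBase` is a stand-in for `Rage::Cable::Protocols::Base`, and `BroadcastOnlyProtocol` is a hypothetical subclass.

```ruby
# Stand-in for Rage::Cable::Protocols::Base; only supports_rpc? is reproduced.
class SketchBase
  def self.supports_rpc?
    true
  end
end

# Hypothetical protocol that opts out of remote procedure calls.
class BroadcastOnlyProtocol < SketchBase
  def self.supports_rpc?
    false
  end
end
```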