Class: Rage::Cable::Protocol::ActioncableV1Json
- Inherits:
-
Object
- Object
- Rage::Cable::Protocol::ActioncableV1Json
- Defined in:
- lib/rage/cable/protocol/actioncable_v1_json.rb
Overview
A protocol defines the structure, rules and semantics for exchanging data between the client and the server. The class that defines a protocol should respond to the following methods:
-
protocol_definition
-
init
-
on_open
-
on_message
-
serialize
-
subscribe
-
broadcast
The two optional methods are:
-
on_shutdown
-
on_close
It is likely that all logic around @subscription_identifiers
has nothing to do with the protocol itself and should be extracted into another class. We’ll refactor this once we start working on a new protocol.
Defined Under Namespace
Modules: COMMAND, MESSAGES, REASON, TYPE
Constant Summary collapse
- HANDSHAKE_HEADERS =
{ "Sec-WebSocket-Protocol" => "actioncable-v1-json" }
Class Method Summary collapse
-
.broadcast(name, data) ⇒ Object
Broadcast data to all clients connected to a stream.
-
.init(router) ⇒ Object
This method serves as a constructor to prepare the object or set up recurring tasks (e.g. heartbeats).
-
.on_close(connection) ⇒ Object
The method should process client disconnections and call Router#process_message.
-
.on_message(connection, raw_data) ⇒ Object
The method processes messages from existing connections.
-
.on_open(connection) ⇒ Object
The method is called any time a new WebSocket connection is established.
-
.protocol_definition ⇒ Object
The method defines the headers to send to the client after the handshake process.
-
.serialize(params, data) ⇒ Object
Serialize a Ruby object into the format the client would understand.
-
.subscribe(connection, name, params) ⇒ Object
Subscribe to a stream.
Class Method Details
.broadcast(name, data) ⇒ Object
Broadcast data to all clients connected to a stream.
186 187 188 189 190 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 186 def self.broadcast(name, data) @subscription_identifiers[name].each do |params| ::Iodine.publish("cable:#{name}:#{Zlib.crc32(params.to_s)}", serialize(params, data)) end end |
.init(router) ⇒ Object
This method serves as a constructor to prepare the object or set up recurring tasks (e.g. heartbeats).
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 61 def self.init(router) @router = router Iodine.on_state(:on_start) do ping_counter = Time.now.to_i Iodine.run_every(3000) do ping_counter += 1 Iodine.publish("cable:ping", { type: TYPE::PING, message: ping_counter }.to_json, Iodine::PubSub::PROCESS) end end # Hash<String(stream name) => Set<Hash>(subscription params)> @subscription_identifiers = Hash.new { |hash, key| hash[key] = Set.new } Iodine.on_state(:pre_start) do # this is a fallback to synchronize subscription identifiers across different worker processes; # we expect connections to be distributed among all workers, so this code will almost never be called; # we also synchronize subscriptions with the master process so that the forks that are spun up instead # of the crashed ones also had access to the identifiers; Iodine.subscribe("cable:synchronize") do |_, subscription_msg| stream_name, params = Rage::ParamsParser.json_parse(subscription_msg) @subscription_identifiers[stream_name] << params end end Iodine.on_state(:on_finish) do Iodine.unsubscribe("cable:synchronize") end end |
.on_close(connection) ⇒ Object
This method is optional.
The method should process client disconnections and call Router#process_message.
156 157 158 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 156 def self.on_close(connection) @router.process_disconnection(connection) end |
.on_message(connection, raw_data) ⇒ Object
The method processes messages from existing connections. It should parse the message, call either Router#process_subscription or Router#process_message, and handle its return value.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 115 def self.(connection, raw_data) parsed_data = Rage::ParamsParser.json_parse(raw_data) command, identifier = parsed_data[:command], parsed_data[:identifier] params = Rage::ParamsParser.json_parse(identifier) # process subscription messages if command == COMMAND::SUBSCRIBE status = @router.process_subscription(connection, identifier, params[:channel], params) if status == :subscribed connection.write({ identifier: identifier, type: TYPE::CONFIRM }.to_json) elsif status == :rejected connection.write({ identifier: identifier, type: TYPE::REJECT }.to_json) elsif status == :invalid connection.write(MESSAGES::INVALID) end return end # process data messages; # plain `JSON` is used here to conform with the ActionCable API that passes `data` as a Hash with string keys; data = JSON.parse(parsed_data[:data]) = if command == COMMAND::MESSAGE && data.has_key?("action") @router.(connection, identifier, data["action"].to_sym, data) elsif command == COMMAND::MESSAGE @router.(connection, identifier, :receive, data) end unless == :processed connection.write(MESSAGES::INVALID) end end |
.on_open(connection) ⇒ Object
The method is called any time a new WebSocket connection is established. It is expected to call Router#process_connection and handle its return value.
97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 97 def self.on_open(connection) accepted = @router.process_connection(connection) if accepted connection.subscribe("cable:ping") connection.write(MESSAGES::WELCOME) else connection.write(MESSAGES::UNAUTHORIZED) connection.close end end |
.protocol_definition ⇒ Object
The method defines the headers to send to the client after the handshake process.
54 55 56 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 54 def self.protocol_definition HANDSHAKE_HEADERS end |
.serialize(params, data) ⇒ Object
Serialize a Ruby object into the format the client would understand.
164 165 166 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 164 def self.serialize(params, data) { identifier: params.to_json, message: data }.to_json end |
.subscribe(connection, name, params) ⇒ Object
Subscribe to a stream.
173 174 175 176 177 178 179 180 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 173 def self.subscribe(connection, name, params) connection.subscribe("cable:#{name}:#{Zlib.crc32(params.to_s)}") unless @subscription_identifiers[name].include?(params) @subscription_identifiers[name] << params ::Iodine.publish("cable:synchronize", [name, params].to_json) end end |