Class: Rage::Cable::Protocol::ActioncableV1Json
- Inherits:
-
Object
- Object
- Rage::Cable::Protocol::ActioncableV1Json
- Defined in:
- lib/rage/cable/protocol/actioncable_v1_json.rb
Overview
A protocol defines the structure, rules and semantics for exchanging data between the client and the server. The class that defines a protocol should respond to the following methods:
-
protocol_definition
-
init
-
on_open
-
on_message
-
serialize
-
subscribe
-
broadcast
The two optional methods are:
-
on_shutdown
-
on_close
It is likely that all logic around @subscription_identifiers
has nothing to do with the protocol itself and should be extracted into another class. We’ll refactor this once we start working on a new protocol.
Defined Under Namespace
Modules: COMMAND, MESSAGES, REASON, TYPE
Constant Summary collapse
- HANDSHAKE_HEADERS =
{ "Sec-WebSocket-Protocol" => "actioncable-v1-json" }
Class Method Summary collapse
-
.broadcast(name, data) ⇒ Object
Broadcast data to all clients connected to a stream.
-
.init(router) ⇒ Object
This method serves as a constructor to prepare the object or set up recurring tasks (e.g. heartbeats).
-
.on_close(connection) ⇒ Object
The method should process client disconnections and call Router#process_message.
-
.on_message(connection, raw_data) ⇒ Object
The method processes messages from existing connections.
-
.on_open(connection) ⇒ Object
The method is called any time a new WebSocket connection is established.
-
.protocol_definition ⇒ Object
The method defines the headers to send to the client after the handshake process.
-
.serialize(params, data) ⇒ Object
Serialize a Ruby object into the format the client would understand.
-
.subscribe(connection, name, params) ⇒ Object
Subscribe to a stream.
Class Method Details
.broadcast(name, data) ⇒ Object
Broadcast data to all clients connected to a stream.
184 185 186 187 188 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 184 def self.broadcast(name, data) @subscription_identifiers[name].each do |params| ::Iodine.publish("cable:#{name}:#{Zlib.crc32(params.to_s)}", serialize(params, data)) end end |
.init(router) ⇒ Object
This method serves as a constructor to prepare the object or set up recurring tasks (e.g. heartbeats).
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 61 def self.init(router) @router = router Iodine.on_state(:on_start) do ping_counter = Time.now.to_i Iodine.run_every(3000) do ping_counter += 1 Iodine.publish("cable:ping", { type: TYPE::PING, message: ping_counter }.to_json, Iodine::PubSub::PROCESS) end end # Hash<String(stream name) => Set<Hash>(subscription params)> @subscription_identifiers = Hash.new { |hash, key| hash[key] = Set.new } # this is a fallback to synchronize subscription identifiers across different worker processes; # we expect connections to be distributed among all workers, so this code will almost never be called; # we also synchronize subscriptions with the master process so that the forks that are spun up instead # of the crashed ones also had access to the identifiers; Iodine.subscribe("cable:synchronize") do |_, subscription_msg| stream_name, params = Rage::ParamsParser.json_parse(subscription_msg) @subscription_identifiers[stream_name] << params end Iodine.on_state(:on_finish) do Iodine.unsubscribe("cable:synchronize") end end |
.on_close(connection) ⇒ Object
This method is optional.
The method should process client disconnections and call Router#process_message.
154 155 156 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 154 def self.on_close(connection) @router.process_disconnection(connection) end |
.on_message(connection, raw_data) ⇒ Object
The method processes messages from existing connections. It should parse the message, call either Router#process_subscription or Router#process_message, and handle its return value.
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 113 def self.(connection, raw_data) parsed_data = Rage::ParamsParser.json_parse(raw_data) command, identifier = parsed_data[:command], parsed_data[:identifier] params = Rage::ParamsParser.json_parse(identifier) # process subscription messages if command == COMMAND::SUBSCRIBE status = @router.process_subscription(connection, identifier, params[:channel], params) if status == :subscribed connection.write({ identifier: identifier, type: TYPE::CONFIRM }.to_json) elsif status == :rejected connection.write({ identifier: identifier, type: TYPE::REJECT }.to_json) elsif status == :invalid connection.write(MESSAGES::INVALID) end return end # process data messages; # plain `JSON` is used here to conform with the ActionCable API that passes `data` as a Hash with string keys; data = JSON.parse(parsed_data[:data]) = if command == COMMAND::MESSAGE && data.has_key?("action") @router.(connection, identifier, data["action"].to_sym, data) elsif command == COMMAND::MESSAGE @router.(connection, identifier, :receive, data) end unless == :processed connection.write(MESSAGES::INVALID) end end |
.on_open(connection) ⇒ Object
The method is called any time a new WebSocket connection is established. It is expected to call Router#process_connection and handle its return value.
95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 95 def self.on_open(connection) accepted = @router.process_connection(connection) if accepted connection.subscribe("cable:ping") connection.write(MESSAGES::WELCOME) else connection.write(MESSAGES::UNAUTHORIZED) connection.close end end |
.protocol_definition ⇒ Object
The method defines the headers to send to the client after the handshake process.
54 55 56 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 54 def self.protocol_definition HANDSHAKE_HEADERS end |
.serialize(params, data) ⇒ Object
Serialize a Ruby object into the format the client would understand.
162 163 164 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 162 def self.serialize(params, data) { identifier: params.to_json, message: data }.to_json end |
.subscribe(connection, name, params) ⇒ Object
Subscribe to a stream.
171 172 173 174 175 176 177 178 |
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 171 def self.subscribe(connection, name, params) connection.subscribe("cable:#{name}:#{Zlib.crc32(params.to_s)}") unless @subscription_identifiers[name].include?(params) @subscription_identifiers[name] << params ::Iodine.publish("cable:synchronize", [name, params].to_json) end end |