Class: Rage::Cable::Protocol::ActioncableV1Json

Inherits:
Object
  • Object
show all
Defined in:
lib/rage/cable/protocol/actioncable_v1_json.rb

Overview

A protocol defines the structure, rules and semantics for exchanging data between the client and the server. The class that defines a protocol should respond to the following methods:

  • protocol_definition

  • init

  • on_open

  • on_message

  • serialize

  • subscribe

  • broadcast

The two optional methods are:

  • on_shutdown

  • on_close

It is likely that all logic around @subscription_identifiers has nothing to do with the protocol itself and should be extracted into another class. We’ll refactor this once we start working on a new protocol.

Defined Under Namespace

Modules: COMMAND, MESSAGES, REASON, TYPE

Constant Summary collapse

HANDSHAKE_HEADERS =
{ "Sec-WebSocket-Protocol" => "actioncable-v1-json" }

Class Method Summary collapse

Class Method Details

.broadcast(name, data) ⇒ Object

Broadcast data to all clients connected to a stream.

Parameters:

  • name (String)

    the stream name

  • data (Object)

    the data to send



184
185
186
187
188
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 184

def self.broadcast(name, data)
  @subscription_identifiers[name].each do |params|
    ::Iodine.publish("cable:#{name}:#{Zlib.crc32(params.to_s)}", serialize(params, data))
  end
end

.init(router) ⇒ Object

This method serves as a constructor to prepare the object or set up recurring tasks (e.g. heartbeats).

Parameters:



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 61

def self.init(router)
  @router = router

  Iodine.on_state(:on_start) do
    ping_counter = Time.now.to_i

    Iodine.run_every(3000) do
      ping_counter += 1
      Iodine.publish("cable:ping", { type: TYPE::PING, message: ping_counter }.to_json, Iodine::PubSub::PROCESS)
    end
  end

  # Hash<String(stream name) => Set<Hash>(subscription params)>
  @subscription_identifiers = Hash.new { |hash, key| hash[key] = Set.new }

  # this is a fallback to synchronize subscription identifiers across different worker processes;
  # we expect connections to be distributed among all workers, so this code will almost never be called;
  # we also synchronize subscriptions with the master process so that the forks that are spun up instead
  # of the crashed ones also had access to the identifiers;
  Iodine.subscribe("cable:synchronize") do |_, subscription_msg|
    stream_name, params = Rage::ParamsParser.json_parse(subscription_msg)
    @subscription_identifiers[stream_name] << params
  end

  Iodine.on_state(:on_finish) do
    Iodine.unsubscribe("cable:synchronize")
  end
end

.on_close(connection) ⇒ Object

Note:

This method is optional.

The method should process client disconnections and call Router#process_message.

Parameters:

See Also:



154
155
156
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 154

def self.on_close(connection)
  @router.process_disconnection(connection)
end

.on_message(connection, raw_data) ⇒ Object

The method processes messages from existing connections. It should parse the message, call either Router#process_subscription or Router#process_message, and handle its return value.

Parameters:

See Also:



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 113

def self.on_message(connection, raw_data)
  parsed_data = Rage::ParamsParser.json_parse(raw_data)

  command, identifier = parsed_data[:command], parsed_data[:identifier]
  params = Rage::ParamsParser.json_parse(identifier)

  # process subscription messages
  if command == COMMAND::SUBSCRIBE
    status = @router.process_subscription(connection, identifier, params[:channel], params)
    if status == :subscribed
      connection.write({ identifier: identifier, type: TYPE::CONFIRM }.to_json)
    elsif status == :rejected
      connection.write({ identifier: identifier, type: TYPE::REJECT }.to_json)
    elsif status == :invalid
      connection.write(MESSAGES::INVALID)
    end

    return
  end

  # process data messages;
  # plain `JSON` is used here to conform with the ActionCable API that passes `data` as a Hash with string keys;
  data = JSON.parse(parsed_data[:data])

  message_status = if command == COMMAND::MESSAGE && data.has_key?("action")
    @router.process_message(connection, identifier, data["action"].to_sym, data)

  elsif command == COMMAND::MESSAGE
    @router.process_message(connection, identifier, :receive, data)
  end

  unless message_status == :processed
    connection.write(MESSAGES::INVALID)
  end
end

.on_open(connection) ⇒ Object

The method is called any time a new WebSocket connection is established. It is expected to call Router#process_connection and handle its return value.

Parameters:

See Also:



95
96
97
98
99
100
101
102
103
104
105
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 95

def self.on_open(connection)
  accepted = @router.process_connection(connection)

  if accepted
    connection.subscribe("cable:ping")
    connection.write(MESSAGES::WELCOME)
  else
    connection.write(MESSAGES::UNAUTHORIZED)
    connection.close
  end
end

.protocol_definitionObject

The method defines the headers to send to the client after the handshake process.



54
55
56
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 54

def self.protocol_definition
  HANDSHAKE_HEADERS
end

.serialize(params, data) ⇒ Object

Serialize a Ruby object into the format the client would understand.

Parameters:

  • params (Hash)

    parameters associated with the client

  • data (Object)

    the object to serialize



162
163
164
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 162

def self.serialize(params, data)
  { identifier: params.to_json, message: data }.to_json
end

.subscribe(connection, name, params) ⇒ Object

Subscribe to a stream.

Parameters:

  • connection (Rage::Cable::WebSocketConnection)

    the connection object

  • name (String)

    the stream name

  • params (Hash)

    parameters associated with the client



171
172
173
174
175
176
177
178
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 171

def self.subscribe(connection, name, params)
  connection.subscribe("cable:#{name}:#{Zlib.crc32(params.to_s)}")

  unless @subscription_identifiers[name].include?(params)
    @subscription_identifiers[name] << params
    ::Iodine.publish("cable:synchronize", [name, params].to_json)
  end
end