Class: Rage::Cable::Protocol::ActioncableV1Json

Inherits:
Object
  • Object
show all
Defined in:
lib/rage/cable/protocol/actioncable_v1_json.rb

Overview

A protocol defines the structure, rules and semantics for exchanging data between the client and the server. The class that defines a protocol should respond to the following methods:

  • protocol_definition

  • init

  • on_open

  • on_message

  • serialize

  • subscribe

  • broadcast

The two optional methods are:

  • on_shutdown

  • on_close

Defined Under Namespace

Modules: COMMAND, MESSAGES, REASON, TYPE

Constant Summary collapse

HANDSHAKE_HEADERS =
{ "Sec-WebSocket-Protocol" => "actioncable-v1-json" }

Class Method Summary collapse

Class Method Details

.broadcast(name, data) ⇒ Object

Broadcast data to all clients connected to a stream.

Parameters:

  • name (String)

    the stream name

  • data (Object)

    the data to send



158
159
160
161
162
163
164
165
166
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 158

def self.broadcast(name, data)
  i, identifiers = 0, @subscription_identifiers[name]

  while i < identifiers.length
    params = identifiers[i]
    ::Iodine.publish("cable:#{name}:#{params.hash}", serialize(params, data))
    i += 1
  end
end

.init(router) ⇒ Object

This method serves as a constructor to prepare the object or set up recurring tasks (e.g. heartbeats).

Parameters:



55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 55

def self.init(router)
  @router = router

  ping_counter = Time.now.to_i
  ::Iodine.run_every(3000) do
    ping_counter += 1
    ::Iodine.publish("cable:ping", { type: TYPE::PING, message: ping_counter }.to_json)
  end

  # Hash<String(stream name) => Array<Hash>(subscription params)>
  @subscription_identifiers = Hash.new { |hash, key| hash[key] = [] }
end

.on_close(connection) ⇒ Object

Note:

This method is optional.

The method should process client disconnections and call Router#process_message.

Parameters:

See Also:



132
133
134
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 132

def self.on_close(connection)
  @router.process_disconnection(connection)
end

.on_message(connection, raw_data) ⇒ Object

The method processes messages from existing connections. It should parse the message, call either Router#process_subscription or Router#process_message, and handle its return value.

Parameters:

See Also:



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 91

def self.on_message(connection, raw_data)
  parsed_data = Rage::ParamsParser.json_parse(raw_data)

  command, identifier = parsed_data[:command], parsed_data[:identifier]
  params = Rage::ParamsParser.json_parse(identifier)

  # process subscription messages
  if command == COMMAND::SUBSCRIBE
    status = @router.process_subscription(connection, identifier, params[:channel], params)
    if status == :subscribed
      connection.write({ identifier: identifier, type: TYPE::CONFIRM }.to_json)
    elsif status == :rejected
      connection.write({ identifier: identifier, type: TYPE::REJECT }.to_json)
    elsif status == :invalid
      connection.write(MESSAGES::INVALID)
    end

    return
  end

  # process data messages;
  # plain `JSON` is used here to conform with the ActionCable API that passes `data` as a Hash with string keys;
  data = JSON.parse(parsed_data[:data])

  message_status = if command == COMMAND::MESSAGE && data.has_key?("action")
    @router.process_message(connection, identifier, data["action"].to_sym, data)

  elsif command == COMMAND::MESSAGE
    @router.process_message(connection, identifier, :receive, data)
  end

  unless message_status == :processed
    connection.write(MESSAGES::INVALID)
  end
end

.on_open(connection) ⇒ Object

The method is called any time a new WebSocket connection is established. It is expected to call Router#process_connection and handle its return value.

Parameters:

See Also:



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 73

def self.on_open(connection)
  accepted = @router.process_connection(connection)

  if accepted
    connection.subscribe("cable:ping")
    connection.write(MESSAGES::WELCOME)
  else
    connection.write(MESSAGES::UNAUTHORIZED)
    connection.close
  end
end

.protocol_definitionObject

The method defines the headers to send to the client after the handshake process.



48
49
50
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 48

def self.protocol_definition
  HANDSHAKE_HEADERS
end

.serialize(params, data) ⇒ Object

Serialize a Ruby object into the format the client would understand.

Parameters:

  • params (Hash)

    parameters associated with the client

  • data (Object)

    the object to serialize



140
141
142
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 140

def self.serialize(params, data)
  { identifier: params.to_json, message: data }.to_json
end

.subscribe(connection, name, params) ⇒ Object

Subscribe to a stream.

Parameters:

  • connection (Rage::Cable::WebSocketConnection)

    the connection object

  • name (String)

    the stream name

  • params (Hash)

    parameters associated with the client



149
150
151
152
# File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 149

def self.subscribe(connection, name, params)
  connection.subscribe("cable:#{name}:#{params.hash}")
  @subscription_identifiers[name] << params unless @subscription_identifiers[name].include?(params)
end