Class: Rage::Cable::Adapters::Redis

Inherits:
Base
  • Object
Defined in:
lib/rage/cable/adapters/redis.rb

Constant Summary

REDIS_STREAM_NAME =
"rage:cable:messages"
DEFAULT_REDIS_OPTIONS =
{ reconnect_attempts: [0.05, 0.1, 0.5] }
REDIS_MIN_VERSION_SUPPORTED =
Gem::Version.create(6)

Instance Method Summary

Methods inherited from Base

#pick_a_worker

Constructor Details

#initialize(config) ⇒ Redis

Returns a new instance of Redis.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/rage/cable/adapters/redis.rb', line 19

# Builds the adapter: resolves the stream key, prepares the Redis client
# configuration, validates the server version and registers the poller.
#
# @param config [Hash] adapter options. `:channel_prefix`, when present, is
#   prepended to REDIS_STREAM_NAME to form the stream key; all remaining
#   entries are merged over DEFAULT_REDIS_OPTIONS and passed to
#   `RedisClient.config`. The hash itself is left untouched.
# @raise [RuntimeError] if the detected Redis version is lower than
#   REDIS_MIN_VERSION_SUPPORTED
def initialize(config)
  # Work on a shallow copy so that removing `:channel_prefix` does not mutate
  # the caller's hash (and so that a frozen hash is accepted).
  options = config.dup
  prefix = options.delete(:channel_prefix)

  @redis_stream = prefix ? "#{prefix}:#{REDIS_STREAM_NAME}" : REDIS_STREAM_NAME

  @redis_config = RedisClient.config(**DEFAULT_REDIS_OPTIONS.merge(options))
  # Included in every published message (see field "3" in #publish).
  @server_uuid = SecureRandom.uuid

  redis_version = get_redis_version
  if redis_version < REDIS_MIN_VERSION_SUPPORTED
    raise "Redis adapter only supports Redis 6+. Detected Redis version: #{redis_version}."
  end

  # Servers older than 6.2.0 get the :maxlen strategy; newer ones use :minid.
  @trimming_strategy = redis_version < Gem::Version.create("6.2.0") ? :maxlen : :minid

  # `pick_a_worker` is inherited from Base; it receives the block that runs `poll`.
  pick_a_worker { poll }
end

Instance Method Details

#publish(stream_name, data) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/rage/cable/adapters/redis.rb', line 39

# Appends a message to the adapter's Redis stream with XADD.
#
# The entry carries four fields: "1" the target stream name, "2" the payload
# serialized with `to_json`, "3" this server's UUID and "4" a freshly
# generated UUID for the message.
#
# @param stream_name [Object] stored as-is under field "1"
# @param data [#to_json] payload stored under field "2"
# @return [Object] whatever the `publish_redis` client returns from `call`
def publish(stream_name, data)
  uuid = SecureRandom.uuid
  client = publish_redis

  # Trimming clause; "~" asks Redis for approximate trimming.
  trim_clause = [trimming_method, "~", trimming_value]
  entry_fields = [
    "1", stream_name,
    "2", data.to_json,
    "3", @server_uuid,
    "4", uuid
  ]

  # "*" lets Redis assign the entry ID.
  client.call("XADD", @redis_stream, *trim_clause, "*", *entry_fields)
end