Methods
Class Public methods
new(adapter, event_loop)
📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 61
# Builds a listener with empty subscription state.
#
# adapter    - stored as-is in @adapter; not used by this method.
# event_loop - stored as-is in @event_loop; other methods call +post+ on it.
def initialize(adapter, event_loop)
  super()

  @adapter = adapter
  @event_loop = event_loop

  # Per-channel list of callbacks; a missing channel gets its own fresh array.
  @subscribe_callbacks = Hash.new do |callbacks, channel|
    callbacks[channel] = []
  end
  @subscription_lock = Mutex.new

  # No client, no queued work and no thread until something sets them later.
  @raw_client = nil
  @when_connected = []
  @thread = nil
end
🔎 See on GitHub
Instance Public methods
add_channel(channel, on_success)
📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 128
# Registers +on_success+ for +channel+ and requests a "subscribe" command.
#
# Everything runs while holding @subscription_lock: the listener is started
# if needed, the callback is appended to the channel's pending list, and the
# subscribe command is handed to +when_connected+.
# NOTE(review): +ensure_listener_running+, +when_connected+ and
# +send_command+ are defined elsewhere; presumably +when_connected+ runs the
# block now or once a connection exists - confirm against its definition.
def add_channel(channel, on_success)
  @subscription_lock.synchronize do
    ensure_listener_running
    @subscribe_callbacks[channel].push(on_success)

    when_connected do
      send_command("subscribe", channel)
    end
  end
end
🔎 See on GitHub
invoke_callback(*)
📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 142
# Hands the parent implementation of +invoke_callback+ to the event loop
# instead of calling it inline. The bare +super+ inside the block forwards
# whatever arguments this method received.
def invoke_callback(*)
  @event_loop.post do
    super
  end
end
🔎 See on GitHub
listen(conn)
📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 77
# Subscribes +conn+ to the "_action_cable_internal" channel and installs the
# subscribe / message / unsubscribe handlers, all inside
# +conn.without_reconnect+.
#
# conn - connection object; must respond to +without_reconnect+ and
#        +subscribe+, and to either +_client+ or +client+.
def listen(conn)
  conn.without_reconnect do
    # Prefer +_client+ when the connection exposes it, otherwise use +client+.
    underlying_client =
      if conn.respond_to?(:_client)
        conn._client
      else
        conn.client
      end

    conn.subscribe("_action_cable_internal") do |on|
      on.subscribe do |channel, total|
        @subscription_lock.synchronize do
          # A count of 1 is treated as "now connected": publish the client
          # first, then drain the work queued in @when_connected.
          if total == 1
            @raw_client = underlying_client
            @when_connected.shift.call until @when_connected.empty?
          end

          # Post the oldest pending callback for this channel (if any) to the
          # event loop and forget the channel once nothing is pending.
          pending = @subscribe_callbacks[channel]
          if pending
            callback = pending.shift
            @event_loop.post(&callback) if callback
            @subscribe_callbacks.delete(channel) if pending.empty?
          end
        end
      end

      on.message { |channel, payload| broadcast(channel, payload) }

      on.unsubscribe do |_channel, remaining|
        # Only drop the client when no subscriptions remain.
        @subscription_lock.synchronize { @raw_client = nil } if remaining == 0
      end
    end
  end
end
🔎 See on GitHub
remove_channel(channel)
📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 136
# Requests an "unsubscribe" command for +channel+.
#
# The command is passed to +when_connected+ while @subscription_lock is held.
# NOTE(review): +when_connected+ and +send_command+ are defined elsewhere;
# whether the command runs immediately or later depends on them.
def remove_channel(channel)
  unsubscribe = proc { send_command("unsubscribe", channel) }

  @subscription_lock.synchronize do
    when_connected(&unsubscribe)
  end
end
🔎 See on GitHub
shutdown()
📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 115
# Requests a full unsubscribe and waits for the listener thread to finish.
#
# Returns immediately when no thread was ever assigned to @thread. Otherwise,
# under @subscription_lock, it passes +when_connected+ a block that sends a
# bare "unsubscribe" command and clears @raw_client. After releasing the
# lock it yields the scheduler until @thread is no longer alive.
# NOTE(review): +when_connected+ and +send_command+ are defined elsewhere.
def shutdown
  @subscription_lock.synchronize do
    return if @thread.nil?

    when_connected do
      send_command("unsubscribe")
      @raw_client = nil
    end
  end

  # Busy-wait (cooperatively) for the listener thread to exit.
  while @thread.alive?
    Thread.pass
  end
end
🔎 See on GitHub