Namespace

Class

Methods

Constants

# Alias for the Redis client library's base connection error class.
ConnectionError = ::Redis::BaseConnectionError

Class Public methods

new(adapter, config_options, event_loop)

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 68
          # Initializes the instance state used by the other methods of this class.
          #
          # adapter        - stored as @adapter.
          # config_options - must respond to +fetch+; only :reconnect_attempts is
          #                  read from it here (default: 1).
          # event_loop     - stored as @event_loop; other methods call +post+ on it.
          def initialize(adapter, config_options, event_loop)
            super()

            @adapter = adapter
            @event_loop = event_loop

            # Pending success callbacks per channel; looking up an unknown channel
            # stores and returns a fresh empty array.
            @subscribe_callbacks = Hash.new { |pending, channel| pending[channel] = [] }
            @subscription_lock = Mutex.new

            @reconnect_attempt = 0
            # Use the same config as used by Redis conn. An Integer is expanded into
            # an array holding that many zeros; any other value is kept as given.
            attempts = config_options.fetch(:reconnect_attempts, 1)
            @reconnect_attempts = attempts.is_a?(Integer) ? Array.new(attempts, 0) : attempts

            # Nothing is connected or running yet.
            @subscribed_client = nil
            @when_connected = []
            @thread = nil
          end
🔎 See on GitHub

Instance Public methods

add_channel(channel, on_success)

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 141
          # Queues +on_success+ for +channel+ and requests a subscription to it.
          #
          # All steps run while holding @subscription_lock, in this order:
          # +ensure_listener_running+ is called, +on_success+ is appended to the
          # channel's pending callbacks, and the subscribe call on
          # @subscribed_client is handed to +when_connected+ as a block.
          def add_channel(channel, on_success)
            @subscription_lock.synchronize do
              ensure_listener_running
              @subscribe_callbacks[channel].push(on_success)

              when_connected do
                @subscribed_client.subscribe(channel)
              end
            end
          end
🔎 See on GitHub

invoke_callback(*)

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 155
          # Defers the inherited +invoke_callback+ to the event loop: the +super+
          # call (with the arguments this method received) is placed in the block
          # given to @event_loop.post instead of being executed inline.
          def invoke_callback(*)
            @event_loop.post do
              super
            end
          end
🔎 See on GitHub

listen(conn)

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 89
          # Subscribes +conn+ to the internal channel and installs the handlers that
          # react to subscribe confirmations, incoming messages and unsubscribe
          # confirmations.
          #
          # conn - the connection object; must respond to +without_reconnect+ and
          #        +subscribe+ (presumably a redis-rb client -- confirm with caller).
          def listen(conn)
            conn.without_reconnect do
              # Captured up front; it is published as @subscribed_client only after
              # the first subscribe confirmation arrives (see below).
              original_client = extract_subscribed_client(conn)

              conn.subscribe("_action_cable_internal") do |on|
                on.subscribe do |chan, count|
                  @subscription_lock.synchronize do
                    # NOTE(review): +count+ is presumably the number of channels the
                    # connection is now subscribed to, so 1 means the first
                    # confirmation on this connection -- confirm against redis-rb.
                    if count == 1
                      @reconnect_attempt = 0
                      @subscribed_client = original_client

                      # Run, in FIFO order, every action queued in @when_connected.
                      until @when_connected.empty?
                        @when_connected.shift.call
                      end
                    end

                    # Hand the oldest pending callback for this channel (if any) to
                    # the event loop, and drop the channel's entry once it is empty.
                    if callbacks = @subscribe_callbacks[chan]
                      next_callback = callbacks.shift
                      @event_loop.post(&next_callback) if next_callback
                      @subscribe_callbacks.delete(chan) if callbacks.empty?
                    end
                  end
                end

                # Every received message is passed straight to +broadcast+.
                on.message do |chan, message|
                  broadcast(chan, message)
                end

                on.unsubscribe do |chan, count|
                  # A count of 0 clears the published client under the lock.
                  if count == 0
                    @subscription_lock.synchronize do
                      @subscribed_client = nil
                    end
                  end
                end
              end
            end
          end
🔎 See on GitHub

remove_channel(channel)

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 149
          # Requests that +channel+ be unsubscribed.
          #
          # While holding @subscription_lock, the unsubscribe call on
          # @subscribed_client is handed to +when_connected+ as a block.
          def remove_channel(channel)
            @subscription_lock.synchronize do
              when_connected do
                @subscribed_client.unsubscribe(channel)
              end
            end
          end
🔎 See on GitHub

shutdown()

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/redis.rb, line 128
          # Unsubscribes the client and waits for the listener thread to finish.
          #
          # Returns immediately when @thread is nil. Otherwise, under
          # @subscription_lock, it hands +when_connected+ a block that calls
          # +unsubscribe+ (no arguments) on @subscribed_client and then clears
          # @subscribed_client. After the lock is released, the caller yields to
          # other threads until @thread is no longer alive.
          def shutdown
            @subscription_lock.synchronize do
              return if @thread.nil?

              when_connected do
                @subscribed_client.unsubscribe
                @subscribed_client = nil
              end
            end

            # Waiting happens outside the lock so handlers that take the lock can run.
            while @thread.alive?
              Thread.pass
            end
          end
🔎 See on GitHub