Methods

Class Public methods

new(adapter, event_loop)

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 56
          # Builds the listener and immediately starts its background thread.
          #
          # adapter    - stored in @adapter; #listen obtains its connection
          #              through adapter.with_connection.
          # event_loop - stored in @event_loop; callbacks are handed to it
          #              via event_loop.post.
          def initialize(adapter, event_loop)
            super()

            @adapter, @event_loop = adapter, event_loop
            # Commands for the listener thread are passed through this queue.
            @queue = Queue.new

            @thread = Thread.new {
              # An unhandled error in the listener thread must not go unnoticed.
              Thread.current.abort_on_exception = true
              listen
            }
          end
🔎 See on GitHub

Instance Public methods

add_channel(channel, on_success)

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 100
          # Queues a :listen command for +channel+. The command is picked up
          # by #listen, which posts +on_success+ (when given) to the event
          # loop after issuing the LISTEN statement.
          def add_channel(channel, on_success)
            command = [:listen, channel, on_success]
            @queue << command
          end
🔎 See on GitHub

invoke_callback(*)

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 108
          # Defers the inherited invoke_callback: instead of running it
          # inline, a block is posted to the event loop, and that block calls
          # super with the arguments this method originally received.
          def invoke_callback(*)
            @event_loop.post do
              super
            end
          end
🔎 See on GitHub

listen()

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 69
          # Main loop of the listener thread. Using one connection obtained
          # from the adapter, it alternates between two steps until a
          # :shutdown command arrives:
          #
          # 1. Drain the command queue, issuing LISTEN / UNLISTEN statements.
          # 2. Wait for a notification (timeout argument of 1) and hand any
          #    received message to broadcast.
          def listen
            @adapter.with_connection do |connection|
              catch :shutdown do
                loop do
                  until @queue.empty?
                    # Non-blocking pop; the emptiness check above guards it.
                    command, name, on_success = @queue.pop(true)

                    case command
                    when :listen
                      identifier = connection.escape_identifier(name)
                      connection.exec("LISTEN #{identifier}")
                      # Report the subscription on the event loop, not on this thread.
                      @event_loop.post(&on_success) if on_success
                    when :unlisten
                      identifier = connection.escape_identifier(name)
                      connection.exec("UNLISTEN #{identifier}")
                    when :shutdown
                      # Unwinds out of the loop to the enclosing catch.
                      throw :shutdown
                    end
                  end

                  # The bounded wait lets newly queued commands be noticed.
                  connection.wait_for_notify(1) { |chan, _pid, payload| broadcast(chan, payload) }
                end
              end
            end
          end
🔎 See on GitHub

remove_channel(channel)

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 104
          # Queues an :unlisten command for +channel+; #listen turns it into
          # an UNLISTEN statement the next time it drains the queue.
          def remove_channel(channel)
            command = [:unlisten, channel]
            @queue << command
          end
🔎 See on GitHub

shutdown()

📝 Source code
# File actioncable/lib/action_cable/subscription_adapter/postgresql.rb, line 95
          # Asks the listener thread to stop by queueing a :shutdown command,
          # then yields the scheduler repeatedly until that thread is no
          # longer alive.
          def shutdown
            @queue << [:shutdown]

            while @thread.alive?
              Thread.pass
            end
          end
🔎 See on GitHub