Methods

Class Public methods

new()

📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 10
      # Sets up an idle loop object: no selector, executor, or thread yet,
      # an empty io => monitor map, a cleared stopping flag, an empty queue
      # of pending tasks, and a mutex stored as @spawn_mutex.
      def initialize
        # These three start out nil; nothing in this method creates them.
        @nio = nil
        @executor = nil
        @thread = nil

        @map = {}
        @stopping = false

        # Deferred work (lambdas) pushed by attach / detach / writes_pending.
        @todo = Queue.new

        # NOTE(review): presumably guards thread start-up in `spawn` — that
        # method is not visible here, confirm.
        @spawn_mutex = Mutex.new
      end
🔎 See on GitHub

Instance Public methods

attach(io, stream)

📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 30
      # Queues a task that registers +io+ with @nio for read interest (:r),
      # stores the resulting monitor in @map under +io+, and sets the
      # monitor's value to +stream+. The task is only enqueued here, not run;
      # +wakeup+ is called afterwards and its result is returned.
      def attach(io, stream)
        registration = lambda do
          monitor = @nio.register(io, :r)
          @map[io] = monitor
          monitor.value = stream
        end

        @todo << registration
        wakeup
      end
🔎 See on GitHub

detach(io, stream)

📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 38
      # Queues a task that deregisters +io+ from @nio, removes its entry from
      # @map, and closes +io+ (in that order). The task is only enqueued
      # here; +wakeup+ is called afterwards and its result is returned.
      #
      # +stream+ is accepted but not used by this method.
      def detach(io, stream)
        teardown = lambda do
          @nio.deregister(io)
          @map.delete(io)
          io.close
        end

        @todo.push(teardown)
        wakeup
      end
🔎 See on GitHub

post(task = nil, &block)

📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 23
      # Hands a unit of work to @executor. The work may be given either as
      # +task+ or as a block; +task+ wins when both are supplied.
      #
      # +spawn+ is called first. NOTE(review): presumably that is what makes
      # @executor available — it is not visible here, confirm.
      # Returns whatever <tt>@executor << job</tt> returns.
      def post(task = nil, &block)
        job = task || block

        spawn
        @executor << job
      end
🔎 See on GitHub

stop()

📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 56
      # Sets the @stopping flag and, when a selector (@nio) exists, calls
      # +wakeup+. Returns nil when there is no selector, otherwise the result
      # of +wakeup+.
      def stop
        @stopping = true
        return unless @nio

        wakeup
      end
🔎 See on GitHub

timer(interval, &block)

📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 19
      # Builds a Concurrent::TimerTask with +interval+ as its
      # execution_interval and +block+ as its body, calls +execute+ on it,
      # and returns the task itself (not the result of +execute+).
      def timer(interval, &block)
        task = Concurrent::TimerTask.new(execution_interval: interval, &block)
        task.execute
        task
      end
🔎 See on GitHub

writes_pending(io)

📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 47
      # Queues a task that, if @map has a monitor for +io+, sets that
      # monitor's interests to :rw. When no monitor is recorded the task does
      # nothing. The task is only enqueued here; +wakeup+ is called
      # afterwards and its result is returned.
      def writes_pending(io)
        enable_writes = lambda do
          monitor = @map[io]
          monitor.interests = :rw if monitor
        end

        @todo << enable_writes
        wakeup
      end
🔎 See on GitHub