Methods
Class Public methods
new()
📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 9
def initialize
@nio = @executor = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
@spawn_mutex = Mutex.new
end
🔎 See on GitHub
Instance Public methods
attach(io, stream)
📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 29
def attach(io, stream)
@todo << lambda do
@map[io] = @nio.register(io, :r)
@map[io].value = stream
end
wakeup
end
🔎 See on GitHub
detach(io, stream)
📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 37
def detach(io, stream)
@todo << lambda do
@nio.deregister io
@map.delete io
io.close
end
wakeup
end
🔎 See on GitHub
post(task = nil, &block)
📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 22
def post(task = nil, &block)
task ||= block
spawn
@executor << task
end
🔎 See on GitHub
stop()
📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 55
def stop
@stopping = true
wakeup if @nio
end
🔎 See on GitHub
timer(interval, &block)
📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 18
def timer(interval, &block)
Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
end
🔎 See on GitHub
writes_pending(io)
📝 Source code
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 46
def writes_pending(io)
@todo << lambda do
if monitor = @map[io]
monitor.interests = :rw
end
end
wakeup
end
🔎 See on GitHub