Methods
Class Public methods
new()
Source:
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 9
def initialize
@nio = @executor = @thread = nil
@map = {}
@stopping = false
@todo = Queue.new
@spawn_mutex = Mutex.new
end
Instance Public methods
attach(io, stream)
Source:
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 29
def attach(io, stream)
@todo << lambda do
@map[io] = @nio.register(io, :r)
@map[io].value = stream
end
wakeup
end
detach(io, stream)
Source:
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 37
def detach(io, stream)
@todo << lambda do
@nio.deregister io
@map.delete io
io.close
end
wakeup
end
post(task = nil, &block)
Source:
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 22
def post(task = nil, &block)
task ||= block
spawn
@executor << task
end
stop()
Source:
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 55
def stop
@stopping = true
wakeup if @nio
end
timer(interval, &block)
Source:
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 18
def timer(interval, &block)
Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
end
writes_pending(io)
Source:
# File actioncable/lib/action_cable/connection/stream_event_loop.rb, line 46
def writes_pending(io)
@todo << lambda do
if monitor = @map[io]
monitor.interests = :rw
end
end
wakeup
end