This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread safe. All methods are reentrant.

Namespace

Module

Class

Methods

Class Public methods

new()

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 56
# Builds an empty fanout: a mutex plus the registries and caches the
# other methods read and write.
def initialize
  # Serialises every mutation of the subscriber registries.
  @mutex = Mutex.new

  # Subscribers whose pattern is nil or a Regexp (see #subscribe).
  @other_subscribers = []

  # Subscribers keyed by exact event name; looking up an unknown name
  # installs (and returns) a fresh empty array for it.
  @string_subscribers = Concurrent::Map.new do |map, pattern|
    map.compute_if_absent(pattern) { [] }
  end

  # Per-name cache filled by #all_listeners_for.
  @all_listeners_for = Concurrent::Map.new

  # NOTE(review): presumably a per-name cache of listener groups used by
  # groups_for, which is outside this excerpt -- confirm.
  @groups_for = Concurrent::Map.new
end
🔎 See on GitHub

Instance Public methods

all_listeners_for(name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 319
# Returns every subscriber attached to +name+, silenced or not: the
# exact-name subscribers followed by the pattern subscribers that report
# being subscribed to +name+. The combined list is cached per name.
def all_listeners_for(name)
  # Fast path: an unlocked cache read. The original author notes this is
  # correctly done double-checked locking because Concurrent::Map lookups
  # have volatile semantics.
  cached = @all_listeners_for[name]
  return cached if cached

  # Slow path: @string_subscribers and @other_subscribers are only read
  # while holding the mutex; ||= re-checks the cache under the lock.
  @mutex.synchronize do
    @all_listeners_for[name] ||= begin
      exact_matches = @string_subscribers[name]
      pattern_matches = @other_subscribers.select { |subscriber| subscriber.subscribed_to?(name) }
      exact_matches + pattern_matches
    end
  end
end
🔎 See on GitHub

build_handle(name, id, payload)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 286
# Creates the handle for one instrumented event.
#
# Each (group class, listeners) pair returned by groups_for(name) is
# instantiated with the listeners and the event details. Returns
# NullHandle when there are no groups, otherwise a Handle wrapping them.
def build_handle(name, id, payload)
  groups = groups_for(name).map do |klass, members|
    klass.new(members, name, id, payload)
  end
  return NullHandle if groups.empty?

  Handle.new(self, name, id, groups, payload)
end
🔎 See on GitHub

finish(name, id, payload, listeners = nil)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 305
# Pops the most recently pushed handle off the current execution context's
# handle stack and finishes it with the given event details.
#
# +listeners+ is accepted but not used by this method.
# The stack is the one #start stores under :_fanout_handle_stack; this
# method does not guard against a missing or empty stack.
def finish(name, id, payload, listeners = nil)
  IsolatedExecutionState[:_fanout_handle_stack].pop.finish_with_values(name, id, payload)
end
🔎 See on GitHub

listeners_for(name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 328
# Returns a new array of the listeners attached to +name+ that are not
# currently silenced for it.
def listeners_for(name)
  all_listeners_for(name).select { |listener| !listener.silenced?(name) }
end
🔎 See on GitHub

listening?(name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 332
# True when at least one listener attached to +name+ is not silenced for
# it; false when there are no listeners or all of them are silenced.
def listening?(name)
  everyone_silenced = all_listeners_for(name).all? { |listener| listener.silenced?(name) }
  !everyone_silenced
end
🔎 See on GitHub

publish(name, ...)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 311
# Forwards +name+ and all remaining arguments to #publish on each
# non-silenced listener for +name+.
#
# Iteration goes through iterate_guarding_exceptions, a helper defined
# outside this excerpt.
def publish(name, ...)
  active_listeners = listeners_for(name)
  iterate_guarding_exceptions(active_listeners) do |listener|
    listener.publish(name, ...)
  end
end
🔎 See on GitHub

publish_event(event)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 315
# Hands +event+ to #publish_event on each non-silenced listener for
# event.name.
#
# Iteration goes through iterate_guarding_exceptions, a helper defined
# outside this excerpt.
def publish_event(event)
  active_listeners = listeners_for(event.name)
  iterate_guarding_exceptions(active_listeners) do |listener|
    listener.publish_event(event)
  end
end
🔎 See on GitHub

start(name, id, payload)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 298
# Begins an instrumented event: builds its handle, pushes it onto the
# handle stack kept in IsolatedExecutionState (creating the stack on
# first use), then starts the handle and returns whatever that returns.
def start(name, id, payload)
  stack = IsolatedExecutionState[:_fanout_handle_stack] ||= []
  # Push before starting so the handle is already on the stack while
  # its #start runs.
  build_handle(name, id, payload).tap { |handle| stack << handle }.start
end
🔎 See on GitHub

subscribe(pattern = nil, callable = nil, monotonic: false, &block)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 69
# Registers a new subscriber and returns it.
#
# pattern   - String (exact event name), Regexp, or nil.
# callable  - listener object; when nil, the given block is used instead.
# monotonic - passed through to Subscribers.new.
#
# Raises ArgumentError when +pattern+ is none of String, Regexp or nil.
# The subscriber object is built before the pattern is validated.
def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
  listener = callable || block
  subscriber = Subscribers.new(pattern, listener, monotonic)

  @mutex.synchronize do
    if String === pattern
      # Exact-name subscription: only that name's cache entry is cleared.
      @string_subscribers[pattern] << subscriber
      clear_cache(pattern)
    elsif NilClass === pattern || Regexp === pattern
      # Nil or Regexp pattern: clear_cache is called without a name.
      @other_subscribers << subscriber
      clear_cache
    else
      raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
    end
  end

  subscriber
end
🔎 See on GitHub

unsubscribe(subscriber_or_name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 86
# Removes subscriptions, given either an event name or a subscriber.
#
# String argument: empties the exact-name subscriber list for that name,
# clears its cache entry, and calls unsubscribe!(name) on every
# pattern subscriber.
#
# Any other argument is treated as a subscriber: it is deleted from the
# registry matching its pattern (exact-name list when the pattern is a
# String, the pattern list otherwise) and the relevant cache is cleared.
def unsubscribe(subscriber_or_name)
  @mutex.synchronize do
    if String === subscriber_or_name
      event_name = subscriber_or_name
      @string_subscribers[event_name].clear
      clear_cache(event_name)
      @other_subscribers.each { |other| other.unsubscribe!(event_name) }
    else
      subscriber = subscriber_or_name
      # try: the argument may not respond to #pattern at all.
      pattern = subscriber.try(:pattern)
      case pattern
      when String
        @string_subscribers[pattern].delete(subscriber)
        clear_cache(pattern)
      else
        @other_subscribers.delete(subscriber)
        clear_cache
      end
    end
  end
end
🔎 See on GitHub

wait()

This is a sync queue, so there is no waiting.

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 337
# No-op: this method performs no work and returns nil.
def wait
  nil
end
🔎 See on GitHub

Definition files