This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread safe. All methods are reentrant.

Namespace

Module

Class

Methods

Class Public methods

new()

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 50
      def initialize
        @mutex = Mutex.new
        @string_subscribers = Concurrent::Map.new { |h, k| h.compute_if_absent(k) { [] } }
        @other_subscribers = []
        @all_listeners_for = Concurrent::Map.new
        @groups_for = Concurrent::Map.new
        @silenceable_groups_for = Concurrent::Map.new
      end
🔎 See on GitHub

Instance Public methods

all_listeners_for(name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 297
      def all_listeners_for(name)
        # this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
        @all_listeners_for[name] || @mutex.synchronize do
          # use synchronisation when accessing @subscribers
          @all_listeners_for[name] ||=
            @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) }
        end
      end
🔎 See on GitHub

build_handle(name, id, payload)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 272
      def build_handle(name, id, payload)
        Handle.new(self, name, id, payload)
      end
🔎 See on GitHub

finish(name, id, payload, listeners = nil)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 283
      def finish(name, id, payload, listeners = nil)
        handle_stack = IsolatedExecutionState[:_fanout_handle_stack]
        handle = handle_stack.pop
        handle.finish_with_values(name, id, payload)
      end
🔎 See on GitHub

listeners_for(name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 306
      def listeners_for(name)
        all_listeners_for(name).reject { |s| s.silenced?(name) }
      end
🔎 See on GitHub

listening?(name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 310
      def listening?(name)
        all_listeners_for(name).any? { |s| !s.silenced?(name) }
      end
🔎 See on GitHub

publish(name, *args)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 289
      def publish(name, *args)
        iterate_guarding_exceptions(listeners_for(name)) { |s| s.publish(name, *args) }
      end
🔎 See on GitHub

publish_event(event)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 293
      def publish_event(event)
        iterate_guarding_exceptions(listeners_for(event.name)) { |s| s.publish_event(event) }
      end
🔎 See on GitHub

start(name, id, payload)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 276
      def start(name, id, payload)
        handle_stack = (IsolatedExecutionState[:_fanout_handle_stack] ||= [])
        handle = build_handle(name, id, payload)
        handle_stack << handle
        handle.start
      end
🔎 See on GitHub

subscribe(pattern = nil, callable = nil, monotonic: false, &block)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 64
      def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
        subscriber = Subscribers.new(pattern, callable || block, monotonic)
        @mutex.synchronize do
          case pattern
          when String
            @string_subscribers[pattern] << subscriber
            clear_cache(pattern)
          when NilClass, Regexp
            @other_subscribers << subscriber
            clear_cache
          else
            raise ArgumentError,  "pattern must be specified as a String, Regexp or empty"
          end
        end
        subscriber
      end
🔎 See on GitHub

unsubscribe(subscriber_or_name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 81
      def unsubscribe(subscriber_or_name)
        @mutex.synchronize do
          case subscriber_or_name
          when String
            @string_subscribers[subscriber_or_name].clear
            clear_cache(subscriber_or_name)
            @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
          else
            pattern = subscriber_or_name.try(:pattern)
            if String === pattern
              @string_subscribers[pattern].delete(subscriber_or_name)
              clear_cache(pattern)
            else
              @other_subscribers.delete(subscriber_or_name)
              clear_cache
            end
          end
        end
      end
🔎 See on GitHub

wait()

This is a sync queue, so there is no waiting.

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 315
      def wait
      end
🔎 See on GitHub