This is a default queue implementation that ships with Notifications. It just pushes events to all registered log subscribers.

This class is thread safe. All methods are reentrant.

Namespace

Module

Class

Methods

Class Public methods

new()

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 51
      # Builds an empty fanout: a lock for subscriber mutations, the two
      # subscriber registries, and the per-event-name lookup caches.
      def initialize
        # Held while subscribers are added or removed and while the listener
        # cache is being filled.
        @mutex = Mutex.new

        # Subscribers registered under an exact event name, keyed by that name.
        # Reading an unknown name installs and returns a fresh empty list.
        @string_subscribers = Concurrent::Map.new do |registry, event_name|
          registry.compute_if_absent(event_name) { [] }
        end

        # Subscribers registered with a Regexp pattern or with no pattern.
        @other_subscribers = []

        # Memoized result of #all_listeners_for, keyed by event name.
        @all_listeners_for = Concurrent::Map.new

        # NOTE(review): the two maps below are not touched by any method shown
        # alongside this one; presumably further per-name caches -- confirm.
        @groups_for = Concurrent::Map.new
        @silenceable_groups_for = Concurrent::Map.new
      end
🔎 See on GitHub

Instance Public methods

all_listeners_for(name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 298
      # Returns every subscriber registered for +name+, whether or not it is
      # currently silenced: exact-name subscribers first, followed by the
      # pattern subscribers whose +subscribed_to?+ accepts +name+.
      #
      # The result is memoized per name in @all_listeners_for.
      def all_listeners_for(name)
        # Fast path: serve a previously computed list without taking the lock.
        memoized = @all_listeners_for[name]
        return memoized if memoized

        # Slow path: re-check under the lock (another thread may have filled
        # the entry meanwhile) and read both subscriber registries while no
        # subscribe/unsubscribe can be mutating them.
        @mutex.synchronize do
          @all_listeners_for[name] ||= begin
            exact_matches = @string_subscribers[name]
            pattern_matches = @other_subscribers.select { |candidate| candidate.subscribed_to?(name) }
            exact_matches + pattern_matches
          end
        end
      end
🔎 See on GitHub

build_handle(name, id, payload)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 273
      # Creates a Handle for one instrumented event, bound to this fanout.
      #
      # +name+, +id+ and +payload+ are passed through to Handle.new unchanged.
      def build_handle(name, id, payload)
        notifier = self
        Handle.new(notifier, name, id, payload)
      end
🔎 See on GitHub

finish(name, id, payload, listeners = nil)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 284
      # Completes the most recently started event: pops the newest handle off
      # the :_fanout_handle_stack kept in IsolatedExecutionState and finishes
      # it with the given values. Returns whatever +finish_with_values+ returns.
      #
      # +listeners+ is accepted but not used by this method.
      #
      # NOTE(review): assumes a matching #start has already pushed a handle;
      # nothing here guards against a missing or empty stack.
      def finish(name, id, payload, listeners = nil)
        IsolatedExecutionState[:_fanout_handle_stack]
          .pop
          .finish_with_values(name, id, payload)
      end
🔎 See on GitHub

listeners_for(name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 307
      # Returns the subscribers for +name+ that are not silenced for it, as a
      # new array (the list from #all_listeners_for is left untouched).
      def listeners_for(name)
        candidates = all_listeners_for(name)
        candidates.select { |listener| !listener.silenced?(name) }
      end
🔎 See on GitHub

listening?(name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 311
      # True when at least one subscriber for +name+ is not silenced for it;
      # false when there are no subscribers or all of them are silenced.
      def listening?(name)
        everyone_silenced = all_listeners_for(name).all? { |listener| listener.silenced?(name) }
        !everyone_silenced
      end
🔎 See on GitHub

publish(name, *args)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 290
      # Delivers +name+ and +args+ to every non-silenced subscriber of +name+
      # by calling +publish+ on each one.
      #
      # Iteration is delegated to +iterate_guarding_exceptions+ (defined
      # elsewhere); its return value is returned as-is.
      # NOTE(review): presumably that helper keeps one failing subscriber from
      # stopping delivery to the rest -- confirm against its definition.
      def publish(name, *args)
        recipients = listeners_for(name)
        iterate_guarding_exceptions(recipients) do |listener|
          listener.publish(name, *args)
        end
      end
🔎 See on GitHub

publish_event(event)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 294
      # Delivers an already-built +event+ to every non-silenced subscriber of
      # +event.name+ by calling +publish_event+ on each one.
      #
      # Iteration is delegated to +iterate_guarding_exceptions+ (defined
      # elsewhere); its return value is returned as-is.
      def publish_event(event)
        recipients = listeners_for(event.name)
        iterate_guarding_exceptions(recipients) do |listener|
          listener.publish_event(event)
        end
      end
🔎 See on GitHub

start(name, id, payload)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 277
      # Begins an instrumented event: builds a handle via #build_handle, pushes
      # it onto the :_fanout_handle_stack kept in IsolatedExecutionState
      # (creating the stack on first use) and starts it.
      #
      # Returns whatever the handle's +start+ returns.
      def start(name, id, payload)
        # Fetch (or lazily create) the stack before building the handle.
        open_handles = (IsolatedExecutionState[:_fanout_handle_stack] ||= [])

        build_handle(name, id, payload)
          .tap { |new_handle| open_handles.push(new_handle) }
          .start
      end
🔎 See on GitHub

subscribe(pattern = nil, callable = nil, monotonic: false, &block)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 65
      # Registers a new subscriber and returns it.
      #
      # +pattern+   - a String (exact event name), a Regexp, or nil.
      # +callable+  - the listener; when nil, the given block is used instead.
      # +monotonic+ - forwarded unchanged to Subscribers.new.
      #
      # Raises ArgumentError when +pattern+ is none of String, Regexp or nil.
      def subscribe(pattern = nil, callable = nil, monotonic: false, &block)
        listener = callable || block
        subscriber = Subscribers.new(pattern, listener, monotonic)

        # Registry mutation and cache invalidation happen under the same lock
        # that #all_listeners_for takes when it fills its cache.
        @mutex.synchronize do
          if String === pattern
            @string_subscribers[pattern] << subscriber
            clear_cache(pattern)
          elsif NilClass === pattern || Regexp === pattern
            @other_subscribers << subscriber
            # No argument here, unlike the String branch above.
            clear_cache
          else
            raise ArgumentError, "pattern must be specified as a String, Regexp or empty"
          end
        end

        subscriber
      end
🔎 See on GitHub

unsubscribe(subscriber_or_name)

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 82
      # Removes subscriptions, given either an event name or a subscriber.
      #
      # * String: empties the exact-name subscriber list for that name and
      #   calls +unsubscribe!+ with the name on every pattern subscriber.
      # * Anything else: treated as a subscriber object and deleted from the
      #   registry selected by its +pattern+ (String => exact-name list,
      #   otherwise the pattern list).
      #
      # The matching listener cache is cleared in every case.
      def unsubscribe(subscriber_or_name)
        @mutex.synchronize do
          if String === subscriber_or_name
            event_name = subscriber_or_name
            @string_subscribers[event_name].clear
            clear_cache(event_name)
            @other_subscribers.each { |listener| listener.unsubscribe!(event_name) }
          else
            subscriber = subscriber_or_name
            # +try+ yields nil for objects without a +pattern+ method, which
            # sends them down the pattern-list branch.
            registered_under = subscriber.try(:pattern)

            if String === registered_under
              @string_subscribers[registered_under].delete(subscriber)
              clear_cache(registered_under)
            else
              @other_subscribers.delete(subscriber)
              clear_cache
            end
          end
        end
      end
🔎 See on GitHub

wait()

This is a sync queue, so there is no waiting.

📝 Source code
# File activesupport/lib/active_support/notifications/fanout.rb, line 316
      # Deliberate no-op that always returns nil: nothing is queued, so there
      # is nothing to wait for.
      def wait
        nil
      end
🔎 See on GitHub