This file is indexed.

/usr/lib/ruby/vendor_ruby/concurrent/executor/timer_set.rb is in ruby-concurrent 1.0.0-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
require 'concurrent/scheduled_task'
require 'concurrent/atomic/event'
require 'concurrent/collection/non_concurrent_priority_queue'
require 'concurrent/executor/executor_service'
require 'concurrent/executor/single_thread_executor'

module Concurrent

  autoload :Options, 'concurrent/options'

  # Executes a collection of tasks, each after a given delay. A master task
  # monitors the set and schedules each task for execution at the appropriate
  # time. Tasks are run on the global IO executor or on the supplied executor.
  # Each task is represented as a `ScheduledTask`.
  #
  # @see Concurrent::ScheduledTask
  #
  # @!macro monotonic_clock_warning
  class TimerSet < RubyExecutorService

    # Create a new set of timed tasks.
    #
    # @!macro [attach] executor_options
    #
    #   @param [Hash] opts the options used to specify the executor on which to perform actions
    #   @option opts [Executor] :executor when set use the given `Executor` instance.
    #     Three special values are also supported: `:task` returns the global task pool,
    #     `:operation` returns the global operation pool, and `:immediate` returns a new
    #     `ImmediateExecutor` object.
    def initialize(opts = {})
      super(opts)
    end

    # Post a task to be executed after a given delay (in seconds). If the
    # delay is 1/100th of a second or less the task will be posted to the
    # executor immediately instead of being queued.
    #
    # @param [Float] delay the number of seconds to wait for before executing the task.
    # @param [Array<Object>] args the arguments passed to the task on execution.
    #
    # @yield the task to be performed.
    #
    # @return [Concurrent::ScheduledTask, false] IVar representing the task if the post
    #   is successful; false after shutdown.
    #
    # @raise [ArgumentError] if the intended execution time is not in the future.
    # @raise [ArgumentError] if no block is given.
    def post(delay, *args, &task)
      raise ArgumentError, 'no block given' unless block_given?
      return false unless running?

      opts = {
        executor: @task_executor,
        args: args,
        timer_set: self
      }
      scheduled = ScheduledTask.execute(delay, opts, &task) # may raise exception
      # Report failure when the task is still unscheduled after `execute`.
      scheduled.unscheduled? ? false : scheduled
    end

    # Begin an immediate shutdown. In-progress tasks will be allowed to
    # complete but enqueued tasks will be dismissed and no new tasks
    # will be accepted. Has no additional effect if the thread pool is
    # not running.
    #
    # For a timer set this simply delegates to `shutdown`.
    def kill
      shutdown
    end

    # Hide the inherited `<<` operator; tasks are submitted through #post.
    private :<<

    private

    # Initialize the object.
    #
    # Creates the min-ordered priority queue of pending tasks, resolves the
    # executor on which tasks run (the global IO executor unless one is given
    # in `opts`), creates the single-thread executor that runs the timer loop
    # and the event used to wake that loop.
    #
    # @param [Hash] opts the options to create the object with.
    # @option opts [Boolean] :auto_terminate (true) value assigned to
    #   `auto_terminate`.
    # @!visibility private
    def ns_initialize(opts)
      @queue          = Collection::NonConcurrentPriorityQueue.new(order: :min)
      @task_executor  = Options.executor_from_options(opts) || Concurrent.global_io_executor
      @timer_executor = SingleThreadExecutor.new
      @condition      = Event.new
      self.auto_terminate = opts.fetch(:auto_terminate, true)
    end

    # Post the task to the internal queue.
    #
    # @note This is intended as a callback method from ScheduledTask
    #   only. It is not intended to be used directly. Post a task
    #   by using the `ScheduledTask#execute` method.
    #
    # @param [Concurrent::ScheduledTask] task the task to enqueue.
    # @return [Boolean] true if the task was accepted, false if the timer
    #   set is no longer running.
    #
    # @!visibility private
    def post_task(task)
      synchronize { ns_post_task(task) }
    end

    # Lock-free body of #post_task; the caller must hold the lock.
    #
    # @param [Concurrent::ScheduledTask] task the task to enqueue.
    # @return [Boolean] true if the task was accepted, false if the timer
    #   set is no longer running.
    #
    # @!visibility private
    def ns_post_task(task)
      return false unless ns_running?

      if task.initial_delay <= 0.01
        # Too soon to be worth queueing: hand it straight to its executor.
        task.executor.post { task.process_task }
      else
        @queue.push(task)
        # only post the process method when the queue was empty before this push
        @timer_executor.post(&method(:process_tasks)) if @queue.length == 1
        # Wake the timer loop so it re-evaluates the head of the queue.
        @condition.set
      end
      true
    end

    # Remove the given task from the queue.
    #
    # @note This is intended as a callback method from `ScheduledTask`
    #   only. It is not intended to be used directly. Cancel a task
    #   by using the `ScheduledTask#cancel` method.
    #
    # @param [Concurrent::ScheduledTask] task the task to remove.
    # @return the result of deleting the task from the queue.
    #
    # @!visibility private
    def remove_task(task)
      synchronize { @queue.delete(task) }
    end

    # `ExecutorService` callback called during shutdown.
    #
    # Discards every queued task, kills the timer executor and marks the
    # timer set as stopped.
    #
    # @!visibility private
    def ns_shutdown_execution
      @queue.clear
      @timer_executor.kill
      stopped_event.set
    end

    # Run a loop and execute tasks in the scheduled order and at the approximate
    # scheduled time. If no tasks remain the thread will exit gracefully so that
    # garbage collection can occur. If there are no ready tasks it will sleep
    # for up to 60 seconds waiting for the next scheduled task; the wait ends
    # early when a new task is queued.
    #
    # @!visibility private
    def process_tasks
      loop do
        task = synchronize { @condition.reset; @queue.peek }
        break unless task

        now = Concurrent.monotonic_time
        diff = task.schedule_time - now

        if diff <= 0
          # Pop only if the task we examined is still at the head of the
          # queue. Between the peek above and this pop another thread may
          # have removed the task (see #remove_task), cleared the queue
          # during shutdown, or pushed an earlier task. Popping blindly
          # could then yield nil or a task that is not yet due, so in any
          # of those cases start over and re-evaluate the new head.
          due = synchronize { @queue.pop if @queue.peek.equal?(task) }
          next unless due

          due.executor.post { due.process_task }
        else
          @condition.wait([diff, 60].min)
        end
      end
    end
  end
end