/usr/lib/ruby/vendor_ruby/celluloid/mailbox.rb is in ruby-celluloid 0.16.0-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 | require 'thread'
module Celluloid
# Raised when an operation is attempted on a dead mailbox
# (you can't receive from the dead)
class MailboxDead < Celluloid::Error
end
# Raised if the mailbox can no longer be used
class MailboxShutdown < Celluloid::Error
end
# Actors communicate with asynchronous messages. Messages are buffered in
# Mailboxes until Actors can act upon them.
class Mailbox
include Enumerable
# A unique address at which this mailbox can be found
attr_reader :address
attr_accessor :max_size
# Build an empty, live mailbox: fresh address, no queued messages and no
# size limit (max_size is nil until a caller sets it).
def initialize
  @address   = Celluloid.uuid

  # Synchronisation primitives guarding the message queue
  @mutex     = Mutex.new
  @condition = ConditionVariable.new

  # Queue state
  @messages  = []
  @dead      = false
  @max_size  = nil
end
# Add a message to the Mailbox.
#
# The message is handed to dead_letter (and not queued) when the mailbox
# is full or dead. Otherwise it is queued and one waiter on the condition
# variable is signalled. Always returns nil.
def <<(message)
  @mutex.lock
  begin
    if mailbox_full || @dead
      dead_letter(message)
    else
      if message.is_a?(SystemEvent)
        # SystemEvents are high priority, so they jump to the head of the
        # queue instead of being appended at the end
        @messages.unshift(message)
      else
        @messages.push(message)
      end
      @condition.signal
    end
    nil
  ensure
    # Best-effort unlock: a failure to release the lock is swallowed
    begin
      @mutex.unlock
    rescue StandardError
      nil
    end
  end
end
# Receive a message from the Mailbox. May return nil and may return before
# the specified timeout.
#
# An optional block is forwarded to next_message to select which message
# to take. Raises MailboxDead if the mailbox is dead when the lock is
# acquired.
def check(timeout = nil, &block)
  found = nil

  @mutex.lock
  begin
    raise MailboxDead, "attempted to receive from a dead mailbox" if @dead

    Timers::Wait.for(timeout) do |remaining|
      found = next_message(&block)
      break if found

      # Nothing suitable queued yet: release the lock and sleep until
      # signalled or until the remaining time elapses
      @condition.wait(@mutex, remaining)
    end
  ensure
    # Best-effort unlock: a failure to release the lock is swallowed
    begin
      @mutex.unlock
    rescue StandardError
      nil
    end
  end

  found
end
# Receive a letter from the mailbox. Guaranteed to return a message. If
# timeout is exceeded, raise a TimeoutError.
#
# check may return nil early, so it is retried inside the wait loop. Each
# retry is given only the time still remaining (the value yielded by
# Timers::Wait.for) rather than the full original timeout, so repeated
# retries cannot push the total wait past the caller's deadline.
#
# An optional block is forwarded to check to select which message to take.
def receive(timeout = nil, &block)
  Timers::Wait.for(timeout) do |remaining|
    message = check(remaining, &block)
    return message if message
  end

  raise TimeoutError, "receive timeout exceeded"
end
# Shut down this mailbox and clean up its contents.
#
# Raises MailboxDead if the mailbox is already dead. While holding the
# lock, yields to the optional block, detaches the queued messages and
# marks the mailbox dead. After the lock is released, every detached
# message is passed to dead_letter and, if it responds to cleanup, cleaned
# up. Returns true.
def shutdown
  raise MailboxDead, "mailbox already shutdown" if @dead

  drained = nil

  @mutex.lock
  begin
    yield if block_given?
    drained, @messages = @messages, []
    @dead = true
  ensure
    # Best-effort unlock: a failure to release the lock is swallowed
    begin
      @mutex.unlock
    rescue StandardError
      nil
    end
  end

  drained.each do |letter|
    dead_letter(letter)
    letter.cleanup if letter.respond_to?(:cleanup)
  end

  true
end
# Is the mailbox alive? True until the mailbox has been marked dead.
def alive?
  @dead ? false : true
end
# Cast to an array: returns a copy of the queued messages, taken while
# holding the mailbox lock.
def to_a
  snapshot = nil
  @mutex.synchronize do
    snapshot = @messages.dup
  end
  snapshot
end
# Iterate through the mailbox. Iteration runs over the array returned by
# to_a rather than over the live queue.
def each(&block)
  snapshot = to_a
  snapshot.each(&block)
end
# Inspect the contents of the Mailbox: class name, object id in hex, and
# the inspect form of every message yielded by each.
def inspect
  klass = self.class
  hex_id = object_id.to_s(16)
  rendered = map(&:inspect).join(', ')
  "#<#{klass}:#{hex_id} @messages=[#{rendered}]>"
end
# Number of messages in the Mailbox, read while holding the mailbox lock.
def size
  count = nil
  @mutex.synchronize do
    count = @messages.length
  end
  count
end
private
# Retrieve (and remove) the next message in the mailbox.
#
# Without a block the head of the queue is taken. With a block, the first
# message for which the block is truthy, or which is a SystemEvent, is
# taken. Returns nil when nothing qualifies.
def next_message
  return @messages.shift unless block_given?

  position = @messages.find_index do |candidate|
    yield(candidate) || candidate.is_a?(SystemEvent)
  end

  position ? @messages.delete_at(position) : nil
end
# Log a discarded message at debug level; does nothing unless
# $CELLULOID_DEBUG is set.
#
# Messages are discarded both when the mailbox is dead and when a live
# mailbox is full, so the logged reason is chosen from @dead instead of
# always claiming the mailbox is dead.
def dead_letter(message)
  return unless $CELLULOID_DEBUG

  reason = @dead ? "mailbox is dead" : "mailbox is full"
  Logger.debug "Discarded message (#{reason}): #{message}"
end
# Whether the queue has reached max_size. With no limit configured the
# (falsy) max_size value itself is returned.
def mailbox_full
  limit = @max_size
  return limit unless limit

  @messages.size >= limit
end
end
end
|