/usr/share/perl5/AnyEvent/RabbitMQ/LocalQueue.pm is in libanyevent-rabbitmq-perl 1.17~dfsg-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package AnyEvent::RabbitMQ::LocalQueue;
use strict;
use warnings;
our $VERSION = '1.16';
# Constructor. Returns a new object blessed into the invoking class, holding
# two empty FIFO queues.
sub new {
    my ($class) = @_;

    my %state = (
        _message_queue    => [],    # items added by push()
        _drain_code_queue => [],    # callbacks added by get()
    );

    return bless \%state, $class;
}
# Append every argument after the invocant to the message queue, then run
# _drain_queue and return whatever it returns.
sub push {
    my ($self, @messages) = @_;

    # CORE:: is required here: inside this package a bare "push" would be
    # ambiguous with this very method.
    CORE::push @{ $self->{_message_queue} }, @messages;

    return $self->_drain_queue;
}
# Append every argument after the invocant to the callback queue, then run
# _drain_queue and return whatever it returns. Each queued entry is later
# invoked as a code reference with one message as its argument.
sub get {
    my ($self, @callbacks) = @_;

    CORE::push @{ $self->{_drain_code_queue} }, @callbacks;

    return $self->_drain_queue;
}
# Pair queued messages with queued callbacks, oldest first, invoking each
# callback with exactly one message until either queue runs empty.
#
# Both queues are re-checked before every dispatch instead of being counted
# once up front. A callback may itself call push()/get() and thereby re-enter
# this method; with a count taken before the loop, the nested drain could
# consume the remaining entries and the outer loop would then hand undef to a
# callback (or try to call undef as code).
#
# Returns the invocant. Exceptions thrown by a callback are not caught here.
sub _drain_queue {
    my ($self) = @_;

    while (   @{ $self->{_message_queue} }
           && @{ $self->{_drain_code_queue} })
    {
        my $callback = shift @{ $self->{_drain_code_queue} };
        my $message  = shift @{ $self->{_message_queue} };
        $callback->($message);
    }

    return $self;
}
# Run _drain_queue once, then invoke every callback still queued with $frame
# as its only argument. Exceptions raised by those callbacks are discarded.
sub _flush {
    my $self  = shift;
    my $frame = shift;

    $self->_drain_queue();

    # The queue is re-read on every pass, so callbacks registered while
    # flushing are picked up by the same loop.
    while (my $handler = shift @{ $self->{_drain_code_queue} }) {
        # Flush frames immediately, throwing away errors for on-close.
        # $@ is localized so a failing callback does not leak its error
        # to the caller.
        local $@;
        eval { $handler->($frame) };
    }
}
1;
|