/usr/share/perl5/AnyEvent/Memcached/Peer.pm is in libanyevent-memcached-perl 0.06-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | package #hide
AnyEvent::Memcached::Peer;
use common::sense 2;m{
use strict;
use warnings;
}x;
use base 'AnyEvent::Connection';
use Carp;
use AnyEvent::Connection::Util;
use Scalar::Util qw(weaken);
#use Devel::Leak::Cb;
sub DEBUG () { 0 }
use AnyEvent::Memcached::Conn;
sub new {
my $self = shift->SUPER::new(
rawcon => 'AnyEvent::Memcached::Conn',
reconnect => 1,
@_,
);
$self->{waitingcb} = {};
$self;
}
sub connect {
my $self = shift;
$self->{connecting} and return;
$self->{grd}{con} = $self->reg_cb( connected => sub { $self->{failed} = 0; } );
$self->{grd}{cfl} = $self->reg_cb( connfail => sub { $self->{failed} = 1; } );
$self->{grd}{dis} = $self->reg_cb( disconnect => sub {
shift;shift;
%$self or return;
warn "Peer $self->{host}:$self->{port} disconnected".(@_ ? ": @_" : '')."\n" if $self->{debug};
my $e = @_ ? "@_" : "disconnected";
for ( keys %{$self->{waitingcb}} ) {
if ($self->{waitingcb}{$_}) {
#warn "Cleanup: ",::sub_fullname( $self->{waitingcb}{$_} );
$self->{waitingcb}{$_}(undef,$e);
}
delete $self->{waitingcb}{$_};
}
} );
$self->SUPER::connect(@_);
return;
}
sub conntrack {
my $self = shift;
my ($method,$args,$cb) = @_;
if($self->{connecting} and $self->{failed}) {
warn "Is connecting, have fails => not connected" if DEBUG;
$cb and $cb->(undef, "Not connected");
return;
}
elsif (!$self->{connected}) {
my @args = @$args; # copy to avoid rewriting
warn time()." Not connected, do connect for ".\@args.", ".dumper($args[0]) if DEBUG;
my ($c,$t);
weaken( $self->{waitingcb}{int $cb} = $cb ) if $cb;
weaken( $self );
# This rely on correct event invocation order of Object::Event.
# If this could change, I'll add own queue
$c = $self->reg_cb(
connected => sub {
shift->unreg_me;
#$c or return;
warn "connected cb for ".\@args.", ".dumper($args[0]) if DEBUG;
undef $c;undef $t;
$self or return;
delete $self->{waitingcb}{int $cb} if $cb;
return $self->{con}->$method(@args);
},
);
$t = AnyEvent->timer(
after => $self->{timeout},# + 0.05, # Since there are timers inside connect, we need to delay a bit longer
cb => sub {
#$t or return;
warn time()." timeout $self->{timeout} cb for $args->[0]" if DEBUG;
undef $c;undef $t;
$self or return;
if ($cb){
$self->{waitingcb}{int $cb};
$cb->(undef, "Connect timeout");
}
},
);
$self->connect();
}
else {
Carp::cluck "How do I get here?";
return $self->{con}->$method(@$args);
}
}
sub command {
my $self = shift;
if ($self->{connected}) {
return $self->{con}->command( @_ );
}
else {
my ($cmd,%args) = @_;
$self->conntrack( command => \@_, $args{cb} );
}
}
sub request {
my $self = shift;
if ($self->{connected}) {
return $self->{con}->say(@_);
}
else {
# no cb
$self->conntrack( say => \@_ );
}
}
sub reader {
my $self = shift;
if ($self->{connected}) {
return $self->{con}->reader(@_);
}
else {
my %args = @_;
$self->conntrack( reader => \@_, $args{cb} );
}
}
sub want_command {
my $self = shift;
warn "wanting command";
if ($self->{connected}) {
return $self->{con}->want_command(@_);
}
else {
my %args = @_;
$self->conntrack( want_command => \@_ );
}
}
1;
|