/usr/share/perl5/Gearman/ResponseParser.pm is in libgearman-client-perl 1.11-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
package Gearman::ResponseParser;
use strict;
# this is an abstract base class. See:
# Gearman::ResponseParser::Taskset (for Gearman::Client, the sync version), or
# Gearman::ResponseParser::Danga (for Gearman::Client::Danga, the async version)
# subclasses should call this first, then add their own data in underscore members
# Construct the parser base object.
#
# Arguments (key/value list):
#   source - optional; the object/socket that is primarily feeding this
#            parser.  Stored as-is and returned by ->source.
#
# Any other key is rejected with an exception that names the offending
# key(s).  Returns the new object, blessed into $class, after ->reset has
# initialised the parsing state.
sub new {
    my ($class, %opts) = @_;

    my $src = delete $opts{'source'};
    if (%opts) {
        # A bare "die" here used to give callers nothing but "Died at ...";
        # tell them which option was not understood.
        require Carp;
        Carp::croak("Unknown option(s) passed to " . $class . "->new: "
                . join(', ', sort keys %opts));
    }

    my $self = bless {
        source => $src,    # the source object/socket that is primarily feeding this.
    }, $class;

    $self->reset;
    return $self;
}
# Read-only accessor for the 'source' value stored by new()
# (undef when none was supplied).
sub source {
    return $_[0]->{source};
}
# Abstract hook, invoked as $self->on_packet($packet, $parser).
# parse_data calls it once per complete packet, passing the packet hashref
# (type / len / blobref) and the parser itself.  The base class has no
# behaviour of its own and always throws.
sub on_packet {
    die "SUBCLASSES SHOULD OVERRIDE THIS";
}
# Abstract hook, invoked as $self->on_error($errmsg, $parser).
# The callers in this file pass only the message ("malformed_magic",
# "EOF" or "read_error: ...").  The base class always throws.
# NOTE: this interface will evolve.
sub on_error {
    die "SUBCLASSES SHOULD OVERRIDE THIS";
}
# Forget any partially parsed packet: empty the header accumulator and
# drop the in-progress packet record.  Other members are left untouched.
sub reset {
    my ($self) = @_;
    $self->{header} = q{};
    return $self->{pkt} = undef;
}
# don't override:
# FUTURE OPTIMIZATION: let caller say "you can own this scalarref", and then we can keep it
# on the initial setting of $self->{data} and avoid copying into our own. overkill for now.
# Feed raw response bytes into the parser.
#
# $data is either a string or a reference to a string.  When a reference is
# given, the referenced buffer is consumed in place (bytes are removed from
# its front as they are parsed); a plain string is copied into a local first,
# so the caller's variable is left alone.
#
# Input may be split at arbitrary points across calls: a partial header is
# accumulated in $self->{header} and a partial payload in $self->{pkt} until
# each is complete.  Every complete packet is handed to
# $self->on_packet($pkt, $self), where $pkt is
#   { type => <Gearman::Util::cmd_name of the type word>,
#     len  => <payload length>, blobref => <ref to payload string> },
# after which the state is reset for the next packet.
#
# A header whose magic is not "\0RES" is reported through
# $self->on_error("malformed_magic") and parsing stops; if on_error returns,
# undef is returned.
#
# NOTE(review): Gearman::Util::cmd_name is called below but this file does not
# appear to load Gearman::Util itself -- presumably the caller has; confirm.
sub parse_data {
    my ($self, $data) = @_;    # where $data is a scalar or scalarref to parse
    my $dataref = ref $data ? $data : \$data;

    my $err = sub {
        my $code = shift;

        # Drop the rejected header before reporting.  Otherwise a later call
        # would see a "complete" 12-byte header with no packet record and
        # blow up dereferencing an undefined blobref.
        $self->reset;
        $self->on_error($code);
        return undef;
    };

    # Always go through $dataref: $data may be a plain string, and
    # dereferencing that directly is a strict-refs error.
    while (my $lendata = length $$dataref) {

        # read the header
        my $hdr_len = length $self->{header};
        unless ($hdr_len == 12) {
            my $need = 12 - $hdr_len;
            $self->{header} .= substr($$dataref, 0, $need, '');
            next unless length $self->{header} == 12;

            # 4 bytes of magic, then two 32-bit big-endian integers.
            my ($magic, $type, $len) = unpack("a4NN", $self->{header});
            return $err->("malformed_magic") unless $magic eq "\0RES";

            my $blob = "";
            $self->{pkt} = {
                type    => Gearman::Util::cmd_name($type),
                len     => $len,
                blobref => \$blob,
            };
            next;
        }

        # how much data haven't we read for the current packet?
        my $need = $self->{pkt}{len} - length(${ $self->{pkt}{blobref} });

        # copy the MIN(need, have)
        my $to_copy = $lendata > $need ? $need : $lendata;
        ${ $self->{pkt}{blobref} } .= substr($$dataref, 0, $to_copy, '');

        if ($to_copy == $need) {
            $self->on_packet($self->{pkt}, $self);
            $self->reset;
        }
    }

    # The loop only runs while input remains, so a packet that became complete
    # exactly as the input ran out (e.g. a zero-length payload whose header
    # was the last thing in the buffer) is delivered here.
    if (defined($self->{pkt}) && length(${ $self->{pkt}{blobref} }) == $self->{pkt}{len}) {
        $self->on_packet($self->{pkt}, $self);
        $self->reset;
    }
}
# don't override:
# Report end-of-input by calling $self->on_error("EOF").
# Note carried over from the original author: "ERROR if in middle of packet".
# No such distinction is made here; every EOF is reported the same way.
sub eof {
    my ($self) = @_;
    return $self->on_error("EOF");
}
# don't override:
# Read once from $sock (up to 128 KiB) and feed the bytes to parse_data.
#
#   * sysread fails with EAGAIN / EWOULDBLOCK / EINTR: nothing was read but
#     nothing is wrong either; return quietly so the caller can try again
#     the next time the socket is readable.
#   * sysread fails otherwise: $self->on_error("read_error: <errno text>").
#   * sysread returns 0: the peer closed the connection; $self->eof.
#   * otherwise: $self->parse_data(\$data), whose result is returned.
sub parse_sock {
    my ($self, $sock) = @_;    # $sock is readable, we should sysread it and feed it to $self->parse_data

    my $data;
    my $rv = sysread($sock, $data, 128 * 1024);

    if (!defined $rv) {
        # Checked straight after sysread so nothing can clobber $! first.
        # %! yields false for errno names the platform lacks.
        return if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};

        $self->on_error("read_error: $!");
        return;
    }

    if (!$rv) {
        $self->eof;
        return;
    }

    return $self->parse_data(\$data);
}
1;
|