/usr/share/perl5/Gearman/Util.pm is in libgearman-client-perl 1.11-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 | package Gearman::Util;
use strict;
# Command table: numeric command code => [ DIRECTION, NAME ].
#
# DIRECTION is a string of flags:
# I: to jobserver
# O: out of job server
# A value of 'IO' carries both flags; read_res_packet() only accepts
# commands whose DIRECTION contains 'O'.
#
# Abbreviations used in the per-command notes:
# W: worker
# C: client of job server
# J: jobserver
# The note after each entry sketches who sends the packet and what its
# payload holds; "[0]" presumably stands for a NUL byte between fields
# (TODO confirm against the protocol description).
#
# Entries are grouped by purpose rather than sorted by code, and no entry
# is defined for code 5.
our %cmd = (
1 => [ 'I', "can_do" ], # from W: [FUNC]
23 => [ 'I', "can_do_timeout" ], # from W: FUNC[0]TIMEOUT
2 => [ 'I', "cant_do" ], # from W: [FUNC]
3 => [ 'I', "reset_abilities" ], # from W: ---
22 => [ 'I', "set_client_id" ], # W->J: [RANDOM_STRING_NO_WHITESPACE]
4 => [ 'I', "pre_sleep" ], # from W: ---
26 => [ 'I', "option_req" ], # C->J: [OPT]
27 => [ 'O', "option_res" ], # J->C: [OPT]
6 => [ 'O', "noop" ], # J->W: ---
7 => [ 'I', "submit_job" ], # C->J: FUNC[0]UNIQ[0]ARGS
21 => [ 'I', "submit_job_high" ], # C->J: FUNC[0]UNIQ[0]ARGS
18 => [ 'I', "submit_job_bg" ], # C->J: same payload as submit_job
8 => [ 'O', "job_created" ], # J->C: HANDLE
9 => [ 'I', "grab_job" ], # W->J: ---
10 => [ 'O', "no_job" ], # J->W: ---
11 => [ 'O', "job_assign" ], # J->W: HANDLE[0]FUNC[0]ARG
12 => [ 'IO', "work_status" ], # W->J/C: HANDLE[0]NUMERATOR[0]DENOMINATOR
13 => [ 'IO', "work_complete" ], # W->J/C: HANDLE[0]RES
14 => [ 'IO', "work_fail" ], # W->J/C: HANDLE
25 => [ 'IO', "work_exception" ], # W->J/C: HANDLE[0]EXCEPTION
15 => [ 'I', "get_status" ], # C->J: HANDLE
20 => [ 'O', "status_res" ], # J->C: HANDLE[0]KNOWN[0]RUNNING[0]NUM[0]DENOM
16 => [ 'I', "echo_req" ], # ?->J: TEXT
17 => [ 'O', "echo_res" ], # J->?: TEXT
19 => [ 'O', "error" ], # J->?: ERRCODE[0]ERR_TEXT
# for worker to declare to the jobserver that this worker is only connected
# to one jobserver, so no polls/grabs will take place, and server is free
# to push "job_assign" packets back down.
24 => [ 'I', "all_yours" ], # W->J: ---
);
# Reverse index of %cmd: command name -> numeric command code.
our %num;
for my $code (keys %cmd) {
    my $name = $cmd{$code}[1];

    # Two codes sharing one name would make the reverse lookup ambiguous,
    # so refuse to load the module at all in that case.
    die if $num{$name};

    $num{$name} = $code;
}
# Translate a numeric command code into its command name.
#
# Takes the numeric code as its only argument and returns the name stored
# in %cmd for it, or undef when the code has no entry.
sub cmd_name {
    my ($code) = @_;

    my $entry = $cmd{$code};
    return undef unless $entry;

    # Each entry is [ DIRECTION, NAME ]; the name is the second element.
    return $entry->[1];
}
# Build a request packet: "\0REQ", then the command code and the payload
# length as two 32-bit big-endian integers, then the payload itself.
#
# Arguments:
#   $type_arg - command name (looked up in %num) or numeric command code
#   payload   - optional string; undef or omitted means an empty payload
#
# Returns the packed packet as a string.
# Dies with "Bogus type arg of '...'" when the type resolves to no usable
# (non-zero) command code.
sub pack_req_command {
    my $type_arg = shift;

    # Fall back to the numeric value of the argument when it is not a known
    # name.  int() turns an unknown name into 0 so that it is rejected below
    # instead of silently being packed as command 0; this is the same
    # resolution pack_res_command() uses.
    my $type = $num{$type_arg} || int($type_arg);
    die "Bogus type arg of '$type_arg'" unless $type;

    # Test definedness, not truth: a payload of "0" is legitimate data and
    # must not be replaced by the empty string.
    my $arg = defined $_[0] ? $_[0] : '';
    my $len = length($arg);

    return "\0REQ" . pack("NN", $type, $len) . $arg;
}
# Build a response packet: "\0RES", then the command code and the payload
# length as two 32-bit big-endian integers, then the payload itself.
#
# Arguments:
#   $type_arg - command name (looked up in %num) or numeric command code
#   payload   - optional string; undef or omitted means an empty payload
#
# Returns the packed packet as a string.
# Dies with "Bogus type arg of '...'" when the type resolves to no usable
# (non-zero) command code.
sub pack_res_command {
    my $type_arg = shift;
    my $type = $num{$type_arg} || int($type_arg);
    die "Bogus type arg of '$type_arg'" unless $type;

    # If they didn't pass in anything to send, make it be an empty string.
    # Work through a reference rather than assigning to $_[0]: @_ aliases
    # the caller's variables, so an assignment would overwrite the caller's
    # undef (and die on a literal undef, which is read-only).  Using a
    # reference also avoids copying the payload into a new scalar.
    my $payload_ref = defined $_[0] ? \$_[0] : \'';
    my $len = length($$payload_ref);

    return "\0RES" . pack("NN", $type, $len) . $$payload_ref;
}
# Read one response packet ("\0RES" magic) from a socket.
#
# Arguments:
#   $sock    - socket object to read from; it must support ->connected and
#              ->close, which are used on the error path
#   $err_ref - optional scalar reference that receives a short error code
#
# Returns a hash reference on success:
#   type    => command name from %cmd
#   len     => payload length announced in the header
#   blobref => reference to the payload string ('' for a zero-length packet)
#
# returns undef on closed socket or malformed packet.  In that case the
# socket is closed (if still connected) and, when $err_ref is a reference,
# it is set to one of: read_error, eof, malformed_header, malformed_magic,
# short_body, bogus_command, bogus_command_type.
sub read_res_packet {
    my $sock    = shift;
    my $err_ref = shift;

    my $err = sub {
        my $code = shift;
        $sock->close() if $sock->connected;
        $$err_ref = $code if ref $err_ref;
        return undef;
    };

    # read the header
    # A stream socket may hand over fewer than 12 bytes per read, so keep
    # reading until the header is complete, the peer closes, or a read
    # fails.  Every pass adds at least one byte, so this loop is bounded.
    my $header = '';
    while (length($header) < 12) {
        my $rv = sysread($sock, $header, 12 - length($header), length($header));
        return $err->("read_error") unless defined $rv;
        last unless $rv;    # end of file
    }
    return $err->("eof") unless length($header);
    return $err->("malformed_header") unless length($header) == 12;

    my ($magic, $type, $len) = unpack("a4NN", $header);
    return $err->("malformed_magic") unless $magic eq "\0RES";

    # The payload gets its own buffer so that a zero-length packet exposes
    # an empty string through blobref instead of the raw header bytes.
    my $body = '';
    if ($len) {
        my $remaining = $len;
        my $offset    = 0;

        # Cap the number of reads so that a peer trickling data in tiny
        # pieces cannot keep this loop running indefinitely.
        my $lim = 20 + int( $len / 2**10 );

        for (my $i = 0; $remaining > 0 && $i < $lim; $i++) {
            # Because we know the length of the data we need to read exactly, the
            # most efficient way to do this in perl is with one giant buffer, and
            # an appropriate offset passed to sysread.
            my $rv = sysread($sock, $body, $remaining, $offset);
            return $err->("short_body") unless defined $rv && $rv > 0;
            $remaining -= $rv;
            $offset    += $rv;
        }

        # Reached when the read cap ran out before the payload was complete.
        return $err->("short_body") unless length($body) == $len;
    }

    my $cmd_entry = $cmd{$type};
    return $err->("bogus_command") unless $cmd_entry;

    # Only commands flagged as coming out of the job server are valid here.
    return $err->("bogus_command_type") unless index($cmd_entry->[0], "O") != -1;

    return {
        'type'    => $cmd_entry->[1],
        'len'     => $len,
        'blobref' => \$body,
    };
}
# Write a complete, already packed request to a socket.
#
# Arguments:
#   $sock   - handle object supporting ->syswrite; a false value means
#             "no connection"
#   $reqref - reference to the packed request string
#
# Returns 1 when every byte was written, 0 otherwise (no socket, a write
# error, or a write that made no progress).
sub send_req {
    my ($sock, $reqref) = @_;
    return 0 unless $sock;

    my $len = length($$reqref);

    # Writing to a connection the peer already closed must surface as a
    # failed write, not as a fatal SIGPIPE.
    local $SIG{PIPE} = 'IGNORE';

    # syswrite may accept only part of the buffer; continue from where it
    # stopped rather than reporting failure with half a packet on the wire.
    my $sent = 0;
    while ($sent < $len) {
        my $rv = $sock->syswrite($$reqref, $len - $sent, $sent);

        # undef is a write error; 0 means no progress, so give up rather
        # than spin.
        return 0 unless $rv;

        $sent += $rv;
    }

    return 1;
}
# Given a file descriptor number and a timeout in seconds, wait for that
# descriptor to become readable.
#
# Returns 1 if the descriptor became readable within the timeout, and 0
# otherwise: on timeout, on a select() error, or when either argument is
# false (so descriptor 0 and a zero timeout are never waited on).
sub wait_for_readability {
    my ($fileno, $timeout) = @_;
    return 0 unless $fileno && $timeout;

    # Bit vector with only this descriptor's bit set, as select() expects.
    my $rin = '';
    vec($rin, $fileno, 1) = 1;
    my $nfound = select($rin, undef, undef, $timeout);

    # select() yields the number of ready descriptors, 0 on timeout and -1
    # on error.  -1 is a true value, so test for a positive count instead
    # of plain truth; otherwise an interrupted or failed select() would be
    # reported as "readable".
    return (defined $nfound && $nfound > 0) ? 1 : 0;
}
1;
|