/usr/share/perl5/Mojo/Pg/PubSub.pm is in libmojo-pg-perl 4.04-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | package Mojo::Pg::PubSub;
use Mojo::Base 'Mojo::EventEmitter';
use Mojo::JSON qw(from_json to_json);
use Scalar::Util 'weaken';
has 'pg';
sub DESTROY { Mojo::Util::_global_destruction() or shift->reset }
sub json { ++$_[0]{json}{$_[1]} and return $_[0] }
sub listen {
my ($self, $name, $cb) = @_;
$self->_db->listen($name) unless @{$self->{chans}{$name} ||= []};
push @{$self->{chans}{$name}}, $cb;
return $cb;
}
sub notify { $_[0]->_db->notify(_json(@_)) and return $_[0] }
sub reset {
my $self = shift;
delete @$self{qw(chans json pid)};
return unless my $db = delete $self->{db};
++$db->dbh->{private_mojo_no_reuse} and $db->_unwatch;
}
sub unlisten {
my ($self, $name, $cb) = @_;
my $chan = $self->{chans}{$name};
@$chan = $cb ? grep { $cb ne $_ } @$chan : ();
$self->_db->unlisten($name) and delete $self->{chans}{$name} unless @$chan;
return $self;
}
sub _db {
my $self = shift;
return $self->{db} if $self->{db};
my $db = $self->{db} = $self->pg->db;
weaken $db->{pg};
weaken $self;
$db->on(
notification => sub {
my ($db, $name, $pid, $payload) = @_;
$payload = eval { from_json $payload } if $self->{json}{$name};
for my $cb (@{$self->{chans}{$name}}) { $self->$cb($payload) }
}
);
$db->once(close => sub { $self->{pg} and $self->_db if delete $self->{db} });
$db->listen($_) for keys %{$self->{chans}}, 'mojo.pubsub';
$self->emit(reconnect => $db);
return $db;
}
sub _json { $_[1], $_[0]{json}{$_[1]} ? to_json $_[2] : $_[2] }
1;
=encoding utf8
=head1 NAME
Mojo::Pg::PubSub - Publish/Subscribe
=head1 SYNOPSIS
use Mojo::Pg::PubSub;
my $pubsub = Mojo::Pg::PubSub->new(pg => $pg);
my $cb = $pubsub->listen(foo => sub {
my ($pubsub, $payload) = @_;
say "Received: $payload";
});
$pubsub->notify(foo => 'I ♥ Mojolicious!');
$pubsub->unlisten(foo => $cb);
=head1 DESCRIPTION
L<Mojo::Pg::PubSub> is a scalable implementation of the publish/subscribe
pattern used by L<Mojo::Pg>. It is based on PostgreSQL notifications and allows
many consumers to share the same database connection, to avoid many common
scalability problems.
=head1 EVENTS
L<Mojo::Pg::PubSub> inherits all events from L<Mojo::EventEmitter> and can
emit the following new ones.
=head2 reconnect
$pubsub->on(reconnect => sub {
my ($pubsub, $db) = @_;
...
});
Emitted after switching to a new database connection for sending and receiving
notifications.
=head1 ATTRIBUTES
L<Mojo::Pg::PubSub> implements the following attributes.
=head2 pg
my $pg = $pubsub->pg;
$pubsub = $pubsub->pg(Mojo::Pg->new);
L<Mojo::Pg> object this publish/subscribe container belongs to.
=head1 METHODS
L<Mojo::Pg::PubSub> inherits all methods from L<Mojo::EventEmitter> and
implements the following new ones.
=head2 json
$pubsub = $pubsub->json('foo');
Activate automatic JSON encoding and decoding with L<Mojo::JSON/"to_json"> and
L<Mojo::JSON/"from_json"> for a channel.
# Send and receive data structures
$pubsub->json('foo')->listen(foo => sub {
my ($pubsub, $payload) = @_;
say $payload->{bar};
});
$pubsub->notify(foo => {bar => 'I ♥ Mojolicious!'});
=head2 listen
my $cb = $pubsub->listen(foo => sub {...});
Subscribe to a channel, there is no limit on how many subscribers a channel can
have. Automatic decoding of JSON text to Perl values can be activated with
L</"json">.
# Subscribe to the same channel twice
$pubsub->listen(foo => sub {
my ($pubsub, $payload) = @_;
say "One: $payload";
});
$pubsub->listen(foo => sub {
my ($pubsub, $payload) = @_;
say "Two: $payload";
});
=head2 notify
$pubsub = $pubsub->notify('foo');
$pubsub = $pubsub->notify(foo => 'I ♥ Mojolicious!');
$pubsub = $pubsub->notify(foo => {bar => 'baz'});
Notify a channel. Automatic encoding of Perl values to JSON text can be
activated with L</"json">.
=head2 reset
$pubsub->reset;
Reset all subscriptions and the database connection. This is usually done after
a new process has been forked, to prevent the child process from stealing
notifications meant for the parent process.
=head2 unlisten
$pubsub = $pubsub->unlisten('foo');
$pubsub = $pubsub->unlisten(foo => $cb);
Unsubscribe from a channel.
=head1 SEE ALSO
L<Mojo::Pg>, L<Mojolicious::Guides>, L<http://mojolicious.org>.
=cut
|