/usr/share/php/Predis/PubSub/DispatcherLoop.php is in libphp-predis 0.8.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
<?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\PubSub;
use Predis\ClientInterface;
/**
 * Method-dispatcher loop built around the client-side abstraction of a Redis
 * Publish / Subscribe context.
 *
 * Callbacks are bound to channels with attachCallback(); run() then iterates
 * over the context and routes every incoming message to the matching callback.
 *
 * @author Daniele Alessandri <suppakilla@gmail.com>
 */
class DispatcherLoop
{
    /** @var ClientInterface Client the Publish / Subscribe context was obtained from. */
    private $client;

    /** @var mixed Context returned by $client->pubSub(), iterated by run(). */
    private $pubSubContext;

    /** @var array Callbacks indexed by channel name. */
    private $callbacks;

    /** @var mixed Callback for messages on channels without a bound callback, or null. */
    private $defaultCallback;

    /** @var mixed Callback for events that are not regular messages, or null. */
    private $subscriptionCallback;

    /**
     * @param ClientInterface $client Client instance used by the context.
     */
    public function __construct(ClientInterface $client)
    {
        $this->callbacks = array();
        $this->client = $client;
        $this->pubSubContext = $client->pubSub();
    }

    /**
     * Checks if the passed argument is a valid callback.
     *
     * @param mixed $callable A callback.
     *
     * @throws \InvalidArgumentException When the argument is not callable.
     */
    protected function validateCallback($callable)
    {
        if (!is_callable($callable)) {
            throw new \InvalidArgumentException("A valid callable object must be provided");
        }
    }

    /**
     * Returns the underlying Publish / Subscribe context.
     *
     * @return PubSubContext
     */
    public function getPubSubContext()
    {
        return $this->pubSubContext;
    }

    /**
     * Sets a callback that gets invoked upon new subscriptions.
     *
     * More precisely, run() forwards to this callback every event whose kind
     * is neither PubSubContext::MESSAGE nor PubSubContext::PMESSAGE, passing
     * the whole message object. Passing null removes the callback.
     *
     * @param mixed $callable A callback, or null to unset it.
     *
     * @throws \InvalidArgumentException When a non-null argument is not callable.
     */
    public function subscriptionCallback($callable = null)
    {
        if (isset($callable)) {
            $this->validateCallback($callable);
        }

        $this->subscriptionCallback = $callable;
    }

    /**
     * Sets a callback that gets invoked when a message is received on a
     * channel that does not have an associated callback.
     *
     * The callback receives the whole message object. Passing null removes
     * the callback.
     *
     * @param mixed $callable A callback, or null to unset it.
     *
     * @throws \InvalidArgumentException When a non-null argument is not callable.
     */
    public function defaultCallback($callable = null)
    {
        if (isset($callable)) {
            $this->validateCallback($callable);
        }

        // Store into the default-callback slot: writing to the subscription
        // slot here would leave run() without a fallback handler and would
        // overwrite whatever subscriptionCallback() had registered.
        $this->defaultCallback = $callable;
    }

    /**
     * Binds a callback to a channel and subscribes the context to it.
     *
     * A callback previously bound to the same channel is replaced.
     *
     * @param string   $channel  Channel name.
     * @param Callable $callback A callback; run() invokes it with the message payload.
     *
     * @throws \InvalidArgumentException When the callback is not callable.
     */
    public function attachCallback($channel, $callback)
    {
        $this->validateCallback($callback);

        $this->callbacks[$channel] = $callback;
        $this->pubSubContext->subscribe($channel);
    }

    /**
     * Stops listening to a channel and removes the associated callback.
     *
     * Nothing happens when no callback is bound to the channel.
     *
     * @param string $channel Redis channel.
     */
    public function detachCallback($channel)
    {
        if (isset($this->callbacks[$channel])) {
            unset($this->callbacks[$channel]);
            $this->pubSubContext->unsubscribe($channel);
        }
    }

    /**
     * Starts the dispatcher loop.
     *
     * Iterates over the Publish / Subscribe context and dispatches each item:
     * - events that are not MESSAGE / PMESSAGE go to the subscription
     *   callback (if set) with the whole message object;
     * - messages on a channel with a bound callback go to that callback with
     *   the message payload only;
     * - any other message goes to the default callback (if set) with the
     *   whole message object.
     */
    public function run()
    {
        foreach ($this->pubSubContext as $message) {
            $kind = $message->kind;

            if ($kind !== PubSubContext::MESSAGE && $kind !== PubSubContext::PMESSAGE) {
                if (isset($this->subscriptionCallback)) {
                    call_user_func($this->subscriptionCallback, $message);
                }

                continue;
            }

            if (isset($this->callbacks[$message->channel])) {
                // Channel-specific callbacks only get the payload.
                call_user_func($this->callbacks[$message->channel], $message->payload);
            } elseif (isset($this->defaultCallback)) {
                // The fallback gets the full message so it can inspect the channel.
                call_user_func($this->defaultCallback, $message);
            }
        }
    }

    /**
     * Terminates the dispatcher loop by closing the underlying context.
     */
    public function stop()
    {
        $this->pubSubContext->closeContext();
    }
}
|