/usr/share/php/Aws/DynamoDb/WriteRequestBatch.php is in php-aws-sdk 3.15.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 | <?php
namespace Aws\DynamoDb;
use Aws\AwsClientInterface;
use Aws\CommandInterface;
use Aws\CommandPool;
use Aws\Exception\AwsException;
use Aws\ResultInterface;
/**
* The WriteRequestBatch is an object that is capable of efficiently sending
* DynamoDB BatchWriteItem requests from queued up put and delete item requests.
* requests. The batch attempts to send the requests with the fewest requests
* to DynamoDB as possible and also re-queues any unprocessed items to ensure
* that all items are sent.
*/
class WriteRequestBatch
{
/** @var DynamoDbClient DynamoDB client used to perform write operations. */
private $client;
/** @var array Configuration options for the batch. */
private $config;
/** @var array Queue of pending put/delete requests in the batch. */
private $queue;
/**
* Creates a WriteRequestBatch object that is capable of efficiently sending
* DynamoDB BatchWriteItem requests from queued up Put and Delete requests.
*
* @param DynamoDbClient $client DynamoDB client used to send batches.
* @param array $config Batch configuration options.
* - table: (string) DynamoDB table used by the batch, this can be
* overridden for each individual put() or delete() call.
* - batch_size: (int) The size of each batch (default: 25). The batch
* size must be between 2 and 25. If you are sending batches of large
* items, you may consider lowering the batch size, otherwise, you
* should use 25.
* - pool_size: (int) This number dictates how many BatchWriteItem
* requests you would like to do in parallel. For example, if the
* "batch_size" is 25, and "pool_size" is 3, then you would send 3
* BatchWriteItem requests at a time, each with 25 items. Please keep
* your throughput in mind when choosing the "pool_size" option.
* - autoflush: (bool) This option allows the batch to automatically
* flush once there are enough items (i.e., "batch_size" * "pool_size")
* in the queue. This defaults to true, so you must set this to false
* to stop autoflush.
* - before: (callable) Executed before every BatchWriteItem operation.
* It should accept an \Aws\CommandInterface object as its argument.
* - error: Executed if an error was encountered executing a,
* BatchWriteItem operation, otherwise errors are ignored. It should
* accept an \Aws\Exception\AwsException as its argument.
*
* @throws \InvalidArgumentException if the batch size is not between 2 and 25.
*/
public function __construct(DynamoDbClient $client, array $config = [])
{
// Apply defaults
$config += [
'table' => null,
'batch_size' => 25,
'pool_size' => 1,
'autoflush' => true,
'before' => null,
'error' => null
];
// Ensure the batch size is valid
if ($config['batch_size'] > 25 || $config['batch_size'] < 2) {
throw new \InvalidArgumentException('"batch_size" must be between 2 and 25.');
}
// Ensure the callbacks are valid
if ($config['before'] && !is_callable($config['before'])) {
throw new \InvalidArgumentException('"before" must be callable.');
}
if ($config['error'] && !is_callable($config['error'])) {
throw new \InvalidArgumentException('"error" must be callable.');
}
// If autoflush is enabled, set the threshold
if ($config['autoflush']) {
$config['threshold'] = $config['batch_size'] * $config['pool_size'];
}
$this->client = $client;
$this->config = $config;
$this->queue = [];
}
/**
* Adds a put item request to the batch.
*
* @param array $item Data for an item to put. Format:
* [
* 'attribute1' => ['type' => 'value'],
* 'attribute2' => ['type' => 'value'],
* ...
* ]
* @param string|null $table The name of the table. This must be specified
* unless the "table" option was provided in the
* config of the WriteRequestBatch.
*
* @return $this
*/
public function put(array $item, $table = null)
{
$this->queue[] = [
'table' => $this->determineTable($table),
'data' => ['PutRequest' => ['Item' => $item]],
];
$this->autoFlush();
return $this;
}
/**
* Adds a delete item request to the batch.
*
* @param array $key Key of an item to delete. Format:
* [
* 'key1' => ['type' => 'value'],
* ...
* ]
* @param string|null $table The name of the table. This must be specified
* unless the "table" option was provided in the
* config of the WriteRequestBatch.
*
* @return $this
*/
public function delete(array $key, $table = null)
{
$this->queue[] = [
'table' => $this->determineTable($table),
'data' => ['DeleteRequest' => ['Key' => $key]],
];
$this->autoFlush();
return $this;
}
/**
* Flushes the batch by combining all the queued put and delete requests
* into BatchWriteItem commands and executing them. Unprocessed items are
* automatically re-queued.
*
* @param bool $untilEmpty If true, flushing will continue until the queue
* is completely empty. This will make sure that
* unprocessed items are all eventually sent.
*
* @return $this
*/
public function flush($untilEmpty = true)
{
// Send BatchWriteItem requests until the queue is empty
$keepFlushing = true;
while ($this->queue && $keepFlushing) {
$commands = $this->prepareCommands();
$pool = new CommandPool($this->client, $commands, [
'before' => $this->config['before'],
'concurrency' => $this->config['pool_size'],
'fulfilled' => function (ResultInterface $result) {
// Re-queue any unprocessed items
if ($result->hasKey('UnprocessedItems')) {
$this->retryUnprocessed($result['UnprocessedItems']);
}
},
'rejected' => function ($reason) {
if ($reason instanceof AwsException) {
$code = $reason->getAwsErrorCode();
if ($code === 'ProvisionedThroughputExceededException') {
$this->retryUnprocessed($reason->getCommand()['RequestItems']);
} elseif (is_callable($this->config['error'])) {
$this->config['error']($reason);
}
}
}
]);
$pool->promise()->wait();
$keepFlushing = (bool) $untilEmpty;
}
return $this;
}
/**
* Creates BatchWriteItem commands from the items in the queue.
*
* @return CommandInterface[]
*/
private function prepareCommands()
{
// Chunk the queue into batches
$batches = array_chunk($this->queue, $this->config['batch_size']);
$this->queue = [];
// Create BatchWriteItem commands for each batch
$commands = [];
foreach ($batches as $batch) {
$requests = [];
foreach ($batch as $item) {
if (!isset($requests[$item['table']])) {
$requests[$item['table']] = [];
}
$requests[$item['table']][] = $item['data'];
}
$commands[] = $this->client->getCommand(
'BatchWriteItem',
['RequestItems' => $requests]
);
}
return $commands;
}
/**
* Re-queues unprocessed results with the correct data.
*
* @param array $unprocessed Unprocessed items from a result.
*/
private function retryUnprocessed(array $unprocessed)
{
foreach ($unprocessed as $table => $requests) {
foreach ($requests as $request) {
$this->queue[] = [
'table' => $table,
'data' => $request,
];
}
}
}
/**
* If autoflush is enabled and the threshold is met, flush the batch
*/
private function autoFlush()
{
if ($this->config['autoflush']
&& count($this->queue) >= $this->config['threshold']
) {
// Flush only once. Unprocessed items are handled in a later flush.
$this->flush(false);
}
}
/**
* Determine the table name by looking at what was provided and what the
* WriteRequestBatch was originally configured with.
*
* @param string|null $table The table name.
*
* @return string
* @throws \RuntimeException if there was no table specified.
*/
private function determineTable($table)
{
$table = $table ?: $this->config['table'];
if (!$table) {
throw new \RuntimeException('There was no table specified.');
}
return $table;
}
}
|