1 |
efrain |
1 |
<?php
|
|
|
2 |
namespace Aws\S3;
|
|
|
3 |
|
|
|
4 |
use Aws\AwsClientInterface;
|
|
|
5 |
use Aws\S3\Exception\DeleteMultipleObjectsException;
|
|
|
6 |
use GuzzleHttp\Promise;
|
|
|
7 |
use GuzzleHttp\Promise\PromisorInterface;
|
|
|
8 |
use GuzzleHttp\Promise\PromiseInterface;
|
|
|
9 |
|
|
|
10 |
/**
 * Efficiently deletes many objects from a single Amazon S3 bucket using an
 * iterator that yields keys. Deletes are made using the DeleteObjects API
 * operation.
 *
 *     $s3 = new Aws\S3\Client([
 *         'region'  => 'us-west-2',
 *         'version' => 'latest'
 *     ]);
 *
 *     $listObjectsParams = ['Bucket' => 'foo', 'Prefix' => 'starts/with/'];
 *     $delete = Aws\S3\BatchDelete::fromListObjects($s3, $listObjectsParams);
 *     // Asynchronously delete
 *     $promise = $delete->promise();
 *     // Force synchronous completion
 *     $delete->delete();
 *
 * When using one of the batch delete creational static methods, you can supply
 * an associative array of options:
 *
 * - before: Function invoked before executing a command. The function is
 *   passed the command that is about to be executed. This can be useful
 *   for logging, adding custom request headers, etc.
 * - batch_size: The size of each delete batch. Defaults to 1000; larger
 *   values are capped at 1000.
 *
 * @link http://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
 */
class BatchDelete implements PromisorInterface
{
    /** @var string Name of the bucket the objects are deleted from */
    private $bucket;

    /** @var AwsClientInterface */
    private $client;

    /** @var callable|null Optional hook invoked with each command before it is sent */
    private $before;

    /** @var PromiseInterface Promise created on the first call to promise() */
    private $cachedPromise;

    /** @var callable Factory that receives this object and returns the driving promise */
    private $promiseCreator;

    /** @var int Number of queued objects that triggers a DeleteObjects request */
    private $batchSize = 1000;

    /** @var array Objects waiting to be sent in the next DeleteObjects request */
    private $queue = [];

    /**
     * Creates a BatchDelete object from all of the paginated results of a
     * ListObjects operation. Each result that is returned by the ListObjects
     * operation will be deleted.
     *
     * @param AwsClientInterface $client            AWS Client to use.
     * @param array              $listObjectsParams ListObjects API parameters
     * @param array              $options           BatchDelete options.
     *
     * @return BatchDelete
     */
    public static function fromListObjects(
        AwsClientInterface $client,
        array $listObjectsParams,
        array $options = []
    ) {
        $iter = $client->getPaginator('ListObjects', $listObjectsParams);
        $bucket = $listObjectsParams['Bucket'];

        $fn = function (BatchDelete $that) use ($iter) {
            return $iter->each(function ($result) use ($that) {
                // Promises for the batches that were flushed while
                // consuming this page of results.
                $promises = [];
                if (is_array($result['Contents'])) {
                    foreach ($result['Contents'] as $object) {
                        if ($promise = $that->enqueue($object)) {
                            $promises[] = $promise;
                        }
                    }
                }

                // Only hand back a promise when this page actually
                // triggered at least one delete request.
                return $promises ? Promise\Utils::all($promises) : null;
            });
        };

        return new self($client, $bucket, $fn, $options);
    }

    /**
     * Creates a BatchDelete object from an iterator that yields results.
     *
     * @param AwsClientInterface $client  AWS Client to use to execute commands
     * @param string             $bucket  Bucket where the objects are stored
     * @param \Iterator          $iter    Iterator that yields assoc arrays
     * @param array              $options BatchDelete options
     *
     * @return BatchDelete
     */
    public static function fromIterator(
        AwsClientInterface $client,
        $bucket,
        \Iterator $iter,
        array $options = []
    ) {
        $fn = function (BatchDelete $that) use ($iter) {
            return Promise\Coroutine::of(function () use ($that, $iter) {
                foreach ($iter as $obj) {
                    // enqueue() only returns a promise when a batch was
                    // flushed; yield it so the coroutine waits on it.
                    if ($promise = $that->enqueue($obj)) {
                        yield $promise;
                    }
                }
            });
        };

        return new self($client, $bucket, $fn, $options);
    }

    /**
     * Returns the promise that drives the batch delete, creating it on the
     * first call and returning the same instance on every later call.
     *
     * @return PromiseInterface
     */
    public function promise()
    {
        if (!$this->cachedPromise) {
            $this->cachedPromise = $this->createPromise();
        }

        return $this->cachedPromise;
    }

    /**
     * Synchronously deletes all of the objects.
     *
     * @throws DeleteMultipleObjectsException on error.
     */
    public function delete()
    {
        $this->promise()->wait();
    }

    /**
     * @param AwsClientInterface $client    Client used to transfer the requests
     * @param string             $bucket    Bucket to delete from.
     * @param callable           $promiseFn Creates a promise.
     * @param array              $options   Hash of options used with the batch
     *
     * @throws \InvalidArgumentException if the provided batch_size is <= 0
     *                                   or "before" is not callable
     */
    private function __construct(
        AwsClientInterface $client,
        $bucket,
        callable $promiseFn,
        array $options = []
    ) {
        $this->client = $client;
        $this->bucket = $bucket;
        $this->promiseCreator = $promiseFn;

        if (isset($options['before'])) {
            if (!is_callable($options['before'])) {
                throw new \InvalidArgumentException('before must be callable');
            }
            $this->before = $options['before'];
        }

        if (isset($options['batch_size'])) {
            if ($options['batch_size'] <= 0) {
                throw new \InvalidArgumentException('batch_size is not > 0');
            }
            // Never let a batch grow beyond 1000 entries.
            $this->batchSize = min($options['batch_size'], 1000);
        }
    }

    /**
     * Adds an object to the pending queue and flushes the queue once it has
     * reached the configured batch size.
     *
     * @param array $obj Object description; only the "Key" and "VersionId"
     *                   entries are sent with the delete request.
     *
     * @return PromiseInterface|null Promise for the DeleteObjects request
     *                               when a flush happened, otherwise null.
     */
    private function enqueue(array $obj)
    {
        $this->queue[] = $obj;

        return count($this->queue) >= $this->batchSize
            ? $this->flushQueue()
            : null;
    }

    /**
     * Sends every queued object in a single DeleteObjects request and empties
     * the queue.
     *
     * @return PromiseInterface|null Promise that is fulfilled with the result
     *                               of the request, or rejected with a
     *                               DeleteMultipleObjectsException when the
     *                               result reports errors. Null when nothing
     *                               is queued.
     */
    private function flushQueue()
    {
        static $validKeys = ['Key' => true, 'VersionId' => true];

        // Nothing queued (or the queue was already released by cleanup).
        if (empty($this->queue)) {
            return null;
        }

        // Take the whole queue in one step. Draining it with a truthiness
        // test on array_shift() would stop at the first empty entry and
        // silently leave everything queued behind it undeleted.
        $pending = $this->queue;
        $this->queue = [];

        $batch = [];
        foreach ($pending as $obj) {
            // Strip every attribute except the ones DeleteObjects accepts.
            $batch[] = array_intersect_key($obj, $validKeys);
        }

        $command = $this->client->getCommand('DeleteObjects', [
            'Bucket' => $this->bucket,
            'Delete' => ['Objects' => $batch]
        ]);

        if ($this->before) {
            call_user_func($this->before, $command);
        }

        return $this->client->executeAsync($command)
            ->then(function ($result) {
                // The request itself can succeed while individual keys
                // fail; surface those failures as an exception.
                if (!empty($result['Errors'])) {
                    throw new DeleteMultipleObjectsException(
                        $result['Deleted'] ?: [],
                        $result['Errors']
                    );
                }

                return $result;
            });
    }

    /**
     * Returns a promise that will clean up any references when it completes.
     *
     * @return PromiseInterface
     */
    private function createPromise()
    {
        // Create the promise
        $promise = call_user_func($this->promiseCreator, $this);
        $this->promiseCreator = null;

        // Cleans up the promise state and references.
        $cleanup = function () {
            $this->before = $this->client = $this->queue = null;
        };

        // Cleans up and then forwards the original rejection reason.
        $cleanupAndReject = function ($reason) use ($cleanup) {
            $cleanup();

            return Promise\Create::rejectionFor($reason);
        };

        // When done, ensure cleanup and that any remaining are processed.
        return $promise->then(
            function () use ($cleanup, $cleanupAndReject) {
                // The final flush can reject as well; release the
                // references in that case too instead of only on success.
                return Promise\Create::promiseFor($this->flushQueue())
                    ->then($cleanup, $cleanupAndReject);
            },
            $cleanupAndReject
        );
    }
}
|