Proyectos de Subversion Moodle

Rev

| Ultima modificación | Ver Log |

Rev Autor Línea Nro. Línea
1 efrain 1
<?php
2
namespace Aws\S3;
3
 
4
use Aws\AwsClientInterface;
5
use Aws\S3\Exception\DeleteMultipleObjectsException;
6
use GuzzleHttp\Promise;
7
use GuzzleHttp\Promise\PromisorInterface;
8
use GuzzleHttp\Promise\PromiseInterface;
9
 
10
/**
11
 * Efficiently deletes many objects from a single Amazon S3 bucket using an
12
 * iterator that yields keys. Deletes are made using the DeleteObjects API
13
 * operation.
14
 *
15
 *     $s3 = new Aws\S3\Client([
16
 *         'region' => 'us-west-2',
17
 *         'version' => 'latest'
18
 *     ]);
19
 *
20
 *     $listObjectsParams = ['Bucket' => 'foo', 'Prefix' => 'starts/with/'];
21
 *     $delete = Aws\S3\BatchDelete::fromListObjects($s3, $listObjectsParams);
22
 *     // Asynchronously delete
23
 *     $promise = $delete->promise();
24
 *     // Force synchronous completion
25
 *     $delete->delete();
26
 *
27
 * When using one of the batch delete creational static methods, you can supply
28
 * an associative array of options:
29
 *
30
 * - before: Function invoked before executing a command. The function is
31
 *   passed the command that is about to be executed. This can be useful
32
 *   for logging, adding custom request headers, etc.
33
 * - batch_size: The size of each delete batch. Defaults to 1000.
34
 *
35
 * @link http://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
36
 */
37
class BatchDelete implements PromisorInterface
38
{
39
    private $bucket;
40
    /** @var AwsClientInterface */
41
    private $client;
42
    /** @var callable */
43
    private $before;
44
    /** @var PromiseInterface */
45
    private $cachedPromise;
46
    /** @var callable */
47
    private $promiseCreator;
48
    private $batchSize = 1000;
49
    private $queue = [];
50
 
51
    /**
52
     * Creates a BatchDelete object from all of the paginated results of a
53
     * ListObjects operation. Each result that is returned by the ListObjects
54
     * operation will be deleted.
55
     *
56
     * @param AwsClientInterface $client            AWS Client to use.
57
     * @param array              $listObjectsParams ListObjects API parameters
58
     * @param array              $options           BatchDelete options.
59
     *
60
     * @return BatchDelete
61
     */
62
    public static function fromListObjects(
63
        AwsClientInterface $client,
64
        array $listObjectsParams,
65
        array $options = []
66
    ) {
67
        $iter = $client->getPaginator('ListObjects', $listObjectsParams);
68
        $bucket = $listObjectsParams['Bucket'];
69
        $fn = function (BatchDelete $that) use ($iter) {
70
            return $iter->each(function ($result) use ($that) {
71
                $promises = [];
72
                if (is_array($result['Contents'])) {
73
                    foreach ($result['Contents'] as $object) {
74
                        if ($promise = $that->enqueue($object)) {
75
                            $promises[] = $promise;
76
                        }
77
                    }
78
                }
79
                return $promises ? Promise\Utils::all($promises) : null;
80
            });
81
        };
82
 
83
        return new self($client, $bucket, $fn, $options);
84
    }
85
 
86
    /**
87
     * Creates a BatchDelete object from an iterator that yields results.
88
     *
89
     * @param AwsClientInterface $client  AWS Client to use to execute commands
90
     * @param string             $bucket  Bucket where the objects are stored
91
     * @param \Iterator          $iter    Iterator that yields assoc arrays
92
     * @param array              $options BatchDelete options
93
     *
94
     * @return BatchDelete
95
     */
96
    public static function fromIterator(
97
        AwsClientInterface $client,
98
        $bucket,
99
        \Iterator $iter,
100
        array $options = []
101
    ) {
102
        $fn = function (BatchDelete $that) use ($iter) {
103
            return Promise\Coroutine::of(function () use ($that, $iter) {
104
                foreach ($iter as $obj) {
105
                    if ($promise = $that->enqueue($obj)) {
106
                        yield $promise;
107
                    }
108
                }
109
            });
110
        };
111
 
112
        return new self($client, $bucket, $fn, $options);
113
    }
114
 
115
    /**
116
     * @return PromiseInterface
117
     */
118
    public function promise()
119
    {
120
        if (!$this->cachedPromise) {
121
            $this->cachedPromise = $this->createPromise();
122
        }
123
 
124
        return $this->cachedPromise;
125
    }
126
 
127
    /**
128
     * Synchronously deletes all of the objects.
129
     *
130
     * @throws DeleteMultipleObjectsException on error.
131
     */
132
    public function delete()
133
    {
134
        $this->promise()->wait();
135
    }
136
 
137
    /**
138
     * @param AwsClientInterface $client    Client used to transfer the requests
139
     * @param string             $bucket    Bucket to delete from.
140
     * @param callable           $promiseFn Creates a promise.
141
     * @param array              $options   Hash of options used with the batch
142
     *
143
     * @throws \InvalidArgumentException if the provided batch_size is <= 0
144
     */
145
    private function __construct(
146
        AwsClientInterface $client,
147
        $bucket,
148
        callable $promiseFn,
149
        array $options = []
150
    ) {
151
        $this->client = $client;
152
        $this->bucket = $bucket;
153
        $this->promiseCreator = $promiseFn;
154
 
155
        if (isset($options['before'])) {
156
            if (!is_callable($options['before'])) {
157
                throw new \InvalidArgumentException('before must be callable');
158
            }
159
            $this->before = $options['before'];
160
        }
161
 
162
        if (isset($options['batch_size'])) {
163
            if ($options['batch_size'] <= 0) {
164
                throw new \InvalidArgumentException('batch_size is not > 0');
165
            }
166
            $this->batchSize = min($options['batch_size'], 1000);
167
        }
168
    }
169
 
170
    private function enqueue(array $obj)
171
    {
172
        $this->queue[] = $obj;
173
        return count($this->queue) >= $this->batchSize
174
            ? $this->flushQueue()
175
            : null;
176
    }
177
 
178
    private function flushQueue()
179
    {
180
        static $validKeys = ['Key' => true, 'VersionId' => true];
181
 
182
        if (count($this->queue) === 0) {
183
            return null;
184
        }
185
 
186
        $batch = [];
187
        while ($obj = array_shift($this->queue)) {
188
            $batch[] = array_intersect_key($obj, $validKeys);
189
        }
190
 
191
        $command = $this->client->getCommand('DeleteObjects', [
192
            'Bucket' => $this->bucket,
193
            'Delete' => ['Objects' => $batch]
194
        ]);
195
 
196
        if ($this->before) {
197
            call_user_func($this->before, $command);
198
        }
199
 
200
        return $this->client->executeAsync($command)
201
            ->then(function ($result) {
202
                if (!empty($result['Errors'])) {
203
                    throw new DeleteMultipleObjectsException(
204
                        $result['Deleted'] ?: [],
205
                        $result['Errors']
206
                    );
207
                }
208
                return $result;
209
            });
210
    }
211
 
212
    /**
213
     * Returns a promise that will clean up any references when it completes.
214
     *
215
     * @return PromiseInterface
216
     */
217
    private function createPromise()
218
    {
219
        // Create the promise
220
        $promise = call_user_func($this->promiseCreator, $this);
221
        $this->promiseCreator = null;
222
 
223
        // Cleans up the promise state and references.
224
        $cleanup = function () {
225
            $this->before = $this->client = $this->queue = null;
226
        };
227
 
228
        // When done, ensure cleanup and that any remaining are processed.
229
        return $promise->then(
230
            function () use ($cleanup)  {
231
                return Promise\Create::promiseFor($this->flushQueue())
232
                    ->then($cleanup);
233
            },
234
            function ($reason) use ($cleanup)  {
235
                $cleanup();
236
                return Promise\Create::rejectionFor($reason);
237
            }
238
        );
239
    }
240
}