Proyectos de Subversion Moodle

Rev

Rev 1 | | Comparar con el anterior | Última modificación | Ver Log |

Rev Autor Línea Nro. Línea
1 efrain 1
<?php
2
namespace Aws\S3;
3
 
4
use Aws;
5
use Aws\CommandInterface;
6
use Aws\Exception\AwsException;
1441 ariadna 7
use Aws\MetricsBuilder;
1 efrain 8
use GuzzleHttp\Promise;
9
use GuzzleHttp\Promise\PromiseInterface;
10
use GuzzleHttp\Promise\PromisorInterface;
11
use Iterator;
12
 
13
/**
 * Transfers files from the local filesystem to S3 or from S3 to the local
 * filesystem.
 *
 * This class does not support copying from the local filesystem to somewhere
 * else on the local filesystem or from one S3 bucket to another.
 */
class Transfer implements PromisorInterface
{
    /** Default size in bytes at or above which uploads use multipart (16 MB). */
    private const DEFAULT_MUP_THRESHOLD = 16777216;

    /** @var S3ClientInterface */
    private $client;
    /** @var PromiseInterface|null Created lazily by promise(). */
    private $promise;
    /** @var string|Iterator */
    private $source;
    /** @var array Normalized 'path' and 'scheme' of the source (or base_dir). */
    private $sourceMetadata;
    /** @var array Normalized 'path' and 'scheme' of the destination. */
    private $destination;
    private $concurrency;
    private $mupThreshold;
    /** @var callable|null */
    private $before;
    /** @var array Bucket (and optionally Key) parsed from an s3:// destination. */
    private $s3Args = [];
    /** @var bool */
    private $addContentMD5;

    /**
     * When providing the $source argument, you may provide a string referencing
     * the path to a directory on disk to upload, an s3 scheme URI that contains
     * the bucket and key (e.g., "s3://bucket/key"), or an \Iterator object
     * that yields strings containing filenames that are the path to a file on
     * disk or an s3 scheme URI. The bucket portion of the s3 URI may be an S3
     * access point ARN. The "/key" portion of an s3 URI is optional.
     *
     * When providing an iterator for the $source argument, you must also
     * provide a 'base_dir' key value pair in the $options argument.
     *
     * The $dest argument can be the path to a directory on disk or an s3
     * scheme URI (e.g., "s3://bucket/key"). The source and destination must
     * use different schemes.
     *
     * The options array can contain the following key value pairs:
     *
     * - base_dir: (string) Base directory of the source, if $source is an
     *   iterator. If $source is not an iterator, then this option is ignored.
     * - before: (callable) A callback to invoke before each transfer. The
     *   callback accepts a single argument: Aws\CommandInterface $command.
     *   The provided command will be a GetObject, PutObject,
     *   CreateMultipartUpload, UploadPart, or CompleteMultipartUpload command.
     * - mup_threshold: (int) Size in bytes at or above which a multipart upload
     *   is used instead of PutObject. Must be at least
     *   MultipartUploader::PART_MIN_SIZE. Defaults to 16777216 (16 MB).
     * - concurrency: (int) Number of transfers to run concurrently; also passed
     *   to each MultipartUploader. Defaults to
     *   MultipartUploader::DEFAULT_CONCURRENCY. The ideal value will vary
     *   based on the number of files and their average size. Generally
     *   speaking, smaller files benefit from a higher concurrency while larger
     *   files will not.
     * - debug: (bool|resource) Set to true to print out debug information for
     *   transfers to php://output. Set to an fopen() resource to write to a
     *   specific stream instead.
     * - add_content_md5: (bool) When exactly true, uploads are created with
     *   AddContentMD5 (PutObject) / add_content_md5 (multipart). Defaults to
     *   false.
     *
     * @param S3ClientInterface $client  Client used for transfers.
     * @param string|Iterator   $source  Where the files are transferred from.
     * @param string            $dest    Where the files are transferred to.
     * @param array             $options Hash of options.
     *
     * @throws \InvalidArgumentException If a target scheme is not "s3" or
     *   "file", both ends share a scheme, an iterator source lacks base_dir,
     *   the source has an unsupported type, mup_threshold is too small, or
     *   before is not callable.
     */
    public function __construct(
        S3ClientInterface $client,
        $source,
        $dest,
        array $options = []
    ) {
        $this->client = $client;

        // Prepare the destination.
        $this->destination = $this->prepareTarget($dest);
        if ($this->destination['scheme'] === 's3') {
            $this->s3Args = $this->getS3Args($this->destination['path']);
        }

        // Prepare the source.
        if (is_string($source)) {
            $this->sourceMetadata = $this->prepareTarget($source);
            $this->source = $source;
        } elseif ($source instanceof Iterator) {
            if (empty($options['base_dir'])) {
                throw new \InvalidArgumentException('You must provide the source'
                    . ' argument as a string or provide the "base_dir" option.');
            }

            $this->sourceMetadata = $this->prepareTarget($options['base_dir']);
            $this->source = $source;
        } else {
            throw new \InvalidArgumentException('source must be the path to a '
                . 'directory or an iterator that yields file names.');
        }

        // Only file -> s3 and s3 -> file transfers are supported.
        if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) {
            throw new \InvalidArgumentException("You cannot copy from"
                . " {$this->sourceMetadata['scheme']} to"
                . " {$this->destination['scheme']}."
            );
        }

        // Handle multipart-related options.
        $this->concurrency = $options['concurrency']
            ?? MultipartUploader::DEFAULT_CONCURRENCY;
        $this->mupThreshold = $options['mup_threshold']
            ?? self::DEFAULT_MUP_THRESHOLD;
        if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) {
            throw new \InvalidArgumentException('mup_threshold must be >= 5MB');
        }

        // Handle "before" callback option.
        if (isset($options['before'])) {
            $this->before = $options['before'];
            if (!is_callable($this->before)) {
                throw new \InvalidArgumentException('before must be a callable.');
            }
        }

        // Handle "debug" option. This must run after "before" is set, because
        // the debug wrapper composes (and calls) the user callback.
        if (isset($options['debug'])) {
            if ($options['debug'] === true) {
                $options['debug'] = fopen('php://output', 'w');
            }
            if (is_resource($options['debug'])) {
                $this->addDebugToBefore($options['debug']);
            }
        }

        // Handle "add_content_md5" option (only a literal true enables it).
        $this->addContentMD5 = ($options['add_content_md5'] ?? false) === true;

        MetricsBuilder::appendMetricsCaptureMiddleware(
            $this->client->getHandlerList(),
            MetricsBuilder::S3_TRANSFER
        );
    }

    /**
     * Transfers the files.
     *
     * The promise is created on first call and the same instance is returned
     * on every later call. For downloads, all GetObject commands are built
     * (and destination directories created) eagerly here, so path-traversal
     * and mkdir failures are thrown synchronously from this method.
     *
     * @return PromiseInterface
     */
    public function promise(): PromiseInterface
    {
        if (!$this->promise) {
            // A "file" source means upload; otherwise it is an s3 download.
            $this->promise = $this->sourceMetadata['scheme'] === 'file'
                ? $this->createUploadPromise()
                : $this->createDownloadPromise();
        }

        return $this->promise;
    }

    /**
     * Transfers the files synchronously.
     */
    public function transfer()
    {
        $this->promise()->wait();
    }

    /**
     * Normalizes a source/destination and validates its scheme.
     *
     * @param string $targetPath Local path or s3:// URI.
     *
     * @return array With 'path' (normalized) and 'scheme' ("s3" or "file").
     * @throws \InvalidArgumentException If the scheme is neither s3 nor file.
     */
    private function prepareTarget($targetPath)
    {
        $target = [
            'path'   => $this->normalizePath($targetPath),
            'scheme' => $this->determineScheme($targetPath),
        ];

        if ($target['scheme'] !== 's3' && $target['scheme'] !== 'file') {
            throw new \InvalidArgumentException('Scheme must be "s3" or "file".');
        }

        return $target;
    }

    /**
     * Creates an array that contains Bucket and Key by parsing the filename.
     *
     * Only a leading "s3://" is removed; any later occurrence of that text is
     * part of the object key and is preserved verbatim.
     *
     * @param string $path Path to parse.
     *
     * @return array 'Bucket', plus 'Key' when the path has a "/key" portion.
     */
    private function getS3Args($path)
    {
        if (strncmp($path, 's3://', 5) === 0) {
            $path = substr($path, 5);
        }

        $parts = explode('/', $path, 2);
        $args = ['Bucket' => $parts[0]];
        if (isset($parts[1])) {
            $args['Key'] = $parts[1];
        }

        return $args;
    }

    /**
     * Parses the scheme from a filename.
     *
     * @param string $path Path to parse.
     *
     * @return string Text before the first "://", or "file" when absent.
     */
    private function determineScheme($path)
    {
        $separator = strpos($path, '://');

        return $separator === false ? 'file' : substr($path, 0, $separator);
    }

    /**
     * Normalize a path so that it has UNIX-style directory separators and no
     * trailing /.
     *
     * @param string $path
     *
     * @return string
     */
    private function normalizePath($path)
    {
        return rtrim(str_replace('\\', '/', $path), '/');
    }

    /**
     * Reports whether an object key, used as a path relative to the target
     * directory, would resolve to a location outside that directory.
     *
     * Walks the key's segments keeping a depth counter: "" and "." are
     * ignored, ".." decrements, any other segment increments. Going below
     * zero means the key escapes the target directory. This depends only on
     * the key, so destinations such as ".", ".." or "/" are handled correctly.
     *
     * @param string $objectKey Key relative to the download prefix.
     *
     * @return bool
     */
    private function resolvesOutsideTargetDirectory($objectKey)
    {
        // On Windows "\" is also a directory separator, so "..\x" escapes.
        $separators = DIRECTORY_SEPARATOR === '\\' ? '#[/\\\\]#' : '#/#';
        $depth = 0;

        foreach (preg_split($separators, $objectKey) as $section) {
            if ($section === '' || $section === '.') {
                continue;
            }
            if ($section === '..') {
                if (--$depth < 0) {
                    return true;
                }
            } else {
                $depth++;
            }
        }

        return false;
    }

    /**
     * Builds a CommandPool of GetObject commands that download every object
     * from the source into the destination directory.
     *
     * @return PromiseInterface
     * @throws AwsException      If a key resolves outside the destination.
     * @throws \RuntimeException If a destination directory cannot be created.
     */
    private function createDownloadPromise()
    {
        $parts = $this->getS3Args($this->sourceMetadata['path']);
        $prefix = "s3://{$parts['Bucket']}/"
            . (isset($parts['Key']) ? $parts['Key'] . '/' : '');
        // Loop-invariant: the pattern that strips the source prefix.
        $prefixPattern = '/^' . preg_quote($prefix, '/') . '/';

        $commands = [];
        foreach ($this->getDownloadsIterator() as $object) {
            // Prepare the sink.
            $objectKey = preg_replace($prefixPattern, '', $object);
            $sink = $this->destination['path'] . '/' . $objectKey;

            $command = $this->client->getCommand(
                'GetObject',
                $this->getS3Args($object) + ['@http' => ['sink' => $sink]]
            );

            if ($this->resolvesOutsideTargetDirectory($objectKey)) {
                throw new AwsException(
                    'Cannot download key ' . $objectKey
                    . ', its relative path resolves outside the'
                    . ' parent directory', $command);
            }

            // Create the directory if needed. Re-check is_dir() so a directory
            // created concurrently elsewhere is not treated as a failure.
            $dir = dirname($sink);
            if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
                throw new \RuntimeException("Could not create dir: {$dir}");
            }

            $commands[] = $command;
        }

        // Create a GetObject command pool and return the promise; the first
        // failed command rejects the aggregate promise.
        return (new Aws\CommandPool($this->client, $commands, [
            'concurrency' => $this->concurrency,
            'before'      => $this->before,
            'rejected'    => function ($reason, $idx, Promise\PromiseInterface $p) {
                $p->reject($reason);
            }
        ]))->promise();
    }

    /**
     * Maps each file to an upload promise (PutObject or multipart, based on
     * mup_threshold) and runs them with bounded concurrency.
     *
     * @return PromiseInterface
     */
    private function createUploadPromise()
    {
        $files = \Aws\map($this->getUploadsIterator(), function ($file) {
            return (filesize($file) >= $this->mupThreshold)
                ? $this->uploadMultipart($file)
                : $this->upload($file);
        });

        // Create an EachPromise, that will concurrently handle the upload
        // operations' yielded promises from the iterator.
        return Promise\Each::ofLimitAll($files, $this->concurrency);
    }

    /**
     * Yields local files to upload: every non-directory entry under the
     * source path, or the user-supplied iterator unchanged.
     *
     * @return Iterator
     */
    private function getUploadsIterator()
    {
        if (is_string($this->source)) {
            return Aws\filter(
                Aws\recursive_dir_iterator($this->sourceMetadata['path']),
                function ($file) { return !is_dir($file); }
            );
        }

        return $this->source;
    }

    /**
     * Yields s3:// URIs to download: every listed key under the source prefix
     * except "directory" placeholder keys ending in "/", or the user-supplied
     * iterator unchanged.
     *
     * @return Iterator
     */
    private function getDownloadsIterator()
    {
        if (is_string($this->source)) {
            $listArgs = $this->getS3Args($this->sourceMetadata['path']);
            if (isset($listArgs['Key'])) {
                $listArgs['Prefix'] = $listArgs['Key'] . '/';
                unset($listArgs['Key']);
            }

            $files = $this->client
                ->getPaginator('ListObjects', $listArgs)
                ->search('Contents[].Key');
            $files = Aws\map($files, function ($key) use ($listArgs) {
                return "s3://{$listArgs['Bucket']}/$key";
            });
            return Aws\filter($files, function ($key) {
                return substr($key, -1, 1) !== '/';
            });
        }

        return $this->source;
    }

    /**
     * Uploads a single file with PutObject.
     *
     * @param string|\SplFileInfo $filename
     *
     * @return PromiseInterface
     */
    private function upload($filename)
    {
        $args = $this->s3Args;
        $args['SourceFile'] = $filename;
        $args['Key'] = $this->createS3Key($filename);
        $args['AddContentMD5'] = $this->addContentMD5;
        $command = $this->client->getCommand('PutObject', $args);
        if ($this->before !== null) {
            ($this->before)($command);
        }

        return $this->client->executeAsync($command);
    }

    /**
     * Uploads a single file with a MultipartUploader; the "before" callback is
     * wired to the initiate, part-upload and complete steps.
     *
     * @param string|\SplFileInfo $filename
     *
     * @return PromiseInterface
     */
    private function uploadMultipart($filename)
    {
        $args = $this->s3Args;
        $args['Key'] = $this->createS3Key($filename);
        $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename;

        return (new MultipartUploader($this->client, $filename, [
            'bucket'          => $args['Bucket'],
            'key'             => $args['Key'],
            'before_initiate' => $this->before,
            'before_upload'   => $this->before,
            'before_complete' => $this->before,
            'concurrency'     => $this->concurrency,
            'add_content_md5' => $this->addContentMD5
        ]))->promise();
    }

    /**
     * Derives the object key for a local file: its path relative to the
     * source directory, prefixed with the destination key when one was given.
     *
     * @param string|\SplFileInfo $filename
     *
     * @return string
     */
    private function createS3Key($filename)
    {
        $filename = $this->normalizePath($filename);
        $basePath = $this->sourceMetadata['path'];

        // Strip the base path as a literal prefix (no regex, so characters
        // such as "#" in directory names cannot break the match).
        if (strncmp($filename, $basePath, strlen($basePath)) === 0) {
            $filename = substr($filename, strlen($basePath));
        }
        $relative_file_path = ltrim($filename, '/\\');

        if (isset($this->s3Args['Key'])) {
            return rtrim($this->s3Args['Key'], '/') . '/' . $relative_file_path;
        }

        return $relative_file_path;
    }

    /**
     * Wraps the "before" callback so every command also writes a
     * "Transferring <source> -> <dest> (<operation>)" line to $debug.
     *
     * @param resource $debug Stream that receives the debug lines.
     */
    private function addDebugToBefore($debug)
    {
        $before = $this->before;
        $sourcePath = $this->sourceMetadata['path'];
        $s3Args = $this->s3Args;

        $this->before = static function (
            CommandInterface $command
        ) use ($before, $debug, $sourcePath, $s3Args) {
            // Call the composed before function.
            $before and $before($command);

            // Determine the source and dest values based on operation.
            switch ($operation = $command->getName()) {
                case 'GetObject':
                    $source = "s3://{$command['Bucket']}/{$command['Key']}";
                    $dest = $command['@http']['sink'];
                    break;
                case 'PutObject':
                    $source = $command['SourceFile'];
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                case 'UploadPart':
                    $part = $command['PartNumber'];
                    // Intentional fall-through: parts share the multipart
                    // source/dest formatting below.
                case 'CreateMultipartUpload':
                case 'CompleteMultipartUpload':
                    $sourceKey = $command['Key'];
                    if (isset($s3Args['Key']) && strpos($sourceKey, $s3Args['Key']) === 0) {
                        $sourceKey = substr($sourceKey, strlen($s3Args['Key']) + 1);
                    }
                    $source = "{$sourcePath}/{$sourceKey}";
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                default:
                    throw new \UnexpectedValueException(
                        "Transfer encountered an unexpected operation: {$operation}."
                    );
            }

            // Print the debugging message.
            $context = sprintf('%s -> %s (%s)', $source, $dest, $operation);
            if (isset($part)) {
                $context .= " : Part={$part}";
            }
            fwrite($debug, "Transferring {$context}\n");
        };
    }
}