Proyectos de Subversion Moodle

Rev

| Ultima modificación | Ver Log |

Rev Autor Línea Nro. Línea
1 efrain 1
<?php
2
namespace Aws\S3;
3
 
4
use Aws;
5
use Aws\CommandInterface;
6
use Aws\Exception\AwsException;
7
use GuzzleHttp\Promise;
8
use GuzzleHttp\Promise\PromiseInterface;
9
use GuzzleHttp\Promise\PromisorInterface;
10
use Iterator;
11
 
12
/**
13
 * Transfers files from the local filesystem to S3 or from S3 to the local
14
 * filesystem.
15
 *
16
 * This class does not support copying from the local filesystem to somewhere
17
 * else on the local filesystem or from one S3 bucket to another.
18
 */
19
class Transfer implements PromisorInterface
20
{
21
    /** @var S3ClientInterface Client used to build and execute the S3 commands. */
    private $client;

    /** @var PromiseInterface|null Transfer promise; created by the first promise() call. */
    private $promise;

    /** @var string|Iterator Source exactly as it was passed to the constructor. */
    private $source;

    /** @var array 'path' and 'scheme' of the source (of "base_dir" for iterator sources). */
    private $sourceMetadata;

    /** @var array 'path' and 'scheme' of the destination. */
    private $destination;

    /** @var int Value of the "concurrency" option. */
    private $concurrency;

    /** @var int File size in bytes from which multipart uploads are used. */
    private $mupThreshold;

    /** @var callable|null Callback invoked with each command before it is sent. */
    private $before;

    /** @var array Bucket (and optional Key) parsed from an s3 destination. */
    private $s3Args = [];

    /** @var bool True only when the "add_content_md5" option is exactly true. */
    private $addContentMD5;
31
 
32
    /**
     * When providing the $source argument, you may provide a string referencing
     * the path to a directory on disk to upload, an s3 scheme URI that contains
     * the bucket and key (e.g., "s3://bucket/key"), or an \Iterator object
     * that yields strings containing filenames that are the path to a file on
     * disk or an s3 scheme URI. The bucket portion of the s3 URI may be an S3
     * access point ARN. The "/key" portion of an s3 URI is optional.
     *
     * When providing an iterator for the $source argument, you must also
     * provide a 'base_dir' key value pair in the $options argument.
     *
     * The $dest argument can be the path to a directory on disk or an s3
     * scheme URI (e.g., "s3://bucket/key"). The source and the destination
     * must use different schemes: one of them "file", the other "s3".
     *
     * The options array can contain the following key value pairs:
     *
     * - base_dir: (string) Base directory of the source, if $source is an
     *   iterator. If $source is a string, then this option is ignored.
     * - before: (callable) A callback to invoke before each transfer. The
     *   callback accepts a single argument: Aws\CommandInterface $command.
     *   The provided command will be either a GetObject, PutObject,
     *   CreateMultipartUpload, UploadPart, or CompleteMultipartUpload command.
     * - mup_threshold: (int) Size in bytes in which a multipart upload should
     *   be used instead of PutObject. Defaults to 16777216 (16 MB) and must
     *   not be smaller than MultipartUploader::PART_MIN_SIZE.
     * - concurrency: (int) Number of files to transfer concurrently. Defaults
     *   to MultipartUploader::DEFAULT_CONCURRENCY. The ideal concurrency value
     *   will vary based on the number of files being uploaded and the average
     *   size of each file. Generally speaking, smaller files benefit from a
     *   higher concurrency while larger files will not.
     * - debug: (bool|resource) Set to true to print out debug information for
     *   transfers to php://output. Set to an fopen() resource to write to a
     *   specific stream instead.
     * - add_content_md5: (bool) Forwarded to uploads only when set to exactly
     *   true; any other value is treated as false.
     *
     * @param S3ClientInterface $client  Client used for transfers.
     * @param string|Iterator   $source  Where the files are transferred from.
     * @param string            $dest    Where the files are transferred to.
     * @param array             $options Hash of options.
     *
     * @throws \InvalidArgumentException If the source, destination, schemes,
     *                                   "mup_threshold" or "before" are invalid.
     */
    public function __construct(
        S3ClientInterface $client,
        $source,
        $dest,
        array $options = []
    ) {
        $this->client = $client;

        // Prepare the destination.
        $this->destination = $this->prepareTarget($dest);
        if ($this->destination['scheme'] === 's3') {
            $this->s3Args = $this->getS3Args($this->destination['path']);
        }

        // Prepare the source. For iterators the scheme is taken from base_dir.
        if (is_string($source)) {
            $this->sourceMetadata = $this->prepareTarget($source);
        } elseif ($source instanceof Iterator) {
            if (empty($options['base_dir'])) {
                throw new \InvalidArgumentException('You must provide the source'
                    . ' argument as a string or provide the "base_dir" option.');
            }

            $this->sourceMetadata = $this->prepareTarget($options['base_dir']);
        } else {
            throw new \InvalidArgumentException('source must be the path to a '
                . 'directory or an iterator that yields file names.');
        }
        $this->source = $source;

        // Validate schemes: local-to-local and s3-to-s3 are not supported.
        if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) {
            throw new \InvalidArgumentException("You cannot copy from"
                . " {$this->sourceMetadata['scheme']} to"
                . " {$this->destination['scheme']}."
            );
        }

        // Handle multipart-related options.
        $this->concurrency = isset($options['concurrency'])
            ? $options['concurrency']
            : MultipartUploader::DEFAULT_CONCURRENCY;
        // 16777216 bytes = 16 MB.
        $this->mupThreshold = isset($options['mup_threshold'])
            ? $options['mup_threshold']
            : 16777216;
        if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) {
            throw new \InvalidArgumentException('mup_threshold must be >= 5MB');
        }

        // Handle "before" callback option.
        if (isset($options['before'])) {
            $this->before = $options['before'];
            if (!is_callable($this->before)) {
                throw new \InvalidArgumentException('before must be a callable.');
            }
        }

        // Handle "debug" option. This must run after the source, destination
        // and "before" callback are set, because the debug wrapper captures
        // their current values.
        if (isset($options['debug'])) {
            if ($options['debug'] === true) {
                $options['debug'] = fopen('php://output', 'w');
            }
            if (is_resource($options['debug'])) {
                $this->addDebugToBefore($options['debug']);
            }
        }

        // Handle "add_content_md5" option.
        $this->addContentMD5 = isset($options['add_content_md5'])
            && $options['add_content_md5'] === true;
    }
143
 
144
    /**
     * Returns the promise that performs the transfer, creating it on first use.
     *
     * A source whose scheme is "file" yields an upload promise; any other
     * source yields a download promise. Once created, the same promise is
     * returned by every later call.
     *
     * @return PromiseInterface
     */
    public function promise()
    {
        if ($this->promise) {
            return $this->promise;
        }

        if ($this->sourceMetadata['scheme'] === 'file') {
            $this->promise = $this->createUploadPromise();
        } else {
            $this->promise = $this->createDownloadPromise();
        }

        return $this->promise;
    }
161
 
162
    /**
     * Performs the transfer synchronously by waiting on the transfer promise.
     */
    public function transfer()
    {
        $transferPromise = $this->promise();
        $transferPromise->wait();
    }
169
 
170
    /**
     * Describes a source or destination as a normalized path plus its scheme.
     *
     * @param string $targetPath Local path or s3 scheme URI.
     *
     * @return array Array with the keys 'path' and 'scheme'.
     *
     * @throws \InvalidArgumentException If the scheme is neither "s3" nor "file".
     */
    private function prepareTarget($targetPath)
    {
        $path = $this->normalizePath($targetPath);
        $scheme = $this->determineScheme($targetPath);

        if (!in_array($scheme, ['s3', 'file'], true)) {
            throw new \InvalidArgumentException('Scheme must be "s3" or "file".');
        }

        return [
            'path'   => $path,
            'scheme' => $scheme,
        ];
    }
183
 
184
    /**
     * Creates an array that contains Bucket and Key by parsing an s3 URI.
     *
     * Everything up to the first "/" after the scheme is the bucket; the
     * remainder, when present, is the key. Only a leading "s3://" is removed,
     * so a key that itself contains the text "s3://" is returned unchanged.
     *
     * @param string $path Path to parse (e.g., "s3://bucket/key").
     *
     * @return array Array with 'Bucket' and, if the path has one, 'Key'.
     */
    private function getS3Args($path)
    {
        // Strip the scheme only where it prefixes the URI.
        $location = strncmp($path, 's3://', 5) === 0
            ? (string) substr($path, 5)
            : $path;

        $parts = explode('/', $location, 2);
        $args = ['Bucket' => $parts[0]];
        if (isset($parts[1])) {
            $args['Key'] = $parts[1];
        }

        return $args;
    }
201
 
202
    /**
     * Parses the scheme from a filename.
     *
     * The scheme is the text before the first "://". Paths without "://",
     * and paths that begin with it, are treated as local files.
     *
     * @param string $path Path to parse.
     *
     * @return string The scheme, or "file" when none is present.
     */
    private function determineScheme($path)
    {
        // strstr(..., true) gives the part before the first "://", or false
        // when the separator does not occur at all.
        $scheme = strstr($path, '://', true);

        if ($scheme === false || $scheme === '') {
            return 'file';
        }

        return $scheme;
    }
213
 
214
    /**
     * Normalizes a path: backslashes become forward slashes and any trailing
     * forward slashes are removed.
     *
     * @param string $path
     *
     * @return string
     */
    private function normalizePath($path)
    {
        $unixStyle = strtr($path, '\\', '/');

        return rtrim($unixStyle, '/');
    }
225
 
226
    /**
     * Tells whether ".." segments make a sink path climb out of the directory
     * that the object key is appended to.
     *
     * Only the tail of the sink is inspected: as many segments as the key
     * has, plus the one segment directly before them, which is taken as the
     * target directory.
     *
     * @param string $sink      Local path the object would be written to.
     * @param string $objectKey Key portion that was appended to build $sink.
     *
     * @return bool True when the path leaves the target directory.
     */
    private function resolvesOutsideTargetDirectory($sink, $objectKey)
    {
        $keyDepth = count(explode('/', $objectKey));
        $tail = array_slice(explode('/', $sink), -($keyDepth + 1));
        $root = $tail[0];
        $stack = [];

        foreach ($tail as $segment) {
            // Empty and "." segments do not change the position.
            if ($segment === '' || $segment === '.') {
                continue;
            }

            if ($segment !== '..') {
                $stack[] = $segment;
                continue;
            }

            // ".." steps up one level; leaving the root is a violation.
            array_pop($stack);
            if (empty($stack) || $stack[0] !== $root) {
                return true;
            }
        }

        return false;
    }
249
 
250
    /**
     * Builds the promise that downloads every source object into the local
     * destination directory.
     *
     * Each object URI from getDownloadsIterator() is turned into a GetObject
     * command whose sink is the destination path plus the object's key
     * relative to the source prefix. Missing local directories are created
     * up front, and all commands are then run through a command pool.
     *
     * @return PromiseInterface Promise returned by the command pool.
     *
     * @throws AwsException      If a key would resolve outside the destination.
     * @throws \RuntimeException If a local directory cannot be created.
     */
    private function createDownloadPromise()
    {
        $parts = $this->getS3Args($this->sourceMetadata['path']);
        $prefix = "s3://{$parts['Bucket']}/"
            . (isset($parts['Key']) ? $parts['Key'] . '/' : '');

        $commands = [];
        foreach ($this->getDownloadsIterator() as $object) {
            // Prepare the sink: the object URI minus the source prefix,
            // placed below the destination directory.
            $objectKey = preg_replace('/^' . preg_quote($prefix, '/') . '/', '', $object);
            $sink = $this->destination['path'] . '/' . $objectKey;

            // The command is built before the path check because the
            // exception below carries it.
            $command = $this->client->getCommand(
                'GetObject',
                $this->getS3Args($object) + ['@http'  => ['sink'  => $sink]]
            );

            if ($this->resolvesOutsideTargetDirectory($sink, $objectKey)) {
                throw new AwsException(
                    'Cannot download key ' . $objectKey
                    . ', its relative path resolves outside the'
                    . ' parent directory', $command);
            }

            // Create the directory if needed. is_dir() is checked again after
            // a failed mkdir() because another process may have created the
            // directory between the first check and the mkdir() call.
            $dir = dirname($sink);
            if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
                throw new \RuntimeException("Could not create dir: {$dir}");
            }

            $commands []= $command;
        }

        // Create a GetObject command pool and return the promise.
        return (new Aws\CommandPool($this->client, $commands, [
            'concurrency' => $this->concurrency,
            'before'      => $this->before,
            // Reject the promise handed to the callback with the failure reason.
            'rejected'    => function ($reason, $idx, Promise\PromiseInterface $p) {
                $p->reject($reason);
            }
        ]))->promise();
    }
293
 
294
    /**
     * Builds the promise that uploads every file from the uploads iterator.
     *
     * Files whose size reaches the multipart threshold go through
     * uploadMultipart(); smaller files go through upload(). The resulting
     * promises are processed with the configured concurrency limit.
     *
     * @return PromiseInterface
     */
    private function createUploadPromise()
    {
        // Turns one file into the promise that performs its transfer.
        $toPromise = function ($file) {
            if (filesize($file) >= $this->mupThreshold) {
                return $this->uploadMultipart($file);
            }

            return $this->upload($file);
        };

        $uploads = \Aws\map($this->getUploadsIterator(), $toPromise);

        // Handle the yielded upload promises concurrently, up to the limit.
        return Promise\Each::ofLimitAll($uploads, $this->concurrency);
    }
307
 
308
    /**
     * Returns the files to upload.
     *
     * A string source is walked recursively and directories are filtered
     * out; an iterator source is returned as provided.
     *
     * @return Iterator
     */
    private function getUploadsIterator()
    {
        if (!is_string($this->source)) {
            return $this->source;
        }

        $entries = Aws\recursive_dir_iterator($this->sourceMetadata['path']);

        return Aws\filter($entries, function ($file) {
            return !is_dir($file);
        });
    }
320
 
321
    /**
     * Returns the s3 URIs of the objects to download.
     *
     * A string source is listed with ListObjects (using the source key plus
     * "/" as the prefix when a key is present); every listed key is turned
     * into an "s3://bucket/key" URI and URIs ending in "/" are dropped. An
     * iterator source is returned as provided.
     *
     * @return Iterator
     */
    private function getDownloadsIterator()
    {
        if (!is_string($this->source)) {
            return $this->source;
        }

        $listArgs = $this->getS3Args($this->sourceMetadata['path']);
        if (isset($listArgs['Key'])) {
            // ListObjects takes a Prefix rather than a Key.
            $listArgs['Prefix'] = $listArgs['Key'] . '/';
            unset($listArgs['Key']);
        }

        $bucket = $listArgs['Bucket'];
        $keys = $this->client
            ->getPaginator('ListObjects', $listArgs)
            ->search('Contents[].Key');

        $uris = Aws\map($keys, function ($key) use ($bucket) {
            return "s3://{$bucket}/$key";
        });

        return Aws\filter($uris, function ($uri) {
            return substr($uri, -1, 1) !== '/';
        });
    }
344
 
345
    /**
     * Uploads a single file with one PutObject command.
     *
     * The "before" callback, when set, receives the command before it is
     * executed.
     *
     * @param string $filename Path of the local file to upload.
     *
     * @return PromiseInterface Result of the client's executeAsync() call.
     */
    private function upload($filename)
    {
        // Destination arguments first, then the per-file values on top.
        $args = array_merge($this->s3Args, [
            'SourceFile'    => $filename,
            'Key'           => $this->createS3Key($filename),
            'AddContentMD5' => $this->addContentMD5,
        ]);

        $command = $this->client->getCommand('PutObject', $args);
        if ($this->before) {
            call_user_func($this->before, $command);
        }

        return $this->client->executeAsync($command);
    }
356
 
357
    /**
     * Uploads a single file with a MultipartUploader.
     *
     * The "before" callback is registered for the initiate, upload and
     * complete steps of the multipart upload.
     *
     * @param string|\SplFileInfo $filename Local file to upload.
     *
     * @return PromiseInterface Promise of the multipart uploader.
     */
    private function uploadMultipart($filename)
    {
        // The key is derived from the value as received, before any
        // SplFileInfo is reduced to its path name.
        $key = $this->createS3Key($filename);
        $path = $filename instanceof \SplFileInfo
            ? $filename->getPathname()
            : $filename;

        $uploader = new MultipartUploader($this->client, $path, [
            'bucket'          => $this->s3Args['Bucket'],
            'key'             => $key,
            'before_initiate' => $this->before,
            'before_upload'   => $this->before,
            'before_complete' => $this->before,
            'concurrency'     => $this->concurrency,
            'add_content_md5' => $this->addContentMD5
        ]);

        return $uploader->promise();
    }
373
 
374
    /**
     * Builds the S3 object key for a local file.
     *
     * The normalized source path is removed from the front of the normalized
     * file name, leading slashes are trimmed, and the destination key (if
     * the destination URI has one) is put in front as a prefix.
     *
     * @param string $filename Path of the local file.
     *
     * @return string Object key to upload the file to.
     */
    private function createS3Key($filename)
    {
        $filename = $this->normalizePath($filename);

        // Pass the pattern delimiter to preg_quote() so that a "#" in the
        // source path is escaped on every PHP version.
        $pattern = '#^' . preg_quote($this->sourceMetadata['path'], '#') . '#';
        $relative_file_path = ltrim(
            preg_replace($pattern, '', $filename),
            '/\\'
        );

        if (isset($this->s3Args['Key'])) {
            return rtrim($this->s3Args['Key'], '/').'/'.$relative_file_path;
        }

        return $relative_file_path;
    }
388
 
389
    /**
     * Wraps the "before" callback so that every command is also logged.
     *
     * The wrapper first calls the previously configured callback (if any),
     * then writes one "Transferring <source> -> <dest> (<operation>)" line to
     * the debug stream. It captures the source path and destination
     * arguments as they are when this method is called.
     *
     * @param resource $debug Stream the debug lines are written to.
     */
    private function addDebugToBefore($debug)
    {
        $userBefore = $this->before;
        $localRoot = $this->sourceMetadata['path'];
        $destArgs = $this->s3Args;

        $this->before = static function (
            CommandInterface $command
        ) use ($userBefore, $debug, $localRoot, $destArgs) {
            // Call the composed before function.
            if ($userBefore) {
                $userBefore($command);
            }

            // Determine the source and dest values based on operation.
            $operation = $command->getName();
            switch ($operation) {
                case 'GetObject':
                    $source = "s3://{$command['Bucket']}/{$command['Key']}";
                    $dest = $command['@http']['sink'];
                    break;
                case 'PutObject':
                    $source = $command['SourceFile'];
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                case 'UploadPart':
                    $part = $command['PartNumber'];
                    // Intentional fall-through: parts share the multipart
                    // source/dest handling below.
                case 'CreateMultipartUpload':
                case 'CompleteMultipartUpload':
                    // Recover the local path by removing the destination key
                    // prefix (and the "/" after it) from the object key.
                    $sourceKey = $command['Key'];
                    if (isset($destArgs['Key']) && strpos($sourceKey, $destArgs['Key']) === 0) {
                        $sourceKey = substr($sourceKey, strlen($destArgs['Key']) + 1);
                    }
                    $source = "{$localRoot}/{$sourceKey}";
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                default:
                    throw new \UnexpectedValueException(
                        "Transfer encountered an unexpected operation: {$operation}."
                    );
            }

            // Print the debugging message; the part number is only set for
            // UploadPart commands.
            $partSuffix = isset($part) ? " : Part={$part}" : '';
            $context = sprintf('%s -> %s (%s)', $source, $dest, $operation) . $partSuffix;
            fwrite($debug, "Transferring {$context}\n");
        };
    }
436
}