1 |
efrain |
1 |
<?php
|
|
|
2 |
namespace Aws\S3;
|
|
|
3 |
|
|
|
4 |
use Aws;
|
|
|
5 |
use Aws\CommandInterface;
|
|
|
6 |
use Aws\Exception\AwsException;
|
|
|
7 |
use GuzzleHttp\Promise;
|
|
|
8 |
use GuzzleHttp\Promise\PromiseInterface;
|
|
|
9 |
use GuzzleHttp\Promise\PromisorInterface;
|
|
|
10 |
use Iterator;
|
|
|
11 |
|
|
|
12 |
/**
 * Transfers files from the local filesystem to S3 or from S3 to the local
 * filesystem.
 *
 * This class does not support copying from the local filesystem to somewhere
 * else on the local filesystem or from one S3 bucket to another.
 */
class Transfer implements PromisorInterface
{
    /** @var S3ClientInterface Client used to build and execute all commands. */
    private $client;

    /** @var PromiseInterface|null Lazily created by promise(). */
    private $promise;

    /** @var string|Iterator The $source argument exactly as given. */
    private $source;

    /** @var array Parsed source (or base_dir): ['path' => ..., 'scheme' => ...]. */
    private $sourceMetadata;

    /** @var array Parsed destination: ['path' => ..., 'scheme' => ...]. */
    private $destination;

    /** @var int Concurrency used for the command pool / upload promises. */
    private $concurrency;

    /** @var int File size (bytes) at or above which multipart upload is used. */
    private $mupThreshold;

    /** @var callable|null Callback invoked with each command before it is sent. */
    private $before;

    /** @var array Bucket (and optional Key prefix) parsed from an s3 destination. */
    private $s3Args = [];

    /** @var bool Whether uploads should add a Content-MD5 checksum. */
    private $addContentMD5;

    /**
     * When providing the $source argument, you may provide a string referencing
     * the path to a directory on disk to upload, an s3 scheme URI that contains
     * the bucket and key (e.g., "s3://bucket/key"), or an \Iterator object
     * that yields strings containing filenames that are the path to a file on
     * disk or an s3 scheme URI. The bucket portion of the s3 URI may be an S3
     * access point ARN. The "/key" portion of an s3 URI is optional.
     *
     * When providing an iterator for the $source argument, you must also
     * provide a 'base_dir' key value pair in the $options argument.
     *
     * The $dest argument can be the path to a directory on disk or an s3
     * scheme URI (e.g., "s3://bucket/key").
     *
     * The options array can contain the following key value pairs:
     *
     * - base_dir: (string) Base directory of the source, if $source is an
     *   iterator. If $source is not an iterator, then this option is
     *   ignored.
     * - before: (callable) A callback to invoke before each transfer. The
     *   callback accepts a single argument: Aws\CommandInterface $command.
     *   The provided command will be either a GetObject, PutObject,
     *   CreateMultipartUpload, UploadPart, or CompleteMultipartUpload
     *   command.
     * - mup_threshold: (int) Size in bytes in which a multipart upload should
     *   be used instead of PutObject. Defaults to 16777216 (16 MB). Must be
     *   at least MultipartUploader::PART_MIN_SIZE.
     * - concurrency: (int) Number of files to transfer concurrently; also
     *   passed as the part concurrency of each multipart upload. Defaults to
     *   MultipartUploader::DEFAULT_CONCURRENCY. The ideal concurrency value
     *   will vary based on the number of files being uploaded and the average
     *   size of each file. Generally speaking, smaller files benefit from a
     *   higher concurrency while larger files will not.
     * - debug: (bool|resource) Set to true to print out debug information for
     *   transfers to the PHP output stream (php://output). Set to an fopen()
     *   resource to write to a specific stream instead. Any other value is
     *   ignored.
     * - add_content_md5: (bool) Set to exactly true to add a Content-MD5
     *   checksum to uploads (both PutObject and multipart uploads).
     *
     * @param S3ClientInterface $client  Client used for transfers.
     * @param string|Iterator   $source  Where the files are transferred from.
     * @param string            $dest    Where the files are transferred to.
     * @param array             $options Hash of options.
     *
     * @throws \InvalidArgumentException on an invalid source, scheme
     *                                   combination, mup_threshold, or
     *                                   before callback.
     */
    public function __construct(
        S3ClientInterface $client,
        $source,
        $dest,
        array $options = []
    ) {
        $this->client = $client;

        // Prepare the destination.
        $this->destination = $this->prepareTarget($dest);
        if ($this->destination['scheme'] === 's3') {
            $this->s3Args = $this->getS3Args($this->destination['path']);
        }

        // Prepare the source.
        if (is_string($source)) {
            $this->sourceMetadata = $this->prepareTarget($source);
            $this->source = $source;
        } elseif ($source instanceof Iterator) {
            if (empty($options['base_dir'])) {
                throw new \InvalidArgumentException('You must provide the source'
                    . ' argument as a string or provide the "base_dir" option.');
            }

            // With an iterator, base_dir stands in for the source location
            // (its scheme decides upload vs. download, and its path is the
            // prefix stripped when building keys / sink paths).
            $this->sourceMetadata = $this->prepareTarget($options['base_dir']);
            $this->source = $source;
        } else {
            throw new \InvalidArgumentException('source must be the path to a '
                . 'directory or an iterator that yields file names.');
        }

        // Validate schemes: exactly one side must be s3 and the other file.
        if ($this->sourceMetadata['scheme'] === $this->destination['scheme']) {
            throw new \InvalidArgumentException("You cannot copy from"
                . " {$this->sourceMetadata['scheme']} to"
                . " {$this->destination['scheme']}."
            );
        }

        // Handle multipart-related options.
        $this->concurrency = isset($options['concurrency'])
            ? $options['concurrency']
            : MultipartUploader::DEFAULT_CONCURRENCY;
        $this->mupThreshold = isset($options['mup_threshold'])
            ? $options['mup_threshold']
            : 16777216; // 16 MB
        if ($this->mupThreshold < MultipartUploader::PART_MIN_SIZE) {
            throw new \InvalidArgumentException('mup_threshold must be >= 5MB');
        }

        // Handle "before" callback option.
        if (isset($options['before'])) {
            $this->before = $options['before'];
            if (!is_callable($this->before)) {
                throw new \InvalidArgumentException('before must be a callable.');
            }
        }

        // Handle "debug" option. This runs after "before" is set so that the
        // debug wrapper composes (and still calls) the user's callback.
        if (isset($options['debug'])) {
            if ($options['debug'] === true) {
                $options['debug'] = fopen('php://output', 'w');
            }
            if (is_resource($options['debug'])) {
                $this->addDebugToBefore($options['debug']);
            }
        }

        // Handle "add_content_md5" option.
        $this->addContentMD5 = isset($options['add_content_md5'])
            && $options['add_content_md5'] === true;
    }

    /**
     * Transfers the files.
     *
     * The promise is created on first call and the same instance is returned
     * on every subsequent call.
     *
     * @return PromiseInterface
     */
    public function promise()
    {
        // If the promise has been created, just return it.
        if (!$this->promise) {
            // Create an upload/download promise for the transfer.
            $this->promise = $this->sourceMetadata['scheme'] === 'file'
                ? $this->createUploadPromise()
                : $this->createDownloadPromise();
        }

        return $this->promise;
    }

    /**
     * Transfers the files synchronously by waiting on promise().
     */
    public function transfer()
    {
        $this->promise()->wait();
    }

    /**
     * Normalizes a target path and determines its scheme.
     *
     * @param string $targetPath Local path or s3 URI.
     *
     * @return array ['path' => normalized path, 'scheme' => 's3'|'file']
     * @throws \InvalidArgumentException if the scheme is neither s3 nor file.
     */
    private function prepareTarget($targetPath)
    {
        $target = [
            'path'   => $this->normalizePath($targetPath),
            'scheme' => $this->determineScheme($targetPath),
        ];

        if ($target['scheme'] !== 's3' && $target['scheme'] !== 'file') {
            throw new \InvalidArgumentException('Scheme must be "s3" or "file".');
        }

        return $target;
    }

    /**
     * Creates an array that contains Bucket and Key by parsing the filename.
     *
     * Only a leading "s3://" is removed; occurrences of "s3://" inside the
     * key are preserved. Everything before the first "/" is the bucket, and
     * the remainder (if any) is the key.
     *
     * @param string $path Path to parse.
     *
     * @return array ['Bucket' => string] plus 'Key' => string when present.
     */
    private function getS3Args($path)
    {
        if (strpos($path, 's3://') === 0) {
            $path = substr($path, strlen('s3://'));
        }

        $parts = explode('/', $path, 2);
        $args = ['Bucket' => $parts[0]];
        if (isset($parts[1])) {
            $args['Key'] = $parts[1];
        }

        return $args;
    }

    /**
     * Parses the scheme from a filename.
     *
     * Paths without "://" are treated as local files.
     *
     * @param string $path Path to parse.
     *
     * @return string
     */
    private function determineScheme($path)
    {
        // Strict comparison: strpos() returns 0 (falsy) for a match at
        // offset 0, which must not be mistaken for "not found".
        return strpos($path, '://') === false
            ? 'file'
            : explode('://', $path)[0];
    }

    /**
     * Normalize a path so that it has UNIX-style directory separators and no trailing /
     *
     * @param string $path
     *
     * @return string
     */
    private function normalizePath($path)
    {
        return rtrim(str_replace('\\', '/', $path), '/');
    }

    /**
     * Checks whether an object key, once appended to the destination
     * directory, would escape that directory via ".." segments.
     *
     * Only the last segment of the destination directory plus the key's
     * segments are inspected; "." and empty segments are ignored.
     *
     * @param string $sink      Full local path (destination . '/' . key).
     * @param string $objectKey Key relative to the source prefix.
     *
     * @return bool True if the key resolves outside the destination directory.
     */
    private function resolvesOutsideTargetDirectory($sink, $objectKey)
    {
        $resolved = [];
        $sections = explode('/', $sink);
        $targetSectionsLength = count(explode('/', $objectKey));
        // The destination's last directory segment followed by the key's segments.
        $targetSections = array_slice($sections, -($targetSectionsLength + 1));
        $targetDirectory = $targetSections[0];

        foreach ($targetSections as $section) {
            if ($section === '.' || $section === '') {
                continue;
            }
            if ($section === '..') {
                array_pop($resolved);
                // Popping past the destination directory means escape.
                if (empty($resolved) || $resolved[0] !== $targetDirectory) {
                    return true;
                }
            } else {
                $resolved []= $section;
            }
        }
        return false;
    }

    /**
     * Builds a GetObject command per object to download and runs them in a
     * CommandPool.
     *
     * @return PromiseInterface
     * @throws AwsException      if a key would be written outside the
     *                           destination directory.
     * @throws \RuntimeException if a local directory cannot be created.
     */
    private function createDownloadPromise()
    {
        $parts = $this->getS3Args($this->sourceMetadata['path']);
        $prefix = "s3://{$parts['Bucket']}/"
            . (isset($parts['Key']) ? $parts['Key'] . '/' : '');

        $commands = [];
        foreach ($this->getDownloadsIterator() as $object) {
            // Prepare the sink: the object's key relative to the source prefix.
            $objectKey = preg_replace('/^' . preg_quote($prefix, '/') . '/', '', $object);
            $sink = $this->destination['path'] . '/' . $objectKey;

            $command = $this->client->getCommand(
                'GetObject',
                $this->getS3Args($object) + ['@http' => ['sink' => $sink]]
            );

            // Reject keys such as "../x" before any directory is created.
            if ($this->resolvesOutsideTargetDirectory($sink, $objectKey)) {
                throw new AwsException(
                    'Cannot download key ' . $objectKey
                    . ', its relative path resolves outside the'
                    . ' parent directory', $command);
            }

            // Create the directory if needed.
            $dir = dirname($sink);
            if (!is_dir($dir) && !mkdir($dir, 0777, true)) {
                throw new \RuntimeException("Could not create dir: {$dir}");
            }

            // Create the command.
            $commands []= $command;
        }

        // Create a GetObject command pool and return the promise.
        return (new Aws\CommandPool($this->client, $commands, [
            'concurrency' => $this->concurrency,
            'before'      => $this->before,
            'rejected'    => function ($reason, $idx, Promise\PromiseInterface $p) {
                $p->reject($reason);
            }
        ]))->promise();
    }

    /**
     * Maps each local file to a PutObject or multipart upload promise and
     * runs them with limited concurrency.
     *
     * @return PromiseInterface
     */
    private function createUploadPromise()
    {
        // Map each file into a promise that performs the actual transfer.
        $files = \Aws\map($this->getUploadsIterator(), function ($file) {
            return (filesize($file) >= $this->mupThreshold)
                ? $this->uploadMultipart($file)
                : $this->upload($file);
        });

        // Create an EachPromise, that will concurrently handle the upload
        // operations' yielded promises from the iterator.
        return Promise\Each::ofLimitAll($files, $this->concurrency);
    }

    /**
     * Returns the local files to upload: every non-directory entry beneath
     * the source directory, or the user-supplied iterator as-is.
     *
     * @return Iterator
     */
    private function getUploadsIterator()
    {
        if (is_string($this->source)) {
            return Aws\filter(
                Aws\recursive_dir_iterator($this->sourceMetadata['path']),
                function ($file) { return !is_dir($file); }
            );
        }

        return $this->source;
    }

    /**
     * Returns the s3 URIs to download: all listed keys under the source
     * prefix (excluding "directory" keys ending in "/"), or the
     * user-supplied iterator as-is.
     *
     * @return Iterator
     */
    private function getDownloadsIterator()
    {
        if (is_string($this->source)) {
            $listArgs = $this->getS3Args($this->sourceMetadata['path']);
            if (isset($listArgs['Key'])) {
                $listArgs['Prefix'] = $listArgs['Key'] . '/';
                unset($listArgs['Key']);
            }

            $files = $this->client
                ->getPaginator('ListObjects', $listArgs)
                ->search('Contents[].Key');
            $files = Aws\map($files, function ($key) use ($listArgs) {
                return "s3://{$listArgs['Bucket']}/$key";
            });
            return Aws\filter($files, function ($key) {
                return substr($key, -1, 1) !== '/';
            });
        }

        return $this->source;
    }

    /**
     * Uploads a single file with PutObject.
     *
     * @param string|\SplFileInfo $filename Local file to upload.
     *
     * @return PromiseInterface
     */
    private function upload($filename)
    {
        $args = $this->s3Args;
        $args['SourceFile'] = $filename;
        $args['Key'] = $this->createS3Key($filename);
        $args['AddContentMD5'] = $this->addContentMD5;
        $command = $this->client->getCommand('PutObject', $args);
        $this->before and call_user_func($this->before, $command);

        return $this->client->executeAsync($command);
    }

    /**
     * Uploads a single file with a MultipartUploader.
     *
     * @param string|\SplFileInfo $filename Local file to upload.
     *
     * @return PromiseInterface
     */
    private function uploadMultipart($filename)
    {
        $args = $this->s3Args;
        $args['Key'] = $this->createS3Key($filename);
        $filename = $filename instanceof \SplFileInfo ? $filename->getPathname() : $filename;

        return (new MultipartUploader($this->client, $filename, [
            'bucket'          => $args['Bucket'],
            'key'             => $args['Key'],
            'before_initiate' => $this->before,
            'before_upload'   => $this->before,
            'before_complete' => $this->before,
            'concurrency'     => $this->concurrency,
            'add_content_md5' => $this->addContentMD5
        ]))->promise();
    }

    /**
     * Builds the destination key for a local file: its path relative to the
     * source directory, prefixed by the destination key (if any).
     *
     * @param string|\SplFileInfo $filename Local file path.
     *
     * @return string
     */
    private function createS3Key($filename)
    {
        $filename = $this->normalizePath($filename);
        // Pass the "#" delimiter so it is escaped on every PHP version.
        $relative_file_path = ltrim(
            preg_replace('#^' . preg_quote($this->sourceMetadata['path'], '#') . '#', '', $filename),
            '/\\'
        );

        if (isset($this->s3Args['Key'])) {
            return rtrim($this->s3Args['Key'], '/').'/'.$relative_file_path;
        }

        return $relative_file_path;
    }

    /**
     * Wraps the current "before" callback so that every command also writes
     * a "Transferring <source> -> <dest> (<operation>)" line to $debug.
     *
     * @param resource $debug Stream to write debug lines to.
     */
    private function addDebugToBefore($debug)
    {
        $before = $this->before;
        $sourcePath = $this->sourceMetadata['path'];
        $s3Args = $this->s3Args;

        $this->before = static function (
            CommandInterface $command
        ) use ($before, $debug, $sourcePath, $s3Args) {
            // Call the composed before function.
            $before and $before($command);

            // Determine the source and dest values based on operation.
            switch ($operation = $command->getName()) {
                case 'GetObject':
                    $source = "s3://{$command['Bucket']}/{$command['Key']}";
                    $dest = $command['@http']['sink'];
                    break;
                case 'PutObject':
                    $source = $command['SourceFile'];
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                case 'UploadPart':
                    $part = $command['PartNumber'];
                    // Intentional fall-through: parts share the multipart
                    // source/dest computation below.
                case 'CreateMultipartUpload':
                case 'CompleteMultipartUpload':
                    // Recover the local path by stripping the destination key
                    // prefix (plus its "/" separator) from the object key.
                    $sourceKey = $command['Key'];
                    if (isset($s3Args['Key']) && strpos($sourceKey, $s3Args['Key']) === 0) {
                        $sourceKey = substr($sourceKey, strlen($s3Args['Key']) + 1);
                    }
                    $source = "{$sourcePath}/{$sourceKey}";
                    $dest = "s3://{$command['Bucket']}/{$command['Key']}";
                    break;
                default:
                    throw new \UnexpectedValueException(
                        "Transfer encountered an unexpected operation: {$operation}."
                    );
            }

            // Print the debugging message.
            $context = sprintf('%s -> %s (%s)', $source, $dest, $operation);
            if (isset($part)) {
                $context .= " : Part={$part}";
            }
            fwrite($debug, "Transferring {$context}\n");
        };
    }
}
|