Proyectos de Subversion Moodle

Rev

| Ultima modificación | Ver Log |

Rev Autor Línea Nro. Línea
1 efrain 1
<?php
2
namespace Aws;
3
 
4
use GuzzleHttp\Promise\PromiseInterface;
5
use GuzzleHttp\Promise\PromisorInterface;
6
use GuzzleHttp\Promise\EachPromise;
7
 
8
/**
9
 * Sends and iterator of commands concurrently using a capped pool size.
10
 *
11
 * The pool will read command objects from an iterator until it is cancelled or
12
 * until the iterator is consumed.
13
 */
14
class CommandPool implements PromisorInterface
15
{
16
    /** @var EachPromise */
17
    private $each;
18
 
19
    /**
20
     * The CommandPool constructor accepts a hash of configuration options:
21
     *
22
     * - concurrency: (callable|int) Maximum number of commands to execute
23
     *   concurrently. Provide a function to resize the pool dynamically. The
24
     *   function will be provided the current number of pending requests and
25
     *   is expected to return an integer representing the new pool size limit.
26
     * - before: (callable) function to invoke before sending each command. The
27
     *   before function accepts the command and the key of the iterator of the
28
     *   command. You can mutate the command as needed in the before function
29
     *   before sending the command.
30
     * - fulfilled: (callable) Function to invoke when a promise is fulfilled.
31
     *   The function is provided the result object, id of the iterator that the
32
     *   result came from, and the aggregate promise that can be resolved/rejected
33
     *   if you need to short-circuit the pool.
34
     * - rejected: (callable) Function to invoke when a promise is rejected.
35
     *   The function is provided an AwsException object, id of the iterator that
36
     *   the exception came from, and the aggregate promise that can be
37
     *   resolved/rejected if you need to short-circuit the pool.
38
     * - preserve_iterator_keys: (bool) Retain the iterator key when generating
39
     *   the commands.
40
     *
41
     * @param AwsClientInterface $client   Client used to execute commands.
42
     * @param array|\Iterator    $commands Iterable that yields commands.
43
     * @param array              $config   Associative array of options.
44
     */
45
    public function __construct(
46
        AwsClientInterface $client,
47
        $commands,
48
        array $config = []
49
    ) {
50
        if (!isset($config['concurrency'])) {
51
            $config['concurrency'] = 25;
52
        }
53
 
54
        $before = $this->getBefore($config);
55
        $mapFn = function ($commands) use ($client, $before, $config) {
56
            foreach ($commands as $key => $command) {
57
                if (!($command instanceof CommandInterface)) {
58
                    throw new \InvalidArgumentException('Each value yielded by '
59
                        . 'the iterator must be an Aws\CommandInterface.');
60
                }
61
                if ($before) {
62
                    $before($command, $key);
63
                }
64
                if (!empty($config['preserve_iterator_keys'])) {
65
                    yield $key => $client->executeAsync($command);
66
                } else {
67
                    yield $client->executeAsync($command);
68
                }
69
            }
70
        };
71
 
72
        $this->each = new EachPromise($mapFn($commands), $config);
73
    }
74
 
75
    /**
76
     * @return PromiseInterface
77
     */
78
    public function promise()
79
    {
80
        return $this->each->promise();
81
    }
82
 
83
    /**
84
     * Executes a pool synchronously and aggregates the results of the pool
85
     * into an indexed array in the same order as the passed in array.
86
     *
87
     * @param AwsClientInterface $client   Client used to execute commands.
88
     * @param mixed              $commands Iterable that yields commands.
89
     * @param array              $config   Configuration options.
90
     *
91
     * @return array
92
     * @see \Aws\CommandPool::__construct for available configuration options.
93
     */
94
    public static function batch(
95
        AwsClientInterface $client,
96
        $commands,
97
        array $config = []
98
    ) {
99
        $results = [];
100
        self::cmpCallback($config, 'fulfilled', $results);
101
        self::cmpCallback($config, 'rejected', $results);
102
 
103
        return (new self($client, $commands, $config))
104
            ->promise()
105
            ->then(static function () use (&$results) {
106
                ksort($results);
107
                return $results;
108
            })
109
            ->wait();
110
    }
111
 
112
    /**
113
     * @return callable
114
     */
115
    private function getBefore(array $config)
116
    {
117
        if (!isset($config['before'])) {
118
            return null;
119
        }
120
 
121
        if (is_callable($config['before'])) {
122
            return $config['before'];
123
        }
124
 
125
        throw new \InvalidArgumentException('before must be callable');
126
    }
127
 
128
    /**
129
     * Adds an onFulfilled or onRejected callback that aggregates results into
130
     * an array. If a callback is already present, it is replaced with the
131
     * composed function.
132
     *
133
     * @param array $config
134
     * @param       $name
135
     * @param array $results
136
     */
137
    private static function cmpCallback(array &$config, $name, array &$results)
138
    {
139
        if (!isset($config[$name])) {
140
            $config[$name] = function ($v, $k) use (&$results) {
141
                $results[$k] = $v;
142
            };
143
        } else {
144
            $currentFn = $config[$name];
145
            $config[$name] = function ($v, $k) use (&$results, $currentFn) {
146
                $currentFn($v, $k);
147
                $results[$k] = $v;
148
            };
149
        }
150
    }
151
}