Proyectos de Subversion Moodle

Rev

| Ultima modificación | Ver Log |

Rev Autor Línea Nro. Línea
1 efrain 1
<?php
2
 
3
namespace GuzzleHttp\Promise;
4
 
5
/**
6
 * Represents a promise that iterates over many promises and invokes
7
 * side-effect functions in the process.
8
 */
9
class EachPromise implements PromisorInterface
10
{
11
    private $pending = [];
12
 
13
    private $nextPendingIndex = 0;
14
 
15
    /** @var \Iterator|null */
16
    private $iterable;
17
 
18
    /** @var callable|int|null */
19
    private $concurrency;
20
 
21
    /** @var callable|null */
22
    private $onFulfilled;
23
 
24
    /** @var callable|null */
25
    private $onRejected;
26
 
27
    /** @var Promise|null */
28
    private $aggregate;
29
 
30
    /** @var bool|null */
31
    private $mutex;
32
 
33
    /**
34
     * Configuration hash can include the following key value pairs:
35
     *
36
     * - fulfilled: (callable) Invoked when a promise fulfills. The function
37
     *   is invoked with three arguments: the fulfillment value, the index
38
     *   position from the iterable list of the promise, and the aggregate
39
     *   promise that manages all of the promises. The aggregate promise may
40
     *   be resolved from within the callback to short-circuit the promise.
41
     * - rejected: (callable) Invoked when a promise is rejected. The
42
     *   function is invoked with three arguments: the rejection reason, the
43
     *   index position from the iterable list of the promise, and the
44
     *   aggregate promise that manages all of the promises. The aggregate
45
     *   promise may be resolved from within the callback to short-circuit
46
     *   the promise.
47
     * - concurrency: (integer) Pass this configuration option to limit the
48
     *   allowed number of outstanding concurrently executing promises,
49
     *   creating a capped pool of promises. There is no limit by default.
50
     *
51
     * @param mixed $iterable Promises or values to iterate.
52
     * @param array $config   Configuration options
53
     */
54
    public function __construct($iterable, array $config = [])
55
    {
56
        $this->iterable = Create::iterFor($iterable);
57
 
58
        if (isset($config['concurrency'])) {
59
            $this->concurrency = $config['concurrency'];
60
        }
61
 
62
        if (isset($config['fulfilled'])) {
63
            $this->onFulfilled = $config['fulfilled'];
64
        }
65
 
66
        if (isset($config['rejected'])) {
67
            $this->onRejected = $config['rejected'];
68
        }
69
    }
70
 
71
    /** @psalm-suppress InvalidNullableReturnType */
72
    public function promise()
73
    {
74
        if ($this->aggregate) {
75
            return $this->aggregate;
76
        }
77
 
78
        try {
79
            $this->createPromise();
80
            /** @psalm-assert Promise $this->aggregate */
81
            $this->iterable->rewind();
82
            $this->refillPending();
83
        } catch (\Throwable $e) {
84
            $this->aggregate->reject($e);
85
        } catch (\Exception $e) {
86
            $this->aggregate->reject($e);
87
        }
88
 
89
        /**
90
         * @psalm-suppress NullableReturnStatement
91
         * @phpstan-ignore-next-line
92
         */
93
        return $this->aggregate;
94
    }
95
 
96
    private function createPromise()
97
    {
98
        $this->mutex = false;
99
        $this->aggregate = new Promise(function () {
100
            if ($this->checkIfFinished()) {
101
                return;
102
            }
103
            reset($this->pending);
104
            // Consume a potentially fluctuating list of promises while
105
            // ensuring that indexes are maintained (precluding array_shift).
106
            while ($promise = current($this->pending)) {
107
                next($this->pending);
108
                $promise->wait();
109
                if (Is::settled($this->aggregate)) {
110
                    return;
111
                }
112
            }
113
        });
114
 
115
        // Clear the references when the promise is resolved.
116
        $clearFn = function () {
117
            $this->iterable = $this->concurrency = $this->pending = null;
118
            $this->onFulfilled = $this->onRejected = null;
119
            $this->nextPendingIndex = 0;
120
        };
121
 
122
        $this->aggregate->then($clearFn, $clearFn);
123
    }
124
 
125
    private function refillPending()
126
    {
127
        if (!$this->concurrency) {
128
            // Add all pending promises.
129
            while ($this->addPending() && $this->advanceIterator());
130
            return;
131
        }
132
 
133
        // Add only up to N pending promises.
134
        $concurrency = is_callable($this->concurrency)
135
            ? call_user_func($this->concurrency, count($this->pending))
136
            : $this->concurrency;
137
        $concurrency = max($concurrency - count($this->pending), 0);
138
        // Concurrency may be set to 0 to disallow new promises.
139
        if (!$concurrency) {
140
            return;
141
        }
142
        // Add the first pending promise.
143
        $this->addPending();
144
        // Note this is special handling for concurrency=1 so that we do
145
        // not advance the iterator after adding the first promise. This
146
        // helps work around issues with generators that might not have the
147
        // next value to yield until promise callbacks are called.
148
        while (--$concurrency
149
            && $this->advanceIterator()
150
            && $this->addPending());
151
    }
152
 
153
    private function addPending()
154
    {
155
        if (!$this->iterable || !$this->iterable->valid()) {
156
            return false;
157
        }
158
 
159
        $promise = Create::promiseFor($this->iterable->current());
160
        $key = $this->iterable->key();
161
 
162
        // Iterable keys may not be unique, so we use a counter to
163
        // guarantee uniqueness
164
        $idx = $this->nextPendingIndex++;
165
 
166
        $this->pending[$idx] = $promise->then(
167
            function ($value) use ($idx, $key) {
168
                if ($this->onFulfilled) {
169
                    call_user_func(
170
                        $this->onFulfilled,
171
                        $value,
172
                        $key,
173
                        $this->aggregate
174
                    );
175
                }
176
                $this->step($idx);
177
            },
178
            function ($reason) use ($idx, $key) {
179
                if ($this->onRejected) {
180
                    call_user_func(
181
                        $this->onRejected,
182
                        $reason,
183
                        $key,
184
                        $this->aggregate
185
                    );
186
                }
187
                $this->step($idx);
188
            }
189
        );
190
 
191
        return true;
192
    }
193
 
194
    private function advanceIterator()
195
    {
196
        // Place a lock on the iterator so that we ensure to not recurse,
197
        // preventing fatal generator errors.
198
        if ($this->mutex) {
199
            return false;
200
        }
201
 
202
        $this->mutex = true;
203
 
204
        try {
205
            $this->iterable->next();
206
            $this->mutex = false;
207
            return true;
208
        } catch (\Throwable $e) {
209
            $this->aggregate->reject($e);
210
            $this->mutex = false;
211
            return false;
212
        } catch (\Exception $e) {
213
            $this->aggregate->reject($e);
214
            $this->mutex = false;
215
            return false;
216
        }
217
    }
218
 
219
    private function step($idx)
220
    {
221
        // If the promise was already resolved, then ignore this step.
222
        if (Is::settled($this->aggregate)) {
223
            return;
224
        }
225
 
226
        unset($this->pending[$idx]);
227
 
228
        // Only refill pending promises if we are not locked, preventing the
229
        // EachPromise to recursively invoke the provided iterator, which
230
        // cause a fatal error: "Cannot resume an already running generator"
231
        if ($this->advanceIterator() && !$this->checkIfFinished()) {
232
            // Add more pending promises if possible.
233
            $this->refillPending();
234
        }
235
    }
236
 
237
    private function checkIfFinished()
238
    {
239
        if (!$this->pending && !$this->iterable->valid()) {
240
            // Resolve the promise if there's nothing left to do.
241
            $this->aggregate->resolve(null);
242
            return true;
243
        }
244
 
245
        return false;
246
    }
247
}