Proyectos de Subversion Moodle

Rev

Rev 1 | | Comparar con el anterior | Última modificación | Ver Log |

Rev Autor Línea Nro. Línea
1 efrain 1
<?php
2
 
1441 ariadna 3
declare(strict_types=1);
4
 
1 efrain 5
namespace GuzzleHttp\Promise;
6
 
7
/**
 * Represents a promise that iterates over many promises and invokes
 * side-effect functions in the process.
 *
 * @final
 */
class EachPromise implements PromisorInterface
{
    /**
     * Promises currently in flight, keyed by an internal insertion counter.
     * Reset to null by the cleanup callback attached to the aggregate promise.
     *
     * @var array|null
     */
    private $pending = [];

    /**
     * Next key to use in $pending; incremented for every promise added so
     * that keys stay unique even when the iterable yields duplicate keys.
     *
     * @var int
     */
    private $nextPendingIndex = 0;

    /** @var \Iterator|null */
    private $iterable;

    /** @var callable|int|null */
    private $concurrency;

    /** @var callable|null */
    private $onFulfilled;

    /** @var callable|null */
    private $onRejected;

    /** @var Promise|null */
    private $aggregate;

    /**
     * Re-entrancy guard around advancing the iterator.
     *
     * @var bool|null
     */
    private $mutex;

    /**
     * Configuration hash can include the following key value pairs:
     *
     * - fulfilled: (callable) Invoked when a promise fulfills. The function
     *   is invoked with three arguments: the fulfillment value, the index
     *   position from the iterable list of the promise, and the aggregate
     *   promise that manages all of the promises. The aggregate promise may
     *   be resolved from within the callback to short-circuit the promise.
     * - rejected: (callable) Invoked when a promise is rejected. The
     *   function is invoked with three arguments: the rejection reason, the
     *   index position from the iterable list of the promise, and the
     *   aggregate promise that manages all of the promises. The aggregate
     *   promise may be resolved from within the callback to short-circuit
     *   the promise.
     * - concurrency: (integer) Pass this configuration option to limit the
     *   allowed number of outstanding concurrently executing promises,
     *   creating a capped pool of promises. There is no limit by default.
     *
     * @param mixed $iterable Promises or values to iterate.
     * @param array $config   Configuration options
     */
    public function __construct($iterable, array $config = [])
    {
        $this->iterable = Create::iterFor($iterable);

        if (isset($config['concurrency'])) {
            $this->concurrency = $config['concurrency'];
        }

        if (isset($config['fulfilled'])) {
            $this->onFulfilled = $config['fulfilled'];
        }

        if (isset($config['rejected'])) {
            $this->onRejected = $config['rejected'];
        }
    }

    /**
     * Returns the aggregate promise, creating it on first call.
     *
     * On first call the aggregate is created, the iterator is rewound and the
     * initial batch of pending promises is queued. Anything thrown while
     * doing so is turned into a rejection of the aggregate. Later calls
     * return the same aggregate instance.
     *
     * @psalm-suppress InvalidNullableReturnType
     */
    public function promise(): PromiseInterface
    {
        if ($this->aggregate) {
            return $this->aggregate;
        }

        try {
            $this->createPromise();
            /** @psalm-assert Promise $this->aggregate */
            $this->iterable->rewind();
            $this->refillPending();
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);
        }

        /**
         * @psalm-suppress NullableReturnStatement
         */
        return $this->aggregate;
    }

    /**
     * Builds the aggregate promise, its wait function and its cleanup hook.
     */
    private function createPromise(): void
    {
        $this->mutex = false;
        $this->aggregate = new Promise(function (): void {
            if ($this->checkIfFinished()) {
                return;
            }
            reset($this->pending);
            // Consume a potentially fluctuating list of promises while
            // ensuring that indexes are maintained (precluding array_shift).
            while ($promise = current($this->pending)) {
                // Move the cursor before waiting: waiting may remove the
                // current entry from the pending list.
                next($this->pending);
                $promise->wait();
                if (Is::settled($this->aggregate)) {
                    return;
                }
            }
        });

        // Clear the references when the promise is resolved.
        $clearFn = function (): void {
            $this->iterable = $this->concurrency = $this->pending = null;
            $this->onFulfilled = $this->onRejected = null;
            $this->nextPendingIndex = 0;
        };

        $this->aggregate->then($clearFn, $clearFn);
    }

    /**
     * Queues as many promises from the iterator as the concurrency setting
     * allows.
     *
     * With no (or a falsy) concurrency setting every remaining item is
     * queued. Otherwise the limit is either the configured number or the
     * result of calling the configured callable with the current pending
     * count, minus the promises already pending.
     */
    private function refillPending(): void
    {
        if (!$this->concurrency) {
            // Add all pending promises.
            while ($this->addPending() && $this->advanceIterator()) {
            }

            return;
        }

        // Add only up to N pending promises.
        $concurrency = is_callable($this->concurrency)
            ? ($this->concurrency)(count($this->pending))
            : $this->concurrency;
        $concurrency = max($concurrency - count($this->pending), 0);
        // Concurrency may be set to 0 to disallow new promises.
        if (!$concurrency) {
            return;
        }
        // Add the first pending promise.
        $this->addPending();
        // Note this is special handling for concurrency=1 so that we do
        // not advance the iterator after adding the first promise. This
        // helps work around issues with generators that might not have the
        // next value to yield until promise callbacks are called.
        while (--$concurrency
            && $this->advanceIterator()
            && $this->addPending()) {
        }
    }

    /**
     * Wraps the iterator's current item in a promise and tracks it as pending.
     *
     * @return bool False when there is no iterator or it has no current
     *              item; true once the item has been queued.
     */
    private function addPending(): bool
    {
        if (!$this->iterable || !$this->iterable->valid()) {
            return false;
        }

        $promise = Create::promiseFor($this->iterable->current());
        $key = $this->iterable->key();

        // Iterable keys may not be unique, so we use a counter to
        // guarantee uniqueness
        $idx = $this->nextPendingIndex++;

        $this->pending[$idx] = $promise->then(
            function ($value) use ($idx, $key): void {
                if ($this->onFulfilled) {
                    ($this->onFulfilled)(
                        $value,
                        $key,
                        $this->aggregate
                    );
                }
                $this->step($idx);
            },
            function ($reason) use ($idx, $key): void {
                if ($this->onRejected) {
                    ($this->onRejected)(
                        $reason,
                        $key,
                        $this->aggregate
                    );
                }
                $this->step($idx);
            }
        );

        return true;
    }

    /**
     * Moves the iterator to its next item under a re-entrancy lock.
     *
     * @return bool True when the iterator was advanced; false when the lock
     *              was already held or advancing threw (in which case the
     *              aggregate promise is rejected with the thrown error).
     */
    private function advanceIterator(): bool
    {
        // Place a lock on the iterator so that we ensure to not recurse,
        // preventing fatal generator errors.
        if ($this->mutex) {
            return false;
        }

        $this->mutex = true;

        try {
            $this->iterable->next();

            return true;
        } catch (\Throwable $e) {
            $this->aggregate->reject($e);

            return false;
        } finally {
            // Release the lock on every exit path, including the case where
            // rejecting the aggregate itself throws; otherwise the iterator
            // could never be advanced again.
            $this->mutex = false;
        }
    }

    /**
     * Handles the settlement of the pending promise stored under $idx.
     *
     * @param int $idx Key of the settled promise in the pending list.
     */
    private function step(int $idx): void
    {
        // If the promise was already resolved, then ignore this step.
        if (Is::settled($this->aggregate)) {
            return;
        }

        unset($this->pending[$idx]);

        // Only refill pending promises if we are not locked, preventing the
        // EachPromise to recursively invoke the provided iterator, which
        // cause a fatal error: "Cannot resume an already running generator"
        if ($this->advanceIterator() && !$this->checkIfFinished()) {
            // Add more pending promises if possible.
            $this->refillPending();
        }
    }

    /**
     * Resolves the aggregate with null when nothing is pending and the
     * iterator has no further items.
     *
     * @return bool True when the aggregate was resolved by this call.
     */
    private function checkIfFinished(): bool
    {
        if (!$this->pending && !$this->iterable->valid()) {
            // Resolve the promise if there's nothing left to do.
            $this->aggregate->resolve(null);

            return true;
        }

        return false;
    }
}