Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace React\Socket;
4
 
5
use React\Dns\Model\Message;
6
use React\Dns\Resolver\ResolverInterface;
7
use React\EventLoop\LoopInterface;
8
use React\EventLoop\TimerInterface;
9
use React\Promise;
10
use React\Promise\CancellablePromiseInterface;
11
 
12
/**
13
 * @internal
14
 */
15
final class HappyEyeBallsConnectionBuilder
16
{
17
    /**
18
     * As long as we haven't connected yet keep popping an IP address of the connect queue until one of them
19
     * succeeds or they all fail. We will wait 100ms between connection attempts as per RFC.
20
     *
21
     * @link https://tools.ietf.org/html/rfc8305#section-5
22
     */
23
    const CONNECTION_ATTEMPT_DELAY = 0.1;
24
 
25
    /**
26
     * Delay `A` lookup by 50ms sending out connection to IPv4 addresses when IPv6 records haven't
27
     * resolved yet as per RFC.
28
     *
29
     * @link https://tools.ietf.org/html/rfc8305#section-3
30
     */
31
    const RESOLUTION_DELAY = 0.05;
32
 
33
    public $loop;
34
    public $connector;
35
    public $resolver;
36
    public $uri;
37
    public $host;
38
    public $resolved = array(
39
        Message::TYPE_A    => false,
40
        Message::TYPE_AAAA => false,
41
    );
42
    public $resolverPromises = array();
43
    public $connectionPromises = array();
44
    public $connectQueue = array();
45
    public $nextAttemptTimer;
46
    public $parts;
47
    public $ipsCount = 0;
48
    public $failureCount = 0;
49
    public $resolve;
50
    public $reject;
51
 
52
    public $lastErrorFamily;
53
    public $lastError6;
54
    public $lastError4;
55
 
56
    public function __construct(LoopInterface $loop, ConnectorInterface $connector, ResolverInterface $resolver, $uri, $host, $parts)
57
    {
58
        $this->loop = $loop;
59
        $this->connector = $connector;
60
        $this->resolver = $resolver;
61
        $this->uri = $uri;
62
        $this->host = $host;
63
        $this->parts = $parts;
64
    }
65
 
66
    public function connect()
67
    {
68
        $timer = null;
69
        $that = $this;
70
        return new Promise\Promise(function ($resolve, $reject) use ($that, &$timer) {
71
            $lookupResolve = function ($type) use ($that, $resolve, $reject) {
72
                return function (array $ips) use ($that, $type, $resolve, $reject) {
73
                    unset($that->resolverPromises[$type]);
74
                    $that->resolved[$type] = true;
75
 
76
                    $that->mixIpsIntoConnectQueue($ips);
77
 
78
                    // start next connection attempt if not already awaiting next
79
                    if ($that->nextAttemptTimer === null && $that->connectQueue) {
80
                        $that->check($resolve, $reject);
81
                    }
82
                };
83
            };
84
 
85
            $that->resolverPromises[Message::TYPE_AAAA] = $that->resolve(Message::TYPE_AAAA, $reject)->then($lookupResolve(Message::TYPE_AAAA));
86
            $that->resolverPromises[Message::TYPE_A] = $that->resolve(Message::TYPE_A, $reject)->then(function (array $ips) use ($that, &$timer) {
87
                // happy path: IPv6 has resolved already, continue with IPv4 addresses
88
                if ($that->resolved[Message::TYPE_AAAA] === true) {
89
                    return $ips;
90
                }
91
 
92
                // Otherwise delay processing IPv4 lookup until short timer passes or IPv6 resolves in the meantime
93
                $deferred = new Promise\Deferred();
94
                $timer = $that->loop->addTimer($that::RESOLUTION_DELAY, function () use ($deferred, $ips) {
95
                    $deferred->resolve($ips);
96
                });
97
 
98
                $that->resolverPromises[Message::TYPE_AAAA]->then(function () use ($that, $timer, $deferred, $ips) {
99
                    $that->loop->cancelTimer($timer);
100
                    $deferred->resolve($ips);
101
                });
102
 
103
                return $deferred->promise();
104
            })->then($lookupResolve(Message::TYPE_A));
105
        }, function ($_, $reject) use ($that, &$timer) {
106
            $reject(new \RuntimeException('Connection to ' . $that->uri . ' cancelled' . (!$that->connectionPromises ? ' during DNS lookup' : '')));
107
            $_ = $reject = null;
108
 
109
            $that->cleanUp();
110
            if ($timer instanceof TimerInterface) {
111
                $that->loop->cancelTimer($timer);
112
            }
113
        });
114
    }
115
 
116
    /**
117
     * @internal
118
     * @param int      $type   DNS query type
119
     * @param callable $reject
120
     * @return \React\Promise\PromiseInterface<string[],\Exception> Returns a promise
121
     *     that resolves list of IP addresses on success or rejects with an \Exception on error.
122
     */
123
    public function resolve($type, $reject)
124
    {
125
        $that = $this;
126
        return $that->resolver->resolveAll($that->host, $type)->then(null, function (\Exception $e) use ($type, $reject, $that) {
127
            unset($that->resolverPromises[$type]);
128
            $that->resolved[$type] = true;
129
 
130
            if ($type === Message::TYPE_A) {
131
                $that->lastError4 = $e->getMessage();
132
                $that->lastErrorFamily = 4;
133
            } else {
134
                $that->lastError6 = $e->getMessage();
135
                $that->lastErrorFamily = 6;
136
            }
137
 
138
            // cancel next attempt timer when there are no more IPs to connect to anymore
139
            if ($that->nextAttemptTimer !== null && !$that->connectQueue) {
140
                $that->loop->cancelTimer($that->nextAttemptTimer);
141
                $that->nextAttemptTimer = null;
142
            }
143
 
144
            if ($that->hasBeenResolved() && $that->ipsCount === 0) {
145
                $reject(new \RuntimeException($that->error()));
146
            }
147
 
148
            throw $e;
149
        });
150
    }
151
 
152
    /**
153
     * @internal
154
     */
155
    public function check($resolve, $reject)
156
    {
157
        $ip = \array_shift($this->connectQueue);
158
 
159
        // start connection attempt and remember array position to later unset again
160
        $this->connectionPromises[] = $this->attemptConnection($ip);
161
        \end($this->connectionPromises);
162
        $index = \key($this->connectionPromises);
163
 
164
        $that = $this;
165
        $that->connectionPromises[$index]->then(function ($connection) use ($that, $index, $resolve) {
166
            unset($that->connectionPromises[$index]);
167
 
168
            $that->cleanUp();
169
 
170
            $resolve($connection);
171
        }, function (\Exception $e) use ($that, $index, $ip, $resolve, $reject) {
172
            unset($that->connectionPromises[$index]);
173
 
174
            $that->failureCount++;
175
 
176
            if (\strpos($ip, ':') === false) {
177
                $that->lastError4 = $e->getMessage();
178
                $that->lastErrorFamily = 4;
179
            } else {
180
                $that->lastError6 = $e->getMessage();
181
                $that->lastErrorFamily = 6;
182
            }
183
 
184
            // start next connection attempt immediately on error
185
            if ($that->connectQueue) {
186
                if ($that->nextAttemptTimer !== null) {
187
                    $that->loop->cancelTimer($that->nextAttemptTimer);
188
                    $that->nextAttemptTimer = null;
189
                }
190
 
191
                $that->check($resolve, $reject);
192
            }
193
 
194
            if ($that->hasBeenResolved() === false) {
195
                return;
196
            }
197
 
198
            if ($that->ipsCount === $that->failureCount) {
199
                $that->cleanUp();
200
 
201
                $reject(new \RuntimeException($that->error()));
202
            }
203
        });
204
 
205
        // Allow next connection attempt in 100ms: https://tools.ietf.org/html/rfc8305#section-5
206
        // Only start timer when more IPs are queued or when DNS query is still pending (might add more IPs)
207
        if ($this->nextAttemptTimer === null && (\count($this->connectQueue) > 0 || $this->resolved[Message::TYPE_A] === false || $this->resolved[Message::TYPE_AAAA] === false)) {
208
            $this->nextAttemptTimer = $this->loop->addTimer(self::CONNECTION_ATTEMPT_DELAY, function () use ($that, $resolve, $reject) {
209
                $that->nextAttemptTimer = null;
210
 
211
                if ($that->connectQueue) {
212
                    $that->check($resolve, $reject);
213
                }
214
            });
215
        }
216
    }
217
 
218
    /**
219
     * @internal
220
     */
221
    public function attemptConnection($ip)
222
    {
223
        $uri = '';
224
 
225
        // prepend original scheme if known
226
        if (isset($this->parts['scheme'])) {
227
            $uri .= $this->parts['scheme'] . '://';
228
        }
229
 
230
        if (\strpos($ip, ':') !== false) {
231
            // enclose IPv6 addresses in square brackets before appending port
232
            $uri .= '[' . $ip . ']';
233
        } else {
234
            $uri .= $ip;
235
        }
236
 
237
        // append original port if known
238
        if (isset($this->parts['port'])) {
239
            $uri .= ':' . $this->parts['port'];
240
        }
241
 
242
        // append orignal path if known
243
        if (isset($this->parts['path'])) {
244
            $uri .= $this->parts['path'];
245
        }
246
 
247
        // append original query if known
248
        if (isset($this->parts['query'])) {
249
            $uri .= '?' . $this->parts['query'];
250
        }
251
 
252
        // append original hostname as query if resolved via DNS and if
253
        // destination URI does not contain "hostname" query param already
254
        $args = array();
255
        \parse_str(isset($this->parts['query']) ? $this->parts['query'] : '', $args);
256
        if ($this->host !== $ip && !isset($args['hostname'])) {
257
            $uri .= (isset($this->parts['query']) ? '&' : '?') . 'hostname=' . \rawurlencode($this->host);
258
        }
259
 
260
        // append original fragment if known
261
        if (isset($this->parts['fragment'])) {
262
            $uri .= '#' . $this->parts['fragment'];
263
        }
264
 
265
        return $this->connector->connect($uri);
266
    }
267
 
268
    /**
269
     * @internal
270
     */
271
    public function cleanUp()
272
    {
273
        // clear list of outstanding IPs to avoid creating new connections
274
        $this->connectQueue = array();
275
 
276
        foreach ($this->connectionPromises as $connectionPromise) {
277
            if ($connectionPromise instanceof CancellablePromiseInterface) {
278
                $connectionPromise->cancel();
279
            }
280
        }
281
 
282
        foreach ($this->resolverPromises as $resolverPromise) {
283
            if ($resolverPromise instanceof CancellablePromiseInterface) {
284
                $resolverPromise->cancel();
285
            }
286
        }
287
 
288
        if ($this->nextAttemptTimer instanceof TimerInterface) {
289
            $this->loop->cancelTimer($this->nextAttemptTimer);
290
            $this->nextAttemptTimer = null;
291
        }
292
    }
293
 
294
    /**
295
     * @internal
296
     */
297
    public function hasBeenResolved()
298
    {
299
        foreach ($this->resolved as $typeHasBeenResolved) {
300
            if ($typeHasBeenResolved === false) {
301
                return false;
302
            }
303
        }
304
 
305
        return true;
306
    }
307
 
308
    /**
309
     * Mixes an array of IP addresses into the connect queue in such a way they alternate when attempting to connect.
310
     * The goal behind it is first attempt to connect to IPv6, then to IPv4, then to IPv6 again until one of those
311
     * attempts succeeds.
312
     *
313
     * @link https://tools.ietf.org/html/rfc8305#section-4
314
     *
315
     * @internal
316
     */
317
    public function mixIpsIntoConnectQueue(array $ips)
318
    {
319
        $this->ipsCount += \count($ips);
320
        $connectQueueStash = $this->connectQueue;
321
        $this->connectQueue = array();
322
        while (\count($connectQueueStash) > 0 || \count($ips) > 0) {
323
            if (\count($ips) > 0) {
324
                $this->connectQueue[] = \array_shift($ips);
325
            }
326
            if (\count($connectQueueStash) > 0) {
327
                $this->connectQueue[] = \array_shift($connectQueueStash);
328
            }
329
        }
330
    }
331
 
332
    /**
333
     * @internal
334
     * @return string
335
     */
336
    public function error()
337
    {
338
        if ($this->lastError4 === $this->lastError6) {
339
            $message = $this->lastError6;
340
        } elseif ($this->lastErrorFamily === 6) {
341
            $message = 'Last error for IPv6: ' . $this->lastError6 . '. Previous error for IPv4: ' . $this->lastError4;
342
        } else {
343
            $message = 'Last error for IPv4: ' . $this->lastError4 . '. Previous error for IPv6: ' . $this->lastError6;
344
        }
345
 
346
        if ($this->hasBeenResolved() && $this->ipsCount === 0) {
347
            if ($this->lastError6 === $this->lastError4) {
348
                $message = ' during DNS lookup: ' . $this->lastError6;
349
            } else {
350
                $message = ' during DNS lookup. ' . $message;
351
            }
352
        } else {
353
            $message = ': ' . $message;
354
        }
355
 
356
        return 'Connection to ' . $this->uri . ' failed'  . $message;
357
    }
358
}