| 1 |
liveuser |
1 |
<?php
|
|
|
2 |
|
|
|
3 |
namespace React\Dns\Query;
|
|
|
4 |
|
|
|
5 |
use React\Promise\Promise;
|
|
|
6 |
|
|
|
7 |
/**
 * Cooperatively resolves hosts via the given base executor so that the same query is never run concurrently
 *
 * Decorates an existing `ExecutorInterface` and keeps track of its pending
 * queries. A new query is only started when no identical query is already
 * pending. As soon as the underlying query is fulfilled or rejected, its
 * outcome is forwarded to every promise that is awaiting the same query.
 *
 * Queries that differ are not limited in any way, so sending many queries for
 * different host names or types still runs all of them concurrently.
 *
 * This is useful because all executors are entirely async and therefore let
 * you execute any number of queries concurrently. You should probably limit
 * the number of concurrent queries in your application or you're very likely
 * going to face rate limitations and bans on the resolver end. For many common
 * applications, you may want to avoid sending the same query multiple times
 * while the first one is still pending, so you will likely want to use this in
 * combination with some other executor like this:
 *
 * ```php
 * $executor = new CoopExecutor(
 *     new RetryExecutor(
 *         new TimeoutExecutor(
 *             new UdpTransportExecutor($nameserver, $loop),
 *             3.0,
 *             $loop
 *         )
 *     )
 * );
 * ```
 */
final class CoopExecutor implements ExecutorInterface
{
    /** @var ExecutorInterface base executor every new query is delegated to */
    private $executor;

    /** @var array promises returned by the base executor that are still pending, indexed by query identity */
    private $pending = array();

    /** @var int[] number of child promises still awaiting each pending query, indexed by query identity */
    private $counts = array();

    /**
     * @param ExecutorInterface $base executor that performs the actual queries
     */
    public function __construct(ExecutorInterface $base)
    {
        $this->executor = $base;
    }

    /**
     * Runs the given query, sharing the underlying query with any identical query that is still pending.
     *
     * Every call returns its own child promise that settles with the outcome
     * of the shared underlying query. Cancelling a child promise only cancels
     * the underlying query once no other child promise is awaiting it.
     *
     * @param Query $query
     * @return Promise child promise following the shared underlying query
     */
    public function query(Query $query)
    {
        $id = $this->serializeQueryToIdentity($query);

        // The closures below work on these aliases of the bookkeeping arrays
        // and capture them by reference instead of capturing $this.
        // NOTE(review): presumably done to avoid reference cycles and to stay
        // compatible with old PHP versions - confirm before changing.
        $pending =& $this->pending;
        $counts =& $this->counts;

        if (!isset($pending[$id])) {
            // nothing pending for this identity: start the query and remember it until it settles
            $shared = $this->executor->query($query);
            $pending[$id] = $shared;
            $counts[$id] = 1;

            // the same cleanup applies no matter whether the query is fulfilled or rejected
            $forget = function () use ($id, &$pending, &$counts) {
                unset($pending[$id], $counts[$id]);
            };
            $shared->then($forget, $forget);
        } else {
            // an identical query is already in flight: join it and count one more waiter
            $shared = $pending[$id];
            ++$counts[$id];
        }

        // Hand out a child promise that follows the shared query. Its canceller
        // only cancels the shared query when this was the last remaining waiter.
        return new Promise(function ($resolve, $reject) use ($shared) {
            $shared->then($resolve, $reject);
        }, function () use (&$shared, $id, $query, &$pending, &$counts) {
            $remaining = --$counts[$id];
            if ($remaining < 1) {
                // last waiter is gone: forget the query, cancel it and drop our reference to it
                unset($pending[$id], $counts[$id]);
                $shared->cancel();
                $shared = null;
            }

            throw new \RuntimeException('DNS query for ' . $query->name . ' has been cancelled');
        });
    }

    /**
     * Builds the string key under which identical queries are grouped.
     *
     * @param Query $query
     * @return string the query's name, type and class joined by colons
     */
    private function serializeQueryToIdentity(Query $query)
    {
        return $query->name . ':' . $query->type . ':' . $query->class;
    }
}
|