Subversion Repositories qbpwcf-lib(archive)

Rev

Rev 915 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
1 liveuser 1
<?php
2
 
3
namespace React\Dns\Query;
4
 
5
use React\Promise\Promise;
6
 
7
/**
8
 * Cooperatively resolves hosts via the given base executor to ensure the same query is not run concurrently
9
 *
10
 * Wraps an existing `ExecutorInterface` to keep track of pending queries
11
 * and only starts a new query when the same query is not already pending. Once
12
 * the underlying query is fulfilled/rejected, it will forward its value to all
13
 * promises awaiting the same query.
14
 *
15
 * This means it will not limit concurrency for queries that differ, for example
16
 * when sending many queries for different host names or types.
17
 *
18
 * This is useful because all executors are entirely async and as such allow you
19
 * to execute any number of queries concurrently. You should probably limit the
20
 * number of concurrent queries in your application or you're very likely going
21
 * to face rate limitations and bans on the resolver end. For many common
22
 * applications, you may want to avoid sending the same query multiple times
23
 * when the first one is still pending, so you will likely want to use this in
24
 * combination with some other executor like this:
25
 *
26
 * ```php
27
 * $executor = new CoopExecutor(
28
 *     new RetryExecutor(
29
 *         new TimeoutExecutor(
30
 *             new UdpTransportExecutor($nameserver, $loop),
31
 *             3.0,
32
 *             $loop
33
 *         )
34
 *     )
35
 * );
36
 * ```
37
 */
38
final class CoopExecutor implements ExecutorInterface
{
    /** @var ExecutorInterface executor that performs the queries which are not already pending */
    private $executor;

    /** @var array promises returned by the base executor, keyed by query identity */
    private $pending = array();

    /** @var array number of child promises handed out per pending query, keyed by query identity */
    private $counts = array();

    /**
     * @param ExecutorInterface $base executor that new (not yet pending) queries are forwarded to
     */
    public function __construct(ExecutorInterface $base)
    {
        $this->executor = $base;
    }

    /**
     * Runs the given query, sharing one base query between all callers asking for the same thing.
     *
     * Two queries count as "the same" when their name, type and class produce
     * the same identity string. The first caller starts the base query; later
     * callers only increase a counter and attach to the already pending promise.
     *
     * @param Query $query
     * @return Promise child promise that is settled with whatever the shared base
     *     promise settles with. Its canceller throws a \RuntimeException
     *     (presumably surfaced by the promise library as the rejection reason -
     *     confirm against React\Promise) and only cancels the shared base
     *     promise once the counter for this query drops below one.
     */
    public function query(Query $query)
    {
        $key = $this->serializeQueryToIdentity($query);

        // All closures below operate on references to the bookkeeping arrays.
        $pending =& $this->pending;
        $counts =& $this->counts;

        if (!isset($pending[$key])) {
            // Nothing pending for this identity: start the base query and
            // register it *before* attaching handlers, so that a promise which
            // settles immediately still finds its entries to remove.
            $shared = $this->executor->query($query);
            $pending[$key] = $shared;
            $counts[$key] = 1;

            // Fulfilled or rejected, the bookkeeping for this identity is dropped.
            $forget = function () use ($key, &$pending, &$counts) {
                unset($pending[$key], $counts[$key]);
            };
            $shared->then($forget, $forget);
        } else {
            // Identical query already in flight: reuse it and count one more waiter.
            $shared = $pending[$key];
            ++$counts[$key];
        }

        // Hand out a child promise that merely follows the shared one.
        // Cancelling a child only cancels the shared query when it was the
        // last child still waiting for it.
        return new Promise(function ($resolve, $reject) use ($shared) {
            $shared->then($resolve, $reject);
        }, function () use (&$shared, $key, $query, &$pending, &$counts) {
            if (--$counts[$key] < 1) {
                unset($pending[$key], $counts[$key]);
                $shared->cancel();
                // drop this closure's handle on the shared promise
                $shared = null;
            }
            throw new \RuntimeException('DNS query for ' . $query->name . ' has been cancelled');
        });
    }

    /**
     * Builds the string used to decide whether two queries are identical.
     *
     * @param Query $query
     * @return string the query's name, type and class joined with ":"
     */
    private function serializeQueryToIdentity(Query $query)
    {
        return implode(':', array($query->name, $query->type, $query->class));
    }
}