Subversion Repositories php-qbpwcf

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
3 liveuser 1
<?php
2
 
3
namespace Guzzle\Http\Curl;
4
 
5
use Guzzle\Common\AbstractHasDispatcher;
6
use Guzzle\Common\Event;
7
use Guzzle\Http\Exception\MultiTransferException;
8
use Guzzle\Http\Exception\CurlException;
9
use Guzzle\Http\Message\RequestInterface;
10
use Guzzle\Http\Message\EntityEnclosingRequestInterface;
11
use Guzzle\Http\Exception\RequestException;
12
 
13
/**
14
 * Send {@see RequestInterface} objects in parallel using curl_multi
15
 */
16
class CurlMulti extends AbstractHasDispatcher implements CurlMultiInterface
17
{
18
    /** @var resource cURL multi handle. */
19
    protected $multiHandle;
20
 
21
    /** @var array Attached {@see RequestInterface} objects. */
22
    protected $requests;
23
 
24
    /** @var \SplObjectStorage RequestInterface to CurlHandle hash */
25
    protected $handles;
26
 
27
    /** @var array Hash mapping curl handle resource IDs to request objects */
28
    protected $resourceHash;
29
 
30
    /** @var array Queued exceptions */
31
    protected $exceptions = array();
32
 
33
    /** @var array Requests that succeeded */
34
    protected $successful = array();
35
 
36
    /** @var array cURL multi error values and codes */
37
    protected $multiErrors = array(
38
        CURLM_BAD_HANDLE      => array('CURLM_BAD_HANDLE', 'The passed-in handle is not a valid CURLM handle.'),
39
        CURLM_BAD_EASY_HANDLE => array('CURLM_BAD_EASY_HANDLE', "An easy handle was not good/valid. It could mean that it isn't an easy handle at all, or possibly that the handle already is in used by this or another multi handle."),
40
        CURLM_OUT_OF_MEMORY   => array('CURLM_OUT_OF_MEMORY', 'You are doomed.'),
41
        CURLM_INTERNAL_ERROR  => array('CURLM_INTERNAL_ERROR', 'This can only be returned if libcurl bugs. Please report it to us!')
42
    );
43
 
44
    /** @var float */
45
    protected $selectTimeout;
46
 
47
    public function __construct($selectTimeout = 1.0)
48
    {
49
        $this->selectTimeout = $selectTimeout;
50
        $this->multiHandle = curl_multi_init();
51
        // @codeCoverageIgnoreStart
52
        if ($this->multiHandle === false) {
53
            throw new CurlException('Unable to create multi handle');
54
        }
55
        // @codeCoverageIgnoreEnd
56
        $this->reset();
57
    }
58
 
59
    public function __destruct()
60
    {
61
        if (is_resource($this->multiHandle)) {
62
            curl_multi_close($this->multiHandle);
63
        }
64
    }
65
 
66
    public function add(RequestInterface $request)
67
    {
68
        $this->requests[] = $request;
69
        // If requests are currently transferring and this is async, then the
70
        // request must be prepared now as the send() method is not called.
71
        $this->beforeSend($request);
72
        $this->dispatch(self::ADD_REQUEST, array('request' => $request));
73
 
74
        return $this;
75
    }
76
 
77
    public function all()
78
    {
79
        return $this->requests;
80
    }
81
 
82
    public function remove(RequestInterface $request)
83
    {
84
        $this->removeHandle($request);
85
        if (($index = array_search($request, $this->requests, true)) !== false) {
86
            $request = $this->requests[$index];
87
            unset($this->requests[$index]);
88
            $this->requests = array_values($this->requests);
89
            $this->dispatch(self::REMOVE_REQUEST, array('request' => $request));
90
            return true;
91
        }
92
 
93
        return false;
94
    }
95
 
96
    public function reset($hard = false)
97
    {
98
        // Remove each request
99
        if ($this->requests) {
100
            foreach ($this->requests as $request) {
101
                $this->remove($request);
102
            }
103
        }
104
 
105
        $this->handles = new \SplObjectStorage();
106
        $this->requests = $this->resourceHash = $this->exceptions = $this->successful = array();
107
    }
108
 
109
    public function send()
110
    {
111
        $this->perform();
112
        $exceptions = $this->exceptions;
113
        $successful = $this->successful;
114
        $this->reset();
115
 
116
        if ($exceptions) {
117
            $this->throwMultiException($exceptions, $successful);
118
        }
119
    }
120
 
121
    public function count()
122
    {
123
        return count($this->requests);
124
    }
125
 
126
    /**
127
     * Build and throw a MultiTransferException
128
     *
129
     * @param array $exceptions Exceptions encountered
130
     * @param array $successful Successful requests
131
     * @throws MultiTransferException
132
     */
133
    protected function throwMultiException(array $exceptions, array $successful)
134
    {
135
        $multiException = new MultiTransferException('Errors during multi transfer');
136
 
137
        while ($e = array_shift($exceptions)) {
138
            $multiException->addFailedRequestWithException($e['request'], $e['exception']);
139
        }
140
 
141
        // Add successful requests
142
        foreach ($successful as $request) {
143
            if (!$multiException->containsRequest($request)) {
144
                $multiException->addSuccessfulRequest($request);
145
            }
146
        }
147
 
148
        throw $multiException;
149
    }
150
 
151
    /**
152
     * Prepare for sending
153
     *
154
     * @param RequestInterface $request Request to prepare
155
     * @throws \Exception on error preparing the request
156
     */
157
    protected function beforeSend(RequestInterface $request)
158
    {
159
        try {
160
            $state = $request->setState(RequestInterface::STATE_TRANSFER);
161
            if ($state == RequestInterface::STATE_TRANSFER) {
162
                $this->addHandle($request);
163
            } else {
164
                // Requests might decide they don't need to be sent just before
165
                // transfer (e.g. CachePlugin)
166
                $this->remove($request);
167
                if ($state == RequestInterface::STATE_COMPLETE) {
168
                    $this->successful[] = $request;
169
                }
170
            }
171
        } catch (\Exception $e) {
172
            // Queue the exception to be thrown when sent
173
            $this->removeErroredRequest($request, $e);
174
        }
175
    }
176
 
177
    private function addHandle(RequestInterface $request)
178
    {
179
        $handle = $this->createCurlHandle($request)->getHandle();
180
        $this->checkCurlResult(
181
            curl_multi_add_handle($this->multiHandle, $handle)
182
        );
183
    }
184
 
185
    /**
186
     * Create a curl handle for a request
187
     *
188
     * @param RequestInterface $request Request
189
     *
190
     * @return CurlHandle
191
     */
192
    protected function createCurlHandle(RequestInterface $request)
193
    {
194
        $wrapper = CurlHandle::factory($request);
195
        $this->handles[$request] = $wrapper;
196
        $this->resourceHash[(int) $wrapper->getHandle()] = $request;
197
 
198
        return $wrapper;
199
    }
200
 
201
    /**
202
     * Get the data from the multi handle
203
     */
204
    protected function perform()
205
    {
206
        $event = new Event(array('curl_multi' => $this));
207
 
208
        while ($this->requests) {
209
            // Notify each request as polling
210
            $blocking = $total = 0;
211
            foreach ($this->requests as $request) {
212
                ++$total;
213
                $event['request'] = $request;
214
                $request->getEventDispatcher()->dispatch(self::POLLING_REQUEST, $event);
215
                // The blocking variable just has to be non-falsey to block the loop
216
                if ($request->getParams()->hasKey(self::BLOCKING)) {
217
                    ++$blocking;
218
                }
219
            }
220
            if ($blocking == $total) {
221
                // Sleep to prevent eating CPU because no requests are actually pending a select call
222
                usleep(500);
223
            } else {
224
                $this->executeHandles();
225
            }
226
        }
227
    }
228
 
229
    /**
230
     * Execute and select curl handles
231
     */
232
    private function executeHandles()
233
    {
234
        // The first curl_multi_select often times out no matter what, but is usually required for fast transfers
235
        $selectTimeout = 0.001;
236
        $active = false;
237
        do {
238
            while (($mrc = curl_multi_exec($this->multiHandle, $active)) == CURLM_CALL_MULTI_PERFORM);
239
            $this->checkCurlResult($mrc);
240
            $this->processMessages();
241
            if ($active && curl_multi_select($this->multiHandle, $selectTimeout) === -1) {
242
                // Perform a usleep if a select returns -1: https://bugs.php.net/bug.php?id=61141
243
                usleep(150);
244
            }
245
            $selectTimeout = $this->selectTimeout;
246
        } while ($active);
247
    }
248
 
249
    /**
250
     * Process any received curl multi messages
251
     */
252
    private function processMessages()
253
    {
254
        while ($done = curl_multi_info_read($this->multiHandle)) {
255
            $request = $this->resourceHash[(int) $done['handle']];
256
            try {
257
                $this->processResponse($request, $this->handles[$request], $done);
258
                $this->successful[] = $request;
259
            } catch (\Exception $e) {
260
                $this->removeErroredRequest($request, $e);
261
            }
262
        }
263
    }
264
 
265
    /**
266
     * Remove a request that encountered an exception
267
     *
268
     * @param RequestInterface $request Request to remove
269
     * @param \Exception       $e       Exception encountered
270
     */
271
    protected function removeErroredRequest(RequestInterface $request, \Exception $e = null)
272
    {
273
        $this->exceptions[] = array('request' => $request, 'exception' => $e);
274
        $this->remove($request);
275
        $this->dispatch(self::MULTI_EXCEPTION, array('exception' => $e, 'all_exceptions' => $this->exceptions));
276
    }
277
 
278
    /**
279
     * Check for errors and fix headers of a request based on a curl response
280
     *
281
     * @param RequestInterface $request Request to process
282
     * @param CurlHandle       $handle  Curl handle object
283
     * @param array            $curl    Array returned from curl_multi_info_read
284
     *
285
     * @throws CurlException on Curl error
286
     */
287
    protected function processResponse(RequestInterface $request, CurlHandle $handle, array $curl)
288
    {
289
        // Set the transfer stats on the response
290
        $handle->updateRequestFromTransfer($request);
291
        // Check if a cURL exception occurred, and if so, notify things
292
        $curlException = $this->isCurlException($request, $handle, $curl);
293
 
294
        // Always remove completed curl handles.  They can be added back again
295
        // via events if needed (e.g. ExponentialBackoffPlugin)
296
        $this->removeHandle($request);
297
 
298
        if (!$curlException) {
299
            if ($this->validateResponseWasSet($request)) {
300
                $state = $request->setState(
301
                    RequestInterface::STATE_COMPLETE,
302
                    array('handle' => $handle)
303
                );
304
                // Only remove the request if it wasn't resent as a result of
305
                // the state change
306
                if ($state != RequestInterface::STATE_TRANSFER) {
307
                    $this->remove($request);
308
                }
309
            }
310
            return;
311
        }
312
 
313
        // Set the state of the request to an error
314
        $state = $request->setState(RequestInterface::STATE_ERROR, array('exception' => $curlException));
315
        // Allow things to ignore the error if possible
316
        if ($state != RequestInterface::STATE_TRANSFER) {
317
            $this->remove($request);
318
        }
319
 
320
        // The error was not handled, so fail
321
        if ($state == RequestInterface::STATE_ERROR) {
322
            /** @var CurlException $curlException */
323
            throw $curlException;
324
        }
325
    }
326
 
327
    /**
328
     * Remove a curl handle from the curl multi object
329
     *
330
     * @param RequestInterface $request Request that owns the handle
331
     */
332
    protected function removeHandle(RequestInterface $request)
333
    {
334
        if (isset($this->handles[$request])) {
335
            $handle = $this->handles[$request];
336
            curl_multi_remove_handle($this->multiHandle, $handle->getHandle());
337
            unset($this->handles[$request]);
338
            unset($this->resourceHash[(int) $handle->getHandle()]);
339
            $handle->close();
340
        }
341
    }
342
 
343
    /**
344
     * Check if a cURL transfer resulted in what should be an exception
345
     *
346
     * @param RequestInterface $request Request to check
347
     * @param CurlHandle       $handle  Curl handle object
348
     * @param array            $curl    Array returned from curl_multi_info_read
349
     *
350
     * @return CurlException|bool
351
     */
352
    private function isCurlException(RequestInterface $request, CurlHandle $handle, array $curl)
353
    {
354
        if (CURLM_OK == $curl['result'] || CURLM_CALL_MULTI_PERFORM == $curl['result']) {
355
            return false;
356
        }
357
 
358
        $handle->setErrorNo($curl['result']);
359
        $e = new CurlException(sprintf('[curl] %s: %s [url] %s',
360
            $handle->getErrorNo(), $handle->getError(), $handle->getUrl()));
361
        $e->setCurlHandle($handle)
362
            ->setRequest($request)
363
            ->setCurlInfo($handle->getInfo())
364
            ->setError($handle->getError(), $handle->getErrorNo());
365
 
366
        return $e;
367
    }
368
 
369
    /**
370
     * Throw an exception for a cURL multi response if needed
371
     *
372
     * @param int $code Curl response code
373
     * @throws CurlException
374
     */
375
    private function checkCurlResult($code)
376
    {
377
        if ($code != CURLM_OK && $code != CURLM_CALL_MULTI_PERFORM) {
378
            throw new CurlException(isset($this->multiErrors[$code])
379
                ? "cURL error: {$code} ({$this->multiErrors[$code][0]}): cURL message: {$this->multiErrors[$code][1]}"
380
                : 'Unexpected cURL error: ' . $code
381
            );
382
        }
383
    }
384
 
385
    /**
386
     * @link https://github.com/guzzle/guzzle/issues/710
387
     */
388
    private function validateResponseWasSet(RequestInterface $request)
389
    {
390
        if ($request->getResponse()) {
391
            return true;
392
        }
393
 
394
        $body = $request instanceof EntityEnclosingRequestInterface
395
            ? $request->getBody()
396
            : null;
397
 
398
        if (!$body) {
399
            $rex = new RequestException(
400
                'No response was received for a request with no body. This'
401
                . ' could mean that you are saturating your network.'
402
            );
403
            $rex->setRequest($request);
404
            $this->removeErroredRequest($request, $rex);
405
        } elseif (!$body->isSeekable() || !$body->seek(0)) {
406
            // Nothing we can do with this. Sorry!
407
            $rex = new RequestException(
408
                'The connection was unexpectedly closed. The request would'
409
                . ' have been retried, but attempting to rewind the'
410
                . ' request body failed.'
411
            );
412
            $rex->setRequest($request);
413
            $this->removeErroredRequest($request, $rex);
414
        } else {
415
            $this->remove($request);
416
            // Add the request back to the batch to retry automatically.
417
            $this->requests[] = $request;
418
            $this->addHandle($request);
419
        }
420
 
421
        return false;
422
    }
423
}