Как сделать Amphp пул / очередь из нескольких запросов? А где находится обработчик Curl?

Вот пример / тестовый код, сделанный с использованием GuzzleHttp:

use GuzzleHttp\Client;
use GuzzleHttp\Handler\CurlHandler;
use GuzzleHttp\HandlerStack;
use GuzzleHttp\Middleware;
use GuzzleHttp\Pool;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/vendor/autoload.php';

$handler = new CurlHandler();

$stack = new HandlerStack($handler);
$stack->push(Middleware::httpErrors(), 'http_errors');
$stack->push(Middleware::redirect(), 'allow_redirects');
$stack->push(Middleware::cookies(), 'cookies');
$stack->push(Middleware::prepareBody(), 'prepare_body');
$interval = 100;
$concurrency = 50;
$client = new Client(['handler' => $stack]);
echo sprintf("Using Guzzle handler %s\n", get_class($handler));
echo sprintf("Printing memory usage every %d requests\n", $interval);
echo "Fetching package list... ";

$packageNames = json_decode(
$client->get('https://packagist.org/packages/list.json')
->getBody()
->getContents()
)->packageNames;

if (empty($packageNames)) {
echo "Empty result. No reason to continue.";
return;
}

echo 'done. (' . count($packageNames) . " packages)\n\n";

$requests = function($packageNames) {
foreach ($packageNames as $packageVendorPair) {
yield new GuzzleHttp\Psr7\Request('GET', "https://packagist.org/p/{$packageVendorPair}.json");
}
};

$pool = new Pool($client, $requests($packageNames), [
'concurrency' => $concurrency,
'fulfilled' => function (ResponseInterface $response, $index) use (&$counter, $interval) {
$counter++;
if ($counter % $interval === 0) {
echo sprintf(
"Processed %s requests. Memory used: %s MB\n",
number_format($counter),
number_format(memory_get_peak_usage()/1024/1024, 3)
);
}
},
'rejected' => function($reason, $index) use (&$counter, $interval)
{
$counter++;
if ($counter % $interval === 0) {
echo sprintf(
'Processed %s requests. Memory used: %s MB',
number_format($counter),
number_format(memory_get_peak_usage()/1024/1024, 3)
);
}
}
]);

$promise = $pool->promise();
$response = $promise->wait();

Как сделать что-то похожее для Amphp или Artax?
Я искал документы по amp и stackoverflow, но не смог найти ничего похожего.

Кстати, я также обнаружил, что Amp не использует Curl в качестве обработчика. Не понимаю, почему такой опции нет. Можете ли вы добавить его вручную или что-то еще лучше, что заменило функциональность curl (различные пользовательские заголовки, возможности отладки / подробности и т. Д.)?

Конкретные моменты, где мне нужна помощь:

  1. Возможно ли, что кто-то может показать мне, где я могу найти пример, эквивалентный пулу, сделанный с использованием Amp Framework или любой из его библиотек, и / или просто показать его даже в более простом примере?
  2. Где находится обработчик Curl в Amp? Могу ли я использовать это и как?

На сайте Amphp сказано:

Сообщество Stack Overflow может ответить на ваш вопрос, если он достаточно общий. Используйте тег amphp, чтобы правильные люди нашли ваш вопрос.

Поскольку я привел достаточно простой (и работающий) пример, я подумал, что будет легко понять, что именно мне нужно.

При всем моем уважении.

0

Решение

Эквивалентного пула нет, но его можно записать с использованием семафора и асинхронных сопрограмм.

<?php

use Amp\Artax\DefaultClient;
use Amp\Loop;
use Amp\Sync\LocalSemaphore;

require __DIR__ . "/vendor/autoload.php";

Loop::run(function () {
$concurrency = 10;
$client = new DefaultClient;
$semaphore = new LocalSemaphore(10);

$packageResponse = yield $client->request("https://packagist.org/packages/list.json");
$packageNames = json_decode(yield $packageResponse->getBody())->packageNames;

$requestHandler = Amp\coroutine(function ($package) use ($client) {
$url = "https://packagist.org/p/{$package}.json";

$response = yield $client->request($url);
$body = yield $response->getBody();

return $body;
});

$counter = 0;

foreach ($packageNames as $package) {
$lock = yield $semaphore->acquire();

$promise = $requestHandler($package);
$promise->onResolve(function ($error, $body) use (&$counter, $lock) {
$lock->release();

if (++$counter % 50 === 0) {
echo sprintf(
"Processed %s requests. Memory used: %s MB\n",
number_format($counter),
number_format(memory_get_peak_usage()/1024/1024, 3)
);
}
});
}
});

Этот пример использует LocalSemaphore реализация, которая является реализацией Amp\Sync\Semaphore. Семафор используется для ограничения параллелизма.

В Amp нет обработчика Curl, потому что он плохо работает с циклами событий. У Curl есть свой собственный цикл обработки событий, но он допускает только несколько одновременных HTTP-запросов, никаких других неблокирующих операций ввода-вывода. Вот почему Artax реализует HTTP на основе необработанных сокетов PHP без какой-либо зависимости от Curl.

1

Другие решения

Других решений пока нет …