Skip to content
Snippets Groups Projects

Resolve "use multi curl to improve performance"

Merged Ghost User requested to merge 19-use-multi-curl-to-improve-performance into master
Files
5
+ 232
0
<?php
namespace App\Console\Commands;
use Cache;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use Log;
/**
 * Long-running console worker that takes JSON-encoded fetch jobs from the
 * Redis list FETCHQUEUE_KEY, executes them in parallel through one shared
 * curl multi handle and pushes every response body onto the Redis list named
 * by the job's "resulthash" (optionally also storing it in the cache).
 */
class RequestFetcher extends Command
{
    /** Redis list from which JSON-encoded fetch jobs are read. */
    public const FETCHQUEUE_KEY = "fetcher.queue";

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'requests:fetcher';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'This commands fetches requests to the installed search engines';

    /**
     * Main loop flag. Cleared by sig_handler() or when a handle cannot be
     * added to the multi handle.
     *
     * @var bool
     */
    protected $shouldRun = true;

    /**
     * Shared curl multi handle all transfers are attached to.
     *
     * @var resource|\CurlMultiHandle|null
     */
    protected $multicurl = null;

    /** @var string Proxy settings read from the environment; empty means "not configured". */
    protected $proxyhost;
    /** @var string */
    protected $proxyport;
    /** @var string */
    protected $proxyuser;
    /** @var string */
    protected $proxypassword;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
        $this->multicurl = curl_multi_init();
        // NOTE(review): env() only yields the defaults once the Laravel
        // configuration is cached - confirm this is never run with a cached config.
        $this->proxyhost = env("PROXY_HOST", "");
        $this->proxyport = env("PROXY_PORT", "");
        $this->proxyuser = env("PROXY_USER", "");
        $this->proxypassword = env("PROXY_PASSWORD", "");
    }

    /**
     * Execute the console command.
     *
     * Waits up to ten attempts for Redis, creates the marker file
     * /tmp/fetcher and then loops until a termination signal arrives:
     * drive running transfers, collect finished ones, enqueue new jobs.
     * The marker file and the multi handle are always cleaned up.
     *
     * @return mixed
     */
    public function handle()
    {
        $pidFile = "/tmp/fetcher";

        // Without asynchronous delivery (or explicit pcntl_signal_dispatch()
        // calls) the handlers registered below would never be invoked.
        \pcntl_async_signals(true);
        \pcntl_signal(SIGINT, [$this, "sig_handler"]);
        \pcntl_signal(SIGTERM, [$this, "sig_handler"]);
        \pcntl_signal(SIGHUP, [$this, "sig_handler"]);

        // Redis might not be available now
        for ($count = 0; $count < 10; $count++) {
            try {
                Redis::connection();
                break;
            } catch (\Predis\Connection\ConnectionException $e) {
                if ($count >= 9) {
                    // If its not available after 10 seconds we will exit
                    return;
                }
                sleep(1);
            }
        }

        touch($pidFile);
        if (!file_exists($pidFile)) {
            return;
        }

        try {
            while ($this->shouldRun) {
                $operationsRunning = true;
                curl_multi_exec($this->multicurl, $operationsRunning);
                $status = $this->readMultiCurl($this->multicurl);
                $answersRead = $status[0];
                $messagesLeft = $status[1];
                $newJobs = $this->checkNewJobs($operationsRunning, $messagesLeft);
                if ($newJobs === 0 && $answersRead === 0) {
                    // Nothing happened in this round; back off for 10ms
                    usleep(10 * 1000);
                }
            }
        } finally {
            if (file_exists($pidFile)) {
                unlink($pidFile);
            }
            curl_multi_close($this->multicurl);
        }
    }

    /**
     * Checks the Redis queue for newly submitted fetch jobs and adds them to
     * the multi handle.
     *
     * When no transfer is running and no finished message was seen, a BLPOP
     * with a one second timeout is used so the worker does not spin. Otherwise
     * the whole queue is drained in one step.
     *
     * @param int $operationsRunning number of running transfers as reported by curl_multi_exec
     * @param int $messagesLeft      second element of readMultiCurl()'s result (-1 if nothing was read)
     * @return int number of handles that were added to the multi handle
     */
    private function checkNewJobs($operationsRunning, $messagesLeft)
    {
        $newJobs = [];
        if ($operationsRunning === 0 && $messagesLeft === -1) {
            $newJob = Redis::blpop(self::FETCHQUEUE_KEY, 1);
            if (!empty($newJob)) {
                // BLPOP answers with [key, value]
                $newJobs[] = $newJob[1];
            }
        } else {
            // MULTI/EXEC instead of a plain pipeline: a pipeline is not atomic,
            // so a job pushed between LRANGE and DEL would be deleted unseen.
            $elements = Redis::transaction(function ($redis) {
                $redis->lrange(self::FETCHQUEUE_KEY, 0, -1);
                $redis->del(self::FETCHQUEUE_KEY);
            });
            $newJobs = $elements[0] ?? [];
        }

        $addedJobs = 0;
        foreach ($newJobs as $rawJob) {
            $job = json_decode($rawJob, true);
            // A broken payload must not take the whole worker down
            if (!is_array($job) || empty($job["url"]) || empty($job["resulthash"])) {
                Log::error("Skipping malformed fetch job");
                continue;
            }
            $ch = $this->getCurlHandle($job);
            if (curl_multi_add_handle($this->multicurl, $ch) !== 0) {
                $this->shouldRun = false;
                Log::error("Couldn't add Handle to multicurl");
                break;
            }
            $addedJobs++;
        }
        return $addedJobs;
    }

    /**
     * Collects every finished transfer from the multi handle.
     *
     * For each finished handle the body is pushed onto the Redis list named
     * by the job's result hash (expiring after 60 seconds) and, if the job
     * asked for caching, stored in the cache. The handle is always detached
     * from the multi handle afterwards.
     *
     * @param resource|\CurlMultiHandle $mc multi handle to read from
     * @return array [number of answers read, queued-message counter last
     *               reported by curl_multi_info_read or -1 if none was read]
     */
    private function readMultiCurl($mc)
    {
        $messagesLeft = -1;
        $answersRead = 0;
        while (($info = curl_multi_info_read($mc, $messagesLeft)) !== false) {
            $handle = $info["handle"];
            try {
                $answersRead++;

                // CURLOPT_PRIVATE carries "<resulthash>;<cacheDuration>"
                $private = explode(";", (string) curl_getinfo($handle, CURLINFO_PRIVATE));
                $resulthash = $private[0];
                $cacheDurationMinutes = intval($private[1] ?? 0);

                $error = curl_error($handle);
                if (!empty($error)) {
                    Log::error($error);
                }

                // curl_multi_getcontent() yields null when the handle does not
                // return its transfer; fall back to the placeholder then
                $body = \curl_multi_getcontent($handle) ?? "no-result";

                Redis::pipeline(function ($pipe) use ($resulthash, $body) {
                    $pipe->lpush($resulthash, $body);
                    $pipe->expire($resulthash, 60);
                });

                if ($cacheDurationMinutes > 0) {
                    try {
                        Cache::put($resulthash, $body, $cacheDurationMinutes * 60);
                    } catch (\Exception $e) {
                        Log::error($e->getMessage());
                    }
                }
            } catch (\Exception $e) {
                Log::error($e->getMessage());
            } finally {
                \curl_multi_remove_handle($mc, $handle);
            }
        }
        return [$answersRead, $messagesLeft];
    }

    /**
     * Builds a configured curl easy handle for one decoded fetch job.
     *
     * Recognised job keys: "url", "resulthash", "cacheDuration", "useragent",
     * "curlopts" (extra curl options), "username"/"password" (HTTP auth) and
     * "headers" (map of header name => value).
     *
     * @param array $job decoded job description
     * @return resource|\CurlHandle
     */
    private function getCurlHandle($job)
    {
        $ch = curl_init();

        $options = [
            CURLOPT_URL => $job["url"],
            // Read back in readMultiCurl() to route the response
            CURLOPT_PRIVATE => $job["resulthash"] . ";" . ($job["cacheDuration"] ?? 0),
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_CONNECTTIMEOUT => 2,
            CURLOPT_MAXCONNECTS => 500,
            CURLOPT_LOW_SPEED_LIMIT => 50000,
            CURLOPT_LOW_SPEED_TIME => 5,
            CURLOPT_TIMEOUT => 7,
            CURLOPT_HEADER => true,
        ];
        if (isset($job["useragent"])) {
            $options[CURLOPT_USERAGENT] = $job["useragent"];
        }
        curl_setopt_array($ch, $options);

        // Job specific options may override the defaults above
        if (!empty($job["curlopts"]) && is_array($job["curlopts"])) {
            curl_setopt_array($ch, $job["curlopts"]);
        }

        // The proxy is only used when all four settings are configured
        if (!empty($this->proxyhost) && !empty($this->proxyport) && !empty($this->proxyuser) && !empty($this->proxypassword)) {
            curl_setopt($ch, CURLOPT_PROXY, $this->proxyhost);
            curl_setopt($ch, CURLOPT_PROXYUSERPWD, $this->proxyuser . ":" . $this->proxypassword);
            curl_setopt($ch, CURLOPT_PROXYPORT, $this->proxyport);
            curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_HTTP);
        }

        if (!empty($job["username"]) && !empty($job["password"])) {
            curl_setopt($ch, CURLOPT_USERPWD, $job["username"] . ":" . $job["password"]);
        }

        if (!empty($job["headers"]) && is_array($job["headers"])) {
            // The job carries headers as a name => value map; curl expects
            // a list of "<name>:<value>" strings
            $headers = [];
            foreach ($job["headers"] as $key => $value) {
                $headers[] = $key . ":" . $value;
            }
            curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
        }

        return $ch;
    }

    /**
     * Signal handler for SIGINT, SIGTERM and SIGHUP: asks the main loop in
     * handle() to stop after the current iteration.
     *
     * @param int $sig number of the received signal
     */
    public function sig_handler($sig)
    {
        $this->shouldRun = false;
        echo("Terminating Process\n");
    }
}
Loading