Select Git revision
RequestFetcher.php

Dominik Hebeler authored
RequestFetcher.php 9.33 KiB
<?php
namespace App\Console\Commands;
use Cache;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use Log;
/**
 * Long-running console worker that performs HTTP requests on behalf of other processes.
 *
 * Jobs are JSON documents pushed onto the Redis list FETCHQUEUE_KEY. Each job is
 * turned into a curl easy handle and executed concurrently through one shared
 * curl multi handle. When a transfer finishes, the parsed response is pushed onto
 * a Redis list named after the job's "resulthash" (expiring after 15 seconds)
 * and, if the job's cache duration is positive, also stored in the Laravel cache.
 */
class RequestFetcher extends Command
{
    /** Redis list that fetch jobs are read from. */
    public const FETCHQUEUE_KEY = "fetcher.queue";
    /** Transfers are aborted once more than this many bytes were downloaded. */
    public const SIZE_LIMIT = 20 * 1024 * 1024; // Limit Document Size to 20MB

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'requests:fetcher';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'This commands fetches requests to the installed search engines';

    /** Main-loop flag; cleared by sig_handler() or when a handle cannot be added to multicurl. */
    protected $shouldRun = true;
    /** Shared curl multi handle all transfers are added to. */
    protected $multicurl = null;
    /** HTTP proxy settings; the proxy is only used when all four are non-empty. */
    protected $proxyhost;
    protected $proxyport;
    protected $proxyuser;
    protected $proxypassword;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
        $this->multicurl = curl_multi_init();
        // NOTE(review): env() outside config files returns only real environment
        // variables once the config is cached; consider reading these via config().
        $this->proxyhost = env("PROXY_HOST", "");
        $this->proxyport = env("PROXY_PORT", "");
        $this->proxyuser = env("PROXY_USER", "");
        $this->proxypassword = env("PROXY_PASSWORD", "");
    }

    /**
     * Execute the console command.
     *
     * Waits (up to ~10 attempts, one per second) for Redis, creates the pid file
     * and then runs the fetch loop until a signal clears $shouldRun. The pid file
     * is removed and the multi handle closed when the loop ends.
     *
     * @return mixed
     */
    public function handle()
    {
        $pidFile = "/tmp/fetcher";
        \pcntl_signal(SIGINT, [$this, "sig_handler"]);
        \pcntl_signal(SIGTERM, [$this, "sig_handler"]);
        \pcntl_signal(SIGHUP, [$this, "sig_handler"]);

        // Redis might not be available yet: retry once per second and give up
        // after the 10th failed attempt.
        for ($attempt = 0; $attempt < 10; $attempt++) {
            try {
                Redis::connection();
                break;
            } catch (\Exception $e) {
                if ($attempt >= 9) {
                    return;
                }
                sleep(1);
            }
        }

        touch($pidFile);
        if (!file_exists($pidFile)) {
            return;
        }

        try {
            while ($this->shouldRun) {
                $operationsRunning = 0;
                curl_multi_exec($this->multicurl, $operationsRunning);
                [$answersRead, $messagesLeft] = $this->readMultiCurl($this->multicurl);
                $newJobs = $this->checkNewJobs($operationsRunning, $messagesLeft);
                if ($newJobs === 0 && $answersRead === 0) {
                    // Nothing happened in this pass; avoid spinning the CPU.
                    usleep(10 * 1000);
                }
                // Handlers registered with pcntl_signal() only run when pending
                // signals are dispatched. Dispatch explicitly so SIGINT/SIGTERM/SIGHUP
                // stop the loop even when async signal handling is not enabled.
                \pcntl_signal_dispatch();
            }
        } finally {
            unlink($pidFile);
            curl_multi_close($this->multicurl);
        }
    }

    /**
     * Checks the Redis queue for newly submitted fetch jobs and adds them to multicurl.
     *
     * If no transfer is running and no answer was reported in this pass, this
     * blocks on Redis for up to one second waiting for a single job. Otherwise
     * all queued jobs are drained without blocking.
     *
     * @param int $operationsRunning Still-running transfer count from curl_multi_exec().
     * @param int $messagesLeft      Counter from readMultiCurl(); -1 (its initial value)
     *                               means no finished transfer was reported this pass.
     * @return int Number of jobs successfully added to the multi handle.
     */
    private function checkNewJobs($operationsRunning, $messagesLeft)
    {
        $rawJobs = [];
        if ($operationsRunning === 0 && $messagesLeft === -1) {
            $popped = Redis::blpop(self::FETCHQUEUE_KEY, 1);
            if (!empty($popped)) {
                $rawJobs[] = $popped[1];
            }
        } else {
            // LRANGE + DEL must be atomic (MULTI/EXEC). In a plain pipeline another
            // client's push could land between the two commands and be deleted
            // without ever having been read.
            $elements = Redis::transaction(function ($redis) {
                $redis->lrange(self::FETCHQUEUE_KEY, 0, -1);
                $redis->del(self::FETCHQUEUE_KEY);
            });
            if (is_array($elements) && !empty($elements[0]) && is_array($elements[0])) {
                $rawJobs = $elements[0];
            }
        }

        $addedJobs = 0;
        foreach ($rawJobs as $rawJob) {
            $job = json_decode($rawJob, true);
            if (!is_array($job)) {
                // A malformed payload must not take the whole fetcher down.
                Log::error("Couldn't decode fetch job: " . json_last_error_msg());
                continue;
            }
            try {
                $ch = $this->getCurlHandle($job);
            } catch (\Throwable $e) {
                // e.g. missing keys or invalid curl option values in the job.
                Log::error("Couldn't create curl handle for fetch job: " . $e->getMessage());
                continue;
            }
            if (curl_multi_add_handle($this->multicurl, $ch) !== 0) {
                $this->shouldRun = false;
                Log::error("Couldn't add Handle to multicurl");
                break;
            }
            $addedJobs++;
        }
        return $addedJobs;
    }

    /**
     * Collects all finished transfers from the multi handle and publishes their results.
     *
     * For each finished handle the parsed response is pushed onto the Redis list
     * named after the job's result hash (with a 15 second expiry) and, when the
     * job's cache duration is positive and the result is not null, put into the
     * cache. The handle is always removed from the multi handle afterwards.
     *
     * @param resource|\CurlMultiHandle $mc
     * @return array [answers read in this pass, messages-left counter (-1 if none reported)]
     */
    private function readMultiCurl($mc)
    {
        $messagesLeft = -1;
        $answersRead = 0;
        while (($info = curl_multi_info_read($mc, $messagesLeft)) !== false) {
            $handle = $info["handle"];
            try {
                $answersRead++;
                // CURLOPT_PRIVATE was set to "<resulthash>;<cacheDuration in minutes>".
                $private = explode(";", curl_getinfo($handle, CURLINFO_PRIVATE));
                $resulthash = $private[0];
                $cacheDurationMinutes = intval($private[1] ?? 0);
                if (curl_errno($handle)) {
                    Log::error(curl_error($handle));
                }
                $result = $this->parseResponse($handle);
                Redis::pipeline(function ($pipe) use ($resulthash, $result) {
                    $pipe->lpush($resulthash, json_encode($result));
                    $pipe->expire($resulthash, 15);
                });
                // A null result (no response data) is not worth caching.
                if ($cacheDurationMinutes > 0 && $result !== null) {
                    try {
                        Cache::put($resulthash, $result, $cacheDurationMinutes * 60);
                    } catch (\Exception $e) {
                        Log::error($e->getMessage());
                    }
                }
            } catch (\Exception $e) {
                Log::error($e->getMessage());
            } finally {
                curl_multi_remove_handle($mc, $handle);
            }
        }
        return [$answersRead, $messagesLeft];
    }

    /**
     * Converts a finished curl handle into the array that is published to Redis.
     *
     * @param resource|\CurlHandle $ch
     * @return array|null
     *   - ["error" => code, "message" => text] when the transfer was aborted by a
     *     callback (progress() aborts downloads above SIZE_LIMIT);
     *   - null when no response data was received;
     *   - otherwise ["http-code" => int, "body" => base64-encoded body,
     *     "headers" => final response headers, lower-cased, some removed].
     */
    private function parseResponse($ch)
    {
        $errorNumber = curl_errno($ch);
        if ($errorNumber === CURLE_ABORTED_BY_CALLBACK) {
            return [
                "error" => $errorNumber,
                "message" => curl_error($ch),
            ];
        }
        $httpResponse = curl_multi_getcontent($ch);
        if (empty($httpResponse)) {
            return null;
        }

        // CURLOPT_HEADER is on, so the content starts with the raw header section.
        $headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
        $rawHeaders = substr($httpResponse, 0, $headerSize);
        $body = substr($httpResponse, $headerSize);
        $httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE);

        // With CURLOPT_FOLLOWLOCATION (or proxy CONNECT / "100 Continue") the header
        // section holds one block per response, separated by blank lines. Only the
        // last block belongs to the response whose body is returned.
        $headerBlocks = explode("\r\n\r\n", trim($rawHeaders));
        $headerLines = explode("\r\n", end($headerBlocks));
        // Drop the status line, e.g. "HTTP/1.1 200 OK".
        array_shift($headerLines);
        $headers = [];
        foreach ($headerLines as $line) {
            $parts = explode(':', $line, 2);
            if (count($parts) === 2) {
                $headers[strtolower($parts[0])] = trim($parts[1]);
            }
        }

        // Decompress gzip bodies. curl only decodes automatically when
        // CURLOPT_ENCODING is set, so check the gzip magic bytes to leave an
        // already-decoded body untouched, and keep the raw body if decoding fails.
        if (!empty($headers["content-encoding"])
            && strtolower($headers["content-encoding"]) === "gzip"
            && strncmp($body, "\x1f\x8b", 2) === 0) {
            $decoded = gzdecode($body);
            if ($decoded !== false) {
                $body = $decoded;
            }
        }

        // Remove encoding/length headers (the body may have been decompressed above)
        // and framing, CSP and cookie headers. Matching is by substring, so e.g.
        // "content-security-policy-report-only" is removed as well.
        $blockedHeaders = [
            "content-encoding",
            "x-frame-options",
            "content-length",
            "content-security-policy",
            "set-cookie",
        ];
        $sanitizedHeaders = [];
        foreach ($headers as $name => $value) {
            foreach ($blockedHeaders as $blocked) {
                if (stripos($name, $blocked) !== false) {
                    continue 2;
                }
            }
            $sanitizedHeaders[$name] = $value;
        }

        return [
            "http-code" => $httpcode,
            "body" => base64_encode($body),
            "headers" => $sanitizedHeaders,
        ];
    }

    /**
     * Builds a curl easy handle for one fetch job.
     *
     * Job keys read here: "url", "resulthash", "cacheDuration", "useragent" and,
     * optionally, "curlopts" (extra curl options applied after the defaults, so
     * they can override them), "username"/"password" (HTTP auth) and "headers"
     * (map of header name => value).
     *
     * @param array $job Decoded fetch job.
     * @return resource|\CurlHandle
     */
    private function getCurlHandle($job)
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $job["url"],
            // Read back in readMultiCurl() to route and cache the answer.
            CURLOPT_PRIVATE => $job["resulthash"] . ";" . $job["cacheDuration"],
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_USERAGENT => $job["useragent"],
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_MAXCONNECTS => 500,
            CURLOPT_LOW_SPEED_LIMIT => 50000,
            CURLOPT_LOW_SPEED_TIME => 5,
            CURLOPT_TIMEOUT => 20,
            CURLOPT_HEADER => true,
            // Progress reporting must be on for progress() (SIZE_LIMIT) to be called.
            CURLOPT_NOPROGRESS => false,
            CURLOPT_PROGRESSFUNCTION => [self::class, 'progress'],
        ]);
        if (!empty($job["curlopts"])) {
            curl_setopt_array($ch, $job["curlopts"]);
        }
        if (!empty($this->proxyhost) && !empty($this->proxyport) && !empty($this->proxyuser) && !empty($this->proxypassword)) {
            curl_setopt($ch, CURLOPT_PROXY, $this->proxyhost);
            curl_setopt($ch, CURLOPT_PROXYUSERPWD, $this->proxyuser . ":" . $this->proxypassword);
            curl_setopt($ch, CURLOPT_PROXYPORT, $this->proxyport);
            curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_HTTP);
        }
        if (!empty($job["username"]) && !empty($job["password"])) {
            curl_setopt($ch, CURLOPT_USERPWD, $job["username"] . ":" . $job["password"]);
        }
        if (!empty($job["headers"])) {
            // CURLOPT_HTTPHEADER takes a list of "<name>:<value>" strings.
            $headers = [];
            foreach ($job["headers"] as $key => $value) {
                $headers[] = $key . ":" . $value;
            }
            curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
        }
        return $ch;
    }

    /**
     * curl progress callback: aborts a transfer once more than SIZE_LIMIT bytes were downloaded.
     *
     * @return int Non-zero aborts the transfer (curl then reports CURLE_ABORTED_BY_CALLBACK).
     */
    public static function progress($resource, $download_size, $downloaded, $upload_size, $uploaded)
    {
        return $downloaded > self::SIZE_LIMIT ? 1 : 0;
    }

    /**
     * Signal handler for SIGINT/SIGTERM/SIGHUP: lets the main loop finish its current pass and exit.
     */
    public function sig_handler($sig)
    {
        $this->shouldRun = false;
        echo("Terminating Process\n");
    }
}