<?php

namespace App\Console\Commands;

use Cache;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use Log;

/**
 * Long-running worker: pulls fetch jobs (JSON) from the Redis list FETCHQUEUE_KEY,
 * runs them concurrently through one curl multi handle and publishes every
 * response to Redis under the job's result hash (and to the cache when the job
 * asked for a cache duration).
 */
class RequestFetcher extends Command
{
    public const FETCHQUEUE_KEY = "fetcher.queue";
    public const SIZE_LIMIT = 2 * 1024 * 1024; // Limit Document Size to 2MB

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'requests:fetcher';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'This commands fetches requests to the installed search engines';

    /** @var bool Main-loop flag; cleared by sig_handler() or when a handle cannot be added. */
    protected $shouldRun = true;
    /** Curl multi handle every fetch job is added to. */
    protected $multicurl = null;
    // Proxy settings read from the environment; all four must be set for the proxy to be used.
    protected $proxyhost;
    protected $proxyport;
    protected $proxyuser;
    protected $proxypassword;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
        $this->multicurl = curl_multi_init();
        $this->proxyhost = env("PROXY_HOST", "");
        $this->proxyport = env("PROXY_PORT", "");
        $this->proxyuser = env("PROXY_USER", "");
        $this->proxypassword = env("PROXY_PASSWORD", "");
    }

    /**
     * Execute the console command.
     *
     * Waits (up to ~10s) for Redis, creates the pid file /tmp/fetcher and then
     * loops until a termination signal arrives: drive the multi handle, publish
     * finished transfers, pick up new jobs. The pid file is removed and the
     * multi handle closed on exit.
     *
     * @return mixed
     */
    public function handle()
    {
        $pidFile = "/tmp/fetcher";

        // Without async signals (or explicit pcntl_signal_dispatch() calls) the handlers
        // below are installed but never invoked, so SIGTERM/SIGINT could not stop the loop.
        \pcntl_async_signals(true);
        \pcntl_signal(SIGINT, [$this, "sig_handler"]);
        \pcntl_signal(SIGTERM, [$this, "sig_handler"]);
        \pcntl_signal(SIGHUP, [$this, "sig_handler"]);

        // Redis might not be available now
        for ($count = 0; $count < 10; $count++) {
            try {
                Redis::connection();
                break;
            } catch (\Exception $e) {
                if ($count >= 9) {
                    // If its not available after 10 seconds we will exit
                    return;
                }
                sleep(1);
            }
        }

        touch($pidFile);

        if (!file_exists($pidFile)) {
            return;
        }

        try {
            while ($this->shouldRun) {
                $operationsRunning = true;
                curl_multi_exec($this->multicurl, $operationsRunning);
                [$answersRead, $messagesLeft] = $this->readMultiCurl($this->multicurl);
                $newJobs = $this->checkNewJobs($operationsRunning, $messagesLeft);

                // Back off briefly when this iteration did no work.
                if ($newJobs === 0 && $answersRead === 0) {
                    usleep(10 * 1000);
                }
            }
        } finally {
            unlink($pidFile);
            curl_multi_close($this->multicurl);
        }
    }

    /**
     * Checks the Redis queue for newly submitted fetch jobs and adds them to multicurl.
     *
     * When no transfers are running and no messages were read, this blocks on
     * Redis (BLPOP, 1 second timeout) for a single job; otherwise it drains the
     * whole queue without blocking. Malformed jobs are logged and skipped.
     *
     * @param int $operationsRunning still-running count reported by curl_multi_exec
     * @param int $messagesLeft      queue count from readMultiCurl (-1 if no message was read)
     * @return int number of jobs added to the multi handle
     */
    private function checkNewJobs($operationsRunning, $messagesLeft)
    {
        $newJobs = [];
        if ($operationsRunning === 0 && $messagesLeft === -1) {
            $newJob = Redis::blpop($this::FETCHQUEUE_KEY, 1);
            if (!empty($newJob)) {
                $newJobs[] = $newJob[1];
            }
        } else {
            // MULTI/EXEC makes LRANGE + DEL atomic; with a plain pipeline a job pushed
            // between the two commands would be deleted without ever being read.
            $elements = Redis::transaction(function ($redis) {
                $redis->lrange($this::FETCHQUEUE_KEY, 0, -1);
                $redis->del($this::FETCHQUEUE_KEY);
            });
            if (is_array($elements) && is_array($elements[0] ?? null)) {
                $newJobs = $elements[0];
            }
        }

        $addedJobs = 0;
        foreach ($newJobs as $rawJob) {
            $job = json_decode($rawJob, true);
            if (!is_array($job)) {
                Log::error("Skipping malformed fetch job: " . json_last_error_msg());
                continue;
            }

            // A single bad job (e.g. missing keys) must not take the whole fetcher down.
            try {
                $ch = $this->getCurlHandle($job);
            } catch (\Exception $e) {
                Log::error($e->getMessage());
                continue;
            }

            if (curl_multi_add_handle($this->multicurl, $ch) !== 0) {
                $this->shouldRun = false;
                Log::error("Couldn't add Handle to multicurl");
                break;
            }
            $addedJobs++;
        }
        return $addedJobs;
    }

    /**
     * Reads all finished transfers from the multi handle and publishes their results.
     *
     * Each result is JSON-encoded and LPUSHed to its result hash (15 second expiry);
     * if the job's cache duration is positive it is also stored in the cache.
     * Finished handles are always removed from the multi handle.
     *
     * @param resource|\CurlMultiHandle $mc
     * @return array{0: int, 1: int} [answers read, messages left (-1 if none was read)]
     */
    private function readMultiCurl($mc)
    {
        $messagesLeft = -1;
        $answersRead = 0;
        while (($info = curl_multi_info_read($mc, $messagesLeft)) !== false) {
            $ch = $info["handle"];
            try {
                $answersRead++;
                [$resulthash, $cacheDurationMinutes] = $this->parsePrivateData((string) curl_getinfo($ch, CURLINFO_PRIVATE));

                $error = curl_error($ch);
                if (!empty($error)) {
                    Log::error($error);
                }

                $result = $this->parseResponse($ch);
                Redis::pipeline(function ($pipe) use ($resulthash, $result) {
                    $pipe->lpush($resulthash, json_encode($result));
                    $pipe->expire($resulthash, 15);
                });

                if ($cacheDurationMinutes > 0) {
                    try {
                        Cache::put($resulthash, $result, $cacheDurationMinutes * 60);
                    } catch (\Exception $e) {
                        Log::error($e->getMessage());
                    }
                }
            } catch (\Exception $e) {
                Log::error($e->getMessage());
            } finally {
                \curl_multi_remove_handle($mc, $ch);
            }
        }
        return [$answersRead, $messagesLeft];
    }

    /**
     * Splits the CURLOPT_PRIVATE payload "<resulthash>;<cacheDuration>" built in getCurlHandle().
     *
     * Splits on the LAST ";" so a result hash that itself contains ";" stays intact.
     *
     * @param string $private
     * @return array{0: string, 1: int} result hash and cache duration in minutes (0 if absent)
     */
    private function parsePrivateData($private)
    {
        $separator = strrpos($private, ";");
        if ($separator === false) {
            return [$private, 0];
        }
        return [substr($private, 0, $separator), intval(substr($private, $separator + 1))];
    }

    /**
     * Converts a finished curl handle into the result array published to Redis.
     *
     * @param resource|\CurlHandle $ch
     * @return array|null ["error", "message"] when aborted by the size-limit callback,
     *                    null for an empty response, otherwise
     *                    ["http-code", "body" (base64), "headers" (sanitized)]
     */
    private function parseResponse($ch)
    {
        $errorNumber = curl_errno($ch);
        if ($errorNumber === CURLE_ABORTED_BY_CALLBACK) {
            return [
                "error" => $errorNumber,
                "message" => curl_error($ch),
            ];
        }

        $httpResponse = \curl_multi_getcontent($ch);
        if (empty($httpResponse)) {
            return null;
        }

        // CURLOPT_HEADER is on, so the content starts with the raw header block(s).
        $headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
        $headers = substr($httpResponse, 0, $headerSize);
        $body = substr($httpResponse, $headerSize);
        $httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE);

        // Parse HTTP headers into a lower-cased key => value array (first status line dropped).
        $headers_arr = [];
        $headerLines = explode("\r\n", $headers);
        array_shift($headerLines);
        foreach ($headerLines as $line) {
            $parts = explode(':', $line, 2);
            if (count($parts) === 2) {
                $headers_arr[strtolower($parts[0])] = trim($parts[1]);
            }
        }

        // Unzip if content-encoding is gzip. The header is optional: an unguarded lookup
        // raises an undefined-index notice that aborts publishing this result.
        // Only bodies that carry the gzip magic bytes are decoded.
        if (strtolower($headers_arr["content-encoding"] ?? "") === "gzip" && strncmp($body, "\x1f\x8b", 2) === 0) {
            $decoded = gzdecode($body);
            if ($decoded !== false) {
                $body = $decoded;
            }
        }

        // Headers whose name contains any of these substrings are dropped from the result.
        $blockedHeaders = [
            "content-encoding",
            "x-frame-options",
            "content-length",
            "content-security-policy",
            "set-cookie",
        ];
        $sanitizedHeaders = [];
        foreach ($headers_arr as $key => $value) {
            foreach ($blockedHeaders as $blocked) {
                if (stripos($key, $blocked) !== false) {
                    continue 2;
                }
            }
            $sanitizedHeaders[$key] = $value;
        }

        return [
            "http-code" => $httpcode,
            "body" => base64_encode($body),
            "headers" => $sanitizedHeaders,
        ];
    }

    /**
     * Builds a configured curl easy handle for one fetch job.
     *
     * Reads $job["url"], ["resulthash"], ["cacheDuration"], ["useragent"] and optionally
     * ["curlopts"] (extra curl options, applied after the defaults), ["username"]/["password"]
     * (basic auth) and ["headers"] (key => value map).
     *
     * @param array $job decoded fetch job
     * @return resource|\CurlHandle
     */
    private function getCurlHandle($job)
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $job["url"],
            // Read back in readMultiCurl() to route the response.
            CURLOPT_PRIVATE => $job["resulthash"] . ";" . $job["cacheDuration"],
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_USERAGENT => $job["useragent"],
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_CONNECTTIMEOUT => 2,
            CURLOPT_MAXCONNECTS => 500,
            CURLOPT_LOW_SPEED_LIMIT => 50000,
            CURLOPT_LOW_SPEED_TIME => 5,
            CURLOPT_TIMEOUT => 7,
            CURLOPT_HEADER => true,
            // Progress callbacks are required to enforce SIZE_LIMIT.
            CURLOPT_NOPROGRESS => false,
            // Array callable: the "self::method" string form is deprecated since PHP 8.2.
            CURLOPT_PROGRESSFUNCTION => [self::class, 'progress'],
        ]);

        if (!empty($job["curlopts"])) {
            curl_setopt_array($ch, $job["curlopts"]);
        }

        if (!empty($this->proxyhost) && !empty($this->proxyport) && !empty($this->proxyuser) && !empty($this->proxypassword)) {
            curl_setopt($ch, CURLOPT_PROXY, $this->proxyhost);
            curl_setopt($ch, CURLOPT_PROXYUSERPWD, $this->proxyuser . ":" . $this->proxypassword);
            curl_setopt($ch, CURLOPT_PROXYPORT, $this->proxyport);
            curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_HTTP);
        }

        if (!empty($job["username"]) && !empty($job["password"])) {
            curl_setopt($ch, CURLOPT_USERPWD, $job["username"] . ":" . $job["password"]);
        }

        if (!empty($job["headers"]) && is_array($job["headers"])) {
            // $job["headers"] is a key => value map; curl expects a list of "<key>:<value>" lines.
            $headers = [];
            foreach ($job["headers"] as $key => $value) {
                $headers[] = $key . ":" . $value;
            }
            curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
        }

        return $ch;
    }

    /**
     * CURLOPT_PROGRESSFUNCTION callback enforcing SIZE_LIMIT.
     *
     * @return int non-zero aborts the transfer (CURLE_ABORTED_BY_CALLBACK), 0 continues
     */
    public static function progress($resource, $download_size, $downloaded, $upload_size, $uploaded)
    {
        // Abort Download if size limit is reached
        if ($downloaded > self::SIZE_LIMIT) {
            return 1;
        }
        return 0;
    }

    /**
     * Signal handler for SIGINT/SIGTERM/SIGHUP: stops the main loop after the current iteration.
     */
    public function sig_handler($sig)
    {
        $this->shouldRun = false;
        echo("Terminating Process\n");
    }
}