Skip to content
Snippets Groups Projects
RequestFetcher.php 9.48 KiB
Newer Older
  • Learn to ignore specific revisions
  • <?php
    
    namespace App\Console\Commands;
    
    use Cache;
    use Illuminate\Console\Command;
    use Illuminate\Support\Facades\Redis;
    use Log;
    
    /**
     * Long-running worker: reads JSON-encoded fetch jobs from a Redis list, runs
     * them concurrently on a single curl multi handle, and pushes each parsed
     * response back to Redis (and optionally the cache) under the job's result hash.
     */
    class RequestFetcher extends Command
    {

        /** Redis list the fetcher reads JSON-encoded fetch jobs from. */
        public const FETCHQUEUE_KEY = "fetcher.queue";

        public const SIZE_LIMIT = 2 * 1024 * 1024; // Limit Document Size to 2MB

        /**
         * Response headers whose (lowercased) name contains any of these fragments
         * are not forwarded. Presumably content-encoding/-length are dropped because
         * the body may have been gunzipped here — confirm with consumers.
         */
        private const BLOCKED_HEADER_FRAGMENTS = [
            "content-encoding",
            "x-frame-options",
            "content-length",
            "content-security-policy",
            "set-cookie",
        ];

        /**
         * The name and signature of the console command.
         *
         * @var string
         */
        protected $signature = 'requests:fetcher';

        /**
         * The console command description.
         *
         * @var string
         */
        protected $description = 'This commands fetches requests to the installed search engines';

        /** Cleared by sig_handler() or a multicurl failure to stop the main loop. */
        protected $shouldRun = true;
        protected $multicurl = null;
        protected $proxyhost;
        protected $proxyport;
        protected $proxyuser;
        protected $proxypassword;

        /**
         * Create a new command instance.
         *
         * Opens the shared curl multi handle and reads the optional proxy settings
         * (PROXY_HOST / PROXY_PORT / PROXY_USER / PROXY_PASSWORD) from the environment.
         *
         * @return void
         */
        public function __construct()
        {
            parent::__construct();
            $this->multicurl = curl_multi_init();
            $this->proxyhost = env("PROXY_HOST", "");
            $this->proxyport = env("PROXY_PORT", "");
            $this->proxyuser = env("PROXY_USER", "");
            $this->proxypassword = env("PROXY_PASSWORD", "");
        }

        /**
         * Execute the console command.
         *
         * Installs signal handlers, waits up to ~10s for Redis, creates the pid file
         * and then loops: drive curl, publish finished responses, pick up new jobs.
         * The pid file is removed and the multi handle closed when the loop ends.
         *
         * @return void
         */
        public function handle()
        {
            $pidFile = "/tmp/fetcher";
            \pcntl_signal(SIGINT, [$this, "sig_handler"]);
            \pcntl_signal(SIGTERM, [$this, "sig_handler"]);
            \pcntl_signal(SIGHUP, [$this, "sig_handler"]);

            // Redis might not be available yet: retry once per second, 10 attempts.
            for ($count = 0; $count < 10; $count++) {
                try {
                    Redis::connection();
                    break;
                } catch (\Exception $e) {
                    if ($count >= 9) {
                        // If its not available after 10 seconds we will exit
                        curl_multi_close($this->multicurl);
                        return;
                    }
                    sleep(1);
                }
            }

            touch($pidFile);

            if (!file_exists($pidFile)) {
                return;
            }

            try {
                while ($this->shouldRun) {
                    $operationsRunning = 0;
                    curl_multi_exec($this->multicurl, $operationsRunning);
                    [$answersRead, $messagesLeft] = $this->readMultiCurl($this->multicurl);
                    $newJobs = $this->checkNewJobs($operationsRunning, $messagesLeft);

                    // Avoid a hot spin when there was nothing to do in this iteration.
                    if ($newJobs === 0 && $answersRead === 0) {
                        usleep(10 * 1000);
                    }

                    // Run pending signal handlers explicitly; without this, handlers
                    // registered above only fire if async signal delivery is enabled.
                    \pcntl_signal_dispatch();
                }
            } finally {
                unlink($pidFile);
                curl_multi_close($this->multicurl);
            }
        }

        /**
         * Checks the Redis queue for newly submitted fetch jobs and adds them to
         * the multicurl handle.
         *
         * When nothing is running and nothing just finished, this blocks on Redis
         * for up to one second (BLPOP) for a single job; otherwise it drains the
         * whole queue without blocking.
         *
         * @param int $operationsRunning running-handle count from curl_multi_exec
         * @param int $messagesLeft      -1 when readMultiCurl read no message
         * @return int number of jobs successfully added to multicurl
         */
        private function checkNewJobs($operationsRunning, $messagesLeft)
        {
            $newJobs = [];
            if ($operationsRunning === 0 && $messagesLeft === -1) {
                $newJob = Redis::blpop(self::FETCHQUEUE_KEY, 1);

                if (!empty($newJob)) {
                    $newJobs[] = $newJob[1];
                }
            } else {
                // MULTI/EXEC makes LRANGE + DEL atomic. A plain pipeline is not, and
                // a job pushed between the two commands would be deleted unread.
                $elements = Redis::transaction(function ($redis) {
                    $redis->lrange(self::FETCHQUEUE_KEY, 0, -1);
                    $redis->del(self::FETCHQUEUE_KEY);
                });
                $newJobs = $elements[0] ?? [];
            }

            $addedJobs = 0;
            foreach ($newJobs as $rawJob) {
                $job = json_decode($rawJob, true);
                // A malformed job must not take down the whole fetcher.
                if (!is_array($job) || !isset($job["url"], $job["resulthash"])) {
                    Log::error("Skipping malformed fetch job: " . $rawJob);
                    continue;
                }

                $ch = $this->getCurlHandle($job);
                if (curl_multi_add_handle($this->multicurl, $ch) !== 0) {
                    $this->shouldRun = false;
                    Log::error("Couldn't add Handle to multicurl");
                    break;
                }
                $addedJobs++;
            }

            return $addedJobs;
        }

        /**
         * Collects every finished transfer from the multi handle and publishes it.
         *
         * Each parsed response is LPUSHed as JSON to the job's result hash (15s
         * expiry) and, if the job asked for it, cached for cacheDuration minutes.
         * Handles are always removed from the multi handle afterwards.
         *
         * @param resource|\CurlMultiHandle $mc
         * @return array{0:int,1:int} [answers read, messages left]; messages left
         *     stays -1 when no message was read (PHP only writes it on success).
         */
        private function readMultiCurl($mc)
        {
            $messagesLeft = -1;
            $answersRead = 0;
            while (($info = curl_multi_info_read($mc, $messagesLeft)) !== false) {
                $handle = $info["handle"];
                try {
                    $answersRead++;

                    // CURLOPT_PRIVATE was set to "<resulthash>;<cacheDurationMinutes>".
                    $private = explode(";", (string) curl_getinfo($handle, CURLINFO_PRIVATE));
                    $resulthash = $private[0];
                    $cacheDurationMinutes = intval($private[1] ?? 0);

                    $error = curl_error($handle);
                    if (!empty($error)) {
                        Log::error($error);
                    }

                    $result = $this->parseResponse($handle);

                    Redis::pipeline(function ($pipe) use ($resulthash, $result) {
                        $pipe->lpush($resulthash, json_encode($result));
                        $pipe->expire($resulthash, 15);
                    });

                    if ($cacheDurationMinutes > 0) {
                        try {
                            Cache::put($resulthash, $result, $cacheDurationMinutes * 60);
                        } catch (\Exception $e) {
                            Log::error($e->getMessage());
                        }
                    }
                } catch (\Exception $e) {
                    Log::error($e->getMessage());
                } finally {
                    \curl_multi_remove_handle($mc, $handle);
                }
            }
            return [$answersRead, $messagesLeft];
        }

        /**
         * Converts a finished curl handle into the array published to Redis.
         *
         * @param resource|\CurlHandle $ch
         * @return array|null null when the response is empty;
         *     ["error" => errno, "message" => text] when progress() aborted the
         *     transfer; otherwise ["http-code", "body" (base64), "headers"].
         */
        private function parseResponse($ch)
        {
            $errorNumber = curl_errno($ch);
            if ($errorNumber === CURLE_ABORTED_BY_CALLBACK) {
                return [
                    "error" => $errorNumber,
                    "message" => curl_error($ch),
                ];
            }

            $httpResponse = \curl_multi_getcontent($ch);
            if (empty($httpResponse)) {
                return null;
            }

            // CURLOPT_HEADER is on, so the content is "<headers><body>".
            $headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
            $rawHeaders = substr($httpResponse, 0, $headerSize);
            $body = substr($httpResponse, $headerSize);
            $httpcode = curl_getinfo($ch, CURLINFO_HTTP_CODE);

            $headers = $this->parseHeaderBlock($this->lastHeaderBlock($rawHeaders));

            // Unzip if content-encoding is gzip; keep the raw body if decoding fails
            // (e.g. it was already decompressed by a job-supplied CURLOPT_ENCODING).
            if (strtolower($headers["content-encoding"] ?? "") === "gzip") {
                $decoded = gzdecode($body);
                if ($decoded !== false) {
                    $body = $decoded;
                }
            }

            return [
                "http-code" => $httpcode,
                "body" => base64_encode($body),
                "headers" => $this->sanitizeHeaders($headers),
            ];
        }

        /**
         * Returns the header block of the final response. With redirects (and
         * proxy CONNECTs) the header section holds one block per hop, separated
         * by an empty line; only the last one describes the returned body.
         *
         * @param string $rawHeaders
         * @return string
         */
        private function lastHeaderBlock($rawHeaders)
        {
            $blocks = array_filter(explode("\r\n\r\n", $rawHeaders), function ($block) {
                return trim($block) !== "";
            });
            return empty($blocks) ? "" : end($blocks);
        }

        /**
         * Parses one header block into a lowercase-name => trimmed-value array.
         * The status line and lines without a colon are skipped; a repeated
         * header keeps its last value.
         *
         * @param string $block
         * @return array<string, string>
         */
        private function parseHeaderBlock($block)
        {
            $parsed = [];
            $lines = explode("\r\n", $block);
            array_shift($lines); // status line, e.g. "HTTP/1.1 200 OK"
            foreach ($lines as $line) {
                $parts = explode(':', $line, 2);
                if (count($parts) === 2) {
                    $parsed[strtolower($parts[0])] = trim($parts[1]);
                }
            }
            return $parsed;
        }

        /**
         * Drops every header whose name contains one of BLOCKED_HEADER_FRAGMENTS.
         *
         * @param array<string, string> $headers
         * @return array<string, string>
         */
        private function sanitizeHeaders(array $headers)
        {
            $sanitized = [];
            foreach ($headers as $key => $value) {
                foreach (self::BLOCKED_HEADER_FRAGMENTS as $fragment) {
                    if (stripos($key, $fragment) !== false) {
                        continue 2;
                    }
                }
                $sanitized[$key] = $value;
            }
            return $sanitized;
        }

        /**
         * Builds a configured curl easy handle for one fetch job.
         *
         * Job-supplied "curlopts" are applied after the defaults and can override
         * them. A proxy is used only when host, port, user and password are all set.
         *
         * @param array $job decoded job; reads url, resulthash, cacheDuration,
         *     useragent and optionally curlopts, username, password, headers
         * @return resource|\CurlHandle
         */
        private function getCurlHandle($job)
        {
            $ch = curl_init();

            curl_setopt_array($ch, array(
                CURLOPT_URL => $job["url"],

                // Read back in readMultiCurl() as "<resulthash>;<cacheDuration>".
                CURLOPT_PRIVATE => $job["resulthash"] . ";" . $job["cacheDuration"],

                CURLOPT_RETURNTRANSFER => 1,
                CURLOPT_USERAGENT => $job["useragent"],
                CURLOPT_FOLLOWLOCATION => true,
                CURLOPT_CONNECTTIMEOUT => 2,
                CURLOPT_MAXCONNECTS => 500,
                CURLOPT_LOW_SPEED_LIMIT => 50000,
                CURLOPT_LOW_SPEED_TIME => 5,
                CURLOPT_TIMEOUT => 7,

                // Headers are needed by parseResponse() to split header and body.
                CURLOPT_HEADER => true,

                // Progress callback enforces SIZE_LIMIT.
                CURLOPT_NOPROGRESS => false,
                CURLOPT_PROGRESSFUNCTION => [self::class, 'progress'],
            ));

            if (!empty($job["curlopts"])) {
                curl_setopt_array($ch, $job["curlopts"]);
            }

            if (!empty($this->proxyhost) && !empty($this->proxyport) && !empty($this->proxyuser) && !empty($this->proxypassword)) {
                curl_setopt($ch, CURLOPT_PROXY, $this->proxyhost);
                curl_setopt($ch, CURLOPT_PROXYUSERPWD, $this->proxyuser . ":" . $this->proxypassword);
                curl_setopt($ch, CURLOPT_PROXYPORT, $this->proxyport);
                curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_HTTP);
            }

            if (!empty($job["username"]) && !empty($job["password"])) {
                curl_setopt($ch, CURLOPT_USERPWD, $job["username"] . ":" . $job["password"]);
            }

            if (!empty($job["headers"])) {
                // $job["headers"] is a name => value map; curl expects a list of
                // "<name>:<value>" strings.
                $headers = [];
                foreach ($job["headers"] as $key => $value) {
                    $headers[] = $key . ":" . $value;
                }
                curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
            }

            return $ch;
        }

        /**
         * CURLOPT_PROGRESSFUNCTION callback. Returning non-zero aborts the
         * transfer (CURLE_ABORTED_BY_CALLBACK), which parseResponse() reports.
         *
         * @return int 1 to abort once more than SIZE_LIMIT bytes were downloaded, else 0
         */
        public static function progress($resource, $download_size, $downloaded, $upload_size, $uploaded)
        {
            // Abort Download if size limit is reached
            if ($downloaded > self::SIZE_LIMIT) {
                return 1;
            }
            return 0;
        }

        /**
         * Signal handler for SIGINT/SIGTERM/SIGHUP: asks the main loop to stop.
         */
        public function sig_handler($sig)
        {
            $this->shouldRun = false;
            echo("Terminating Process\n");
        }
    }