Skip to content
Snippets Groups Projects
Searcher.php 7.61 KiB
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\DispatchesJobs;
use Illuminate\Support\Facades\Redis;
use Log;

class Searcher implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $name, $ch, $pid, $counter, $lastTime;
    protected $MAX_REQUESTS = 100;
    protected $importantEngines = array("Fastbot", "overture", "overtureAds");
    protected $recheck;

    /**
     * Create a new job instance.
     * This is our new Worker/Searcher Class
     * It will take it's name from the sumas.xml as constructor argument
     * Each Searcher is dedicated to one remote server from our supported Searchengines
     * It will listen to a queue in the Redis Database within the handle() method and
     * answer requests to this specific search engine.
     * The curl handle will be left initialized and opened so that we can make full use of
     * keep-alive requests.
     * @return void
     */
    public function __construct($name)
    {
        $this->name = $name;
        $this->pid = getmypid();
        $this->recheck = false;
        // Submit this worker to the Redis System
        Redis::expire($this->name, 5);
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        // This Searches is freshly called so we need to initialize the curl handle $ch
        $this->ch = $this->initCurlHandle();
        $this->counter = 0;                 // Counts the number of answered jobs
        $time = microtime(true);
        while(true){
            // Update the expire
            Redis::expire($this->name, 5);
            Redis::expire($this->name . ".stats", 5);
            // One Searcher can handle a ton of requests to the same server
            // Each search to the server of this Searcher will be submitted to a queue
            // stored in redis which has the same name as this searchengine appended by a ".queue"
            // We will perform a blocking pop on this queue so the queue can remain empty for a while 
            // without killing this searcher directly.
            $mission = Redis::blpop($this->name . ".queue", 4);
            $this->counter++;
            $this->updateStats(microtime(true) - $time);
            $this->switchToRunning();
            // The mission can be empty when blpop hit the timeout
            if(empty($mission)){
                continue;
            }else{
                $mission = $mission[1];
                $poptime = microtime(true) - $time;

                // The mission is a String which can be divided to retrieve two informations:
                // 1. The Hash Value where the result should be stored
                // 2. The Url to Retrieve
                // These two informations are divided by a ";" in the mission string
                $mission = explode(";", $mission);
                $hashValue = $mission[0];
                $url = base64_decode($mission[1]);
                $timeout = $mission[2]; // Timeout from the MetaGer process in ms
                $medianFetchTime = $this->getFetchTime();   // The median Fetch time of the search engine in ms
                Redis::hset('search.' . $hashValue, $this->name, "connected");

                $result = $this->retrieveUrl($url);

                $this->storeResult($result, $poptime, $hashValue);

                // Reset the time of the last Job so we can calculate
                // the time we have spend waiting for a new job
                // We submit that calculation to the Redis systemin the method
                $time = microtime(true);
            }

            // In sync mode every Searcher may only retrieve one result because it would block
            // the execution of the remaining code otherwise:
            if(getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS){
               break;
            } 
        }
        // When we reach this point, time has come for this Searcher to retire
        $this->shutdown();
    }

    private function switchToRunning(){
        /**
        * When a Searcher is initially started the redis value for $this->name is set to "locked"
        * which effectively will prevent new Searchers of this type to be started. (Value is checked by the MetaGer process which starts the Searchers)
        * This is done so the MetaGer processes won't start hundreds of Searchers parallely when under high work load.
        * It will force that Searchers can only be started one after the other.
        * When a new Searcher has served a minimum of three requests we have enough data to decide whether we need even more Searchers.
        * To do so we will then set the redis value for $this->name to "running".
        * There is a case where we don't want new Searchers to be started even if we would need to do so to serve every Request:
        *   When a search engine needs more time to produce search results than the timeout of the MetaGer process, we won't even bother of spawning
        *   more and more Searchers because they would just block free worker processes from serving the important engines which will give results in time.
        **/
        if($this->counter === 3){
            # If the MetaGer process waits longer for the results than this Fetcher will probably need to fetch
            # Or if this engine is in the array of important engines which we will always try to serve
            Redis::set($this->name, "running");
            $this->recheck = false;
        }
    }
    private function updateStats($poptime){
        $connectionInfo = base64_encode(json_encode(curl_getinfo($this->ch), true));
        Redis::hset($this->name . ".stats", $this->pid, $connectionInfo . ";" . $poptime);
    }

    private function getFetchTime(){
        $vals = Redis::hgetall($this->name . ".stats");
        if(sizeof($vals) === 0){
            return 0;
        }else{
            $totalTime = 0;
            foreach ($vals as $pid => $value) {
                $time = floatval(json_decode(base64_decode(explode(";", $value)[0]), true)["total_time"]);
                $time *= 1000;  // Transform from seconds to milliseconds
                $totalTime += $time;
            }
            $totalTime /= sizeof($vals);
            return $totalTime;
        }
    }

    private function retrieveUrl($url){
        // Set this URL to the Curl handle
        curl_setopt($this->ch, CURLOPT_URL, $url);

        $result = curl_exec($this->ch);
        return $result;
    }

    private function storeResult($result, $poptime, $hashValue){
        Redis::hset('search.' . $hashValue, $this->name, $result);
        $this->lastTime = microtime(true);
    }

    private function shutdown(){
        Redis::hdel($this->name . ".stats", $this->pid);
        if(sizeof(Redis::hgetall($this->name . ".stats")) === 0){
            Redis::del($this->name);
        }
        // We should close our curl handle before we do so
        curl_close($this->ch);
        Log::info("Exiting here!");
    }

    private function initCurlHandle(){
        $ch = curl_init();

        curl_setopt_array($ch, array(
                CURLOPT_RETURNTRANSFER => 1,
                CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
                CURLOPT_FOLLOWLOCATION => TRUE,
                CURLOPT_CONNECTTIMEOUT => 10,
                CURLOPT_MAXCONNECTS => 500,
                CURLOPT_LOW_SPEED_LIMIT => 500,
                CURLOPT_LOW_SPEED_TIME => 5,
                CURLOPT_TIMEOUT => 10
        ));

        return $ch;
    }
}