Searcher.php 7.74 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\DispatchesJobs;
use Illuminate\Support\Facades\Redis;
use Log;

class Searcher implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $name, $ch, $pid, $counter, $lastTime;

    // Number of answered jobs after which this Searcher retires
    protected $MAX_REQUESTS = 100;
    // Engines that are always switched to "running" regardless of their fetch time
    protected $importantEngines = array("Fastbot", "overture", "overtureAds");
    // True while this Searcher still has to re-evaluate whether to switch its engine to "running"
    protected $recheck;

    /**
     * Create a new job instance.
     * This is our new Worker/Searcher Class
     * It will take its name from the sumas.xml as constructor argument
     * Each Searcher is dedicated to one remote server from our supported Searchengines
     * It will listen to a queue in the Redis Database within the handle() method and
     * answer requests to this specific search engine.
     * The curl handle will be left initialized and opened so that we can make full use of
     * keep-alive requests.
     *
     * Note: the constructor runs in the process that dispatches the job, which is not
     * necessarily the queue worker that later executes handle(). Process specific data
     * (the pid) is therefore determined again in handle().
     *
     * @param string $name Name of the search engine this Searcher is dedicated to
     * @return void
     */
    public function __construct($name)
    {
        $this->name = $name;
        $this->pid = getmypid();
        $this->recheck = false;
        // Refresh the 5 second expiry of this engine's Redis key
        Redis::expire($this->name, 5);
    }

    /**
     * Execute the job.
     *
     * Answers missions from the "<name>.queue" list until either MAX_REQUESTS jobs were
     * answered or the queue driver is "sync". Afterwards (also when an exception escapes)
     * the Searcher removes its statistics and closes its curl handle.
     *
     * @return void
     */
    public function handle()
    {
        // The pid must be read in the process that actually runs this Searcher;
        // it is the key of our entry in the "<name>.stats" hash.
        $this->pid = getmypid();
        // This Searcher is freshly called so we need to initialize the curl handle $ch
        $this->ch = $this->initCurlHandle();
        $this->counter = 0;                 // Counts the number of answered jobs
        $time = microtime(true);

        try {
            while(true){
                // Refresh the expiry of our Redis keys; they vanish 5 seconds after
                // this Searcher stops refreshing them
                Redis::expire($this->name, 5);
                Redis::expire($this->name . ".stats", 5);

                // One Searcher can handle a ton of requests to the same server
                // Each search to the server of this Searcher will be submitted to a queue
                // stored in redis which has the same name as this searchengine appended by a ".queue"
                // We will perform a blocking pop on this queue so the queue can remain empty for a while
                // without killing this searcher directly.
                $mission = Redis::blpop($this->name . ".queue", 4);

                // The mission is empty when blpop hit the timeout
                if(!empty($mission)){
                    $this->processMission($mission[1], microtime(true) - $time);
                    // Reset the time of the last Job so we can calculate
                    // the time we have spent waiting for a new job
                    $time = microtime(true);
                }

                // In sync mode every Searcher may only run once because it would block
                // the execution of the remaining code otherwise (this is checked after a
                // blpop timeout as well, so an empty queue cannot block forever).
                // Otherwise the Searcher retires after answering MAX_REQUESTS jobs.
                if(getenv("QUEUE_DRIVER") === "sync" || $this->counter >= $this->MAX_REQUESTS){
                    break;
                }
            }
        } finally {
            // When we reach this point, time has come for this Searcher to retire
            $this->shutdown();
        }
    }

    /**
     * Answers a single mission popped from this engine's queue.
     *
     * The mission is a String which is divided by ";" into:
     * 1. The Hash Value where the result should be stored
     * 2. The base64 encoded Url to Retrieve
     * 3. The timeout of the MetaGer process in ms
     *
     * @param string $mission The raw mission string
     * @param float  $poptime Seconds spent waiting for this mission since the last answered one
     * @return void
     */
    private function processMission($mission, $poptime){
        $parts = explode(";", $mission);
        if(sizeof($parts) < 3){
            // A malformed mission cannot be answered; skip it instead of crashing the Searcher
            Log::warning("Searcher " . $this->name . " received a malformed mission: " . $mission);
            return;
        }
        $hashValue = $parts[0];
        $url = base64_decode($parts[1]);
        $timeout = floatval($parts[2]);                 // Timeout from the MetaGer process in ms
        $averageFetchTime = $this->getFetchTime();      // The average fetch time of the search engine in ms
        Redis::hset('search.' . $hashValue, $this->name, "connected");

        $result = $this->retrieveUrl($url);

        $this->storeResult($result, $poptime, $hashValue);
        $this->counter++;

        /**
        * When a Searcher is initially started the redis value for $this->name is set to "locked"
        * which effectively will prevent new Searchers of this type to be started. (Value is checked by the MetaGer process which starts the Searchers)
        * This is done so the MetaGer processes won't start hundreds of Searchers in parallel when under high work load.
        * It will force that Searchers can only be started one after the other.
        * When a new Searcher has served a minimum of three requests we have enough data to decide whether we need even more Searchers.
        * To do so we will then set the redis value for $this->name to "running".
        * There is a case where we don't want new Searchers to be started even if we would need to do so to serve every Request:
        *   When a search engine needs more time to produce search results than the timeout of the MetaGer process, we won't even bother spawning
        *   more and more Searchers because they would just block free worker processes from serving the important engines which will give results in time.
        **/
        if($this->counter === 3 || $this->recheck){
            # If the MetaGer process waits longer for the results than this Fetcher will probably need to fetch
            # Or if this engine is in the array of important engines which we will always try to serve
            if($timeout >= $averageFetchTime || in_array($this->name, $this->importantEngines, true)){
                Redis::set($this->name, "running");
                $this->recheck = false;
            }else{
                $this->recheck = true;
            }
        }
    }

    /**
     * Average "total_time" (converted to ms) over all entries of the "<name>.stats" hash,
     * i.e. over the last stored request of every Searcher of this engine.
     *
     * @return float|int The average fetch time in ms, 0 when no statistics are stored
     */
    private function getFetchTime(){
        $vals = Redis::hgetall($this->name . ".stats");
        if(empty($vals)){
            return 0;
        }
        $totalTime = 0;
        foreach ($vals as $value) {
            // Each value is "<base64(json(curl_getinfo))>;<poptime>"
            $info = json_decode(base64_decode(explode(";", $value)[0]), true);
            $seconds = (is_array($info) && isset($info["total_time"])) ? floatval($info["total_time"]) : 0;
            $totalTime += $seconds * 1000;  // Transform from seconds to milliseconds
        }
        return $totalTime / sizeof($vals);
    }

    /**
     * Fetches $url with the persistent curl handle.
     *
     * @param string $url
     * @return string|false The response body, or false when curl_exec() fails
     */
    private function retrieveUrl($url){
        // Set this URL to the Curl handle
        curl_setopt($this->ch, CURLOPT_URL, $url);

        $result = curl_exec($this->ch);
        return $result;
    }

    /**
     * Stores the fetched result for the search identified by $hashValue and records
     * the curl statistics of this request under our pid in "<name>.stats".
     *
     * @param string|false $result    Response body as returned by retrieveUrl()
     * @param float        $poptime   Seconds spent waiting for this mission
     * @param string       $hashValue Identifier of the search the result belongs to
     * @return void
     */
    private function storeResult($result, $poptime, $hashValue){
        Redis::hset('search.' . $hashValue, $this->name, $result);
        // Stats value format: "<base64(json(curl_getinfo))>;<poptime>" (parsed by getFetchTime())
        $connectionInfo = base64_encode(json_encode(curl_getinfo($this->ch)));
        Redis::hset($this->name . ".stats", $this->pid, $connectionInfo . ";" . $poptime);
        $this->lastTime = microtime(true);
    }

    /**
     * Removes this Searcher's statistics, deletes the engine key when no statistics
     * of any Searcher of this engine are left, and closes the curl handle.
     *
     * @return void
     */
    private function shutdown(){
        Redis::hdel($this->name . ".stats", $this->pid);
        // Without any remaining stats entries, drop the engine key so new Searchers may be started
        if(Redis::hlen($this->name . ".stats") === 0){
            Redis::del($this->name);
        }
        // We should close our curl handle before we retire
        if($this->ch !== null){
            curl_close($this->ch);
        }
    }

    /**
     * Creates the curl handle that is reused for all requests of this Searcher
     * so keep-alive connections to the engine's server can be used.
     *
     * @return resource The initialized curl handle
     */
    private function initCurlHandle(){
        $ch = curl_init();

        curl_setopt_array($ch, array(
                CURLOPT_RETURNTRANSFER => 1,
                CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
                CURLOPT_FOLLOWLOCATION => TRUE,
                CURLOPT_CONNECTTIMEOUT => 10,
                CURLOPT_MAXCONNECTS => 500,
                CURLOPT_LOW_SPEED_LIMIT => 500,
                CURLOPT_LOW_SPEED_TIME => 5,
                CURLOPT_TIMEOUT => 10
        ));

        return $ch;
    }
}