Searcher.php 7.85 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\DispatchesJobs;
use Illuminate\Support\Facades\Redis;
use Log;

/**
 * Long-running queue worker dedicated to one remote search engine.
 *
 * A Searcher repeatedly pops "missions" from the Redis list "<name>.queue",
 * fetches the URL contained in each mission over a single, reused curl handle
 * (so that keep-alive connections can be used) and writes the response into
 * the Redis hash "search.<hash>" under the field "<name>".
 */
class Searcher implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /** Name of the search engine; base of every Redis key used by this worker. */
    protected $name;
    /** The curl handle, created in handle() and closed in shutdown(). */
    protected $ch;
    /** Process id used as this worker's field name in the "<name>.stats" hash. */
    protected $pid;
    /** Number of loop passes in handle(), including passes where blpop timed out. */
    protected $counter;
    /** microtime(true) of the most recently stored result. */
    protected $lastTime;
    /** Result of curl_getinfo() for the most recent request, or null before the first one. */
    protected $connectionInfo;

    /** Upper bound for $counter; once exceeded the worker retires. */
    protected $MAX_REQUESTS = 100;
    protected $importantEngines = ["Fastbot", "overture", "overtureAds"];
    protected $recheck;

    /**
     * Create a new job instance.
     *
     * This is our Worker/Searcher class. It takes its name (per the original
     * author's note: the name from sumas.xml) as constructor argument.
     * Each Searcher is dedicated to one remote server of a supported search
     * engine. Inside handle() it listens on a Redis queue and answers the
     * requests for this specific search engine. The curl handle is kept open
     * so that keep-alive requests can be used.
     *
     * @param string $name Name of the search engine this worker serves.
     * @return void
     */
    public function __construct($name)
    {
        $this->name = $name;
        $this->pid = getmypid();
        $this->recheck = false;

        // Refresh the time-to-live of this worker's Redis key.
        Redis::expire($this->name, 5);
    }

    /**
     * Execute the job: serve missions from "<name>.queue" until retirement.
     *
     * The loop ends after a single pass when QUEUE_DRIVER is "sync" (a sync
     * worker would otherwise block the calling code) or once more than
     * $MAX_REQUESTS passes have been made. shutdown() is always executed,
     * even when a pass throws, so the curl handle and the stats entry of this
     * worker are not left behind.
     *
     * @return void
     */
    public function handle()
    {
        // The constructor may have run in a different process (the one that
        // dispatched the job). The stats entry must be keyed by the process
        // that actually does the work, otherwise several workers dispatched
        // by the same process would share - and delete - one entry.
        $this->pid = getmypid();

        // This Searcher is freshly started, so the curl handle has to be created.
        $this->ch = $this->initCurlHandle();
        $this->counter = 0;
        $time = microtime(true);

        try {
            while (true) {
                // Keep the Redis keys of this worker alive.
                Redis::expire($this->name, 5);
                Redis::expire($this->name . ".stats", 5);

                // One Searcher can handle a ton of requests to the same server.
                // Each search for this engine is pushed to a Redis list named
                // after the engine with ".queue" appended. The blocking pop
                // lets the queue stay empty for a while without ending this
                // Searcher immediately.
                $mission = Redis::blpop($this->name . ".queue", 4);
                $this->counter++;
                $this->updateStats(microtime(true) - $time);
                $this->switchToRunning();

                // The mission is empty when blpop hit its timeout.
                if (!empty($mission)) {
                    $poptime = microtime(true) - $time;
                    if ($this->processMission($mission[1], $poptime)) {
                        // Restart the clock so the next pass measures the
                        // time spent waiting for a new job.
                        $time = microtime(true);
                    }
                }

                // In sync mode every Searcher may only retrieve one result
                // because it would block the remaining code otherwise.
                if (getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS) {
                    break;
                }
            }
        } finally {
            // Time has come for this Searcher to retire.
            $this->shutdown();
        }
    }

    /**
     * Decode one mission string, fetch its URL and store the response.
     *
     * A mission has the form "<hash>;<base64 url>[;<timeout>]". Only the
     * first two fields are used here.
     *
     * @param string $mission Raw mission string popped from the queue.
     * @param float  $poptime Seconds between the previous job and this pop.
     * @return bool True when the mission was processed, false when it was
     *              malformed and therefore dropped.
     */
    private function processMission($mission, $poptime)
    {
        $parts = explode(";", $mission);
        if (count($parts) < 2) {
            Log::warning("Searcher " . $this->name . ": dropped malformed mission");
            return false;
        }

        $hashValue = $parts[0];
        $url = base64_decode($parts[1]);

        // Mark this engine as being worked on before the (slow) fetch starts.
        Redis::hset('search.' . $hashValue, $this->name, "connected");

        $result = $this->retrieveUrl($url);
        $this->storeResult($result, $poptime, $hashValue);

        return true;
    }

    /**
     * Set the Redis value of "<name>" to "running" once enough requests were seen.
     *
     * Original author's note: when a Searcher is initially started the Redis
     * value for the name is set to "locked", which prevents new Searchers of
     * this type from being started (the value is checked by the MetaGer
     * process which starts the Searchers). This keeps the MetaGer processes
     * from starting hundreds of Searchers in parallel under high load; they
     * can only be started one after the other. After a new Searcher has made
     * three passes there is enough data to decide whether more Searchers are
     * needed, so the value is switched to "running".
     *
     * The note also describes not spawning more Searchers for engines that
     * are slower than the MetaGer timeout, and always serving the "important"
     * engines. Neither check is implemented in this method.
     *
     * @return void
     */
    private function switchToRunning()
    {
        if ($this->counter === 3 || getenv("QUEUE_DRIVER") === "sync") {
            Redis::set($this->name, "running");
            $this->recheck = false;
        }
    }

    /**
     * Publish the connection info of the last request to "<name>.stats".
     *
     * Nothing is written before the first request has been made. The stored
     * value is "<base64 of the JSON-encoded curl info>;<poptime>".
     *
     * @param float $poptime Seconds since the previous job was finished.
     * @return void
     */
    private function updateStats($poptime)
    {
        if ($this->connectionInfo !== null) {
            $connectionInfo = base64_encode(json_encode($this->connectionInfo));
            Redis::hset($this->name . ".stats", $this->pid, $connectionInfo . ";" . $poptime);
        }
    }

    /**
     * Average "total_time" over all entries of "<name>.stats".
     *
     * Entries without a decodable "total_time" count as 0. This helper is
     * currently not called from handle().
     *
     * @return float|int Mean fetch time in milliseconds, 0 when there are no entries.
     */
    private function getFetchTime()
    {
        $vals = Redis::hgetall($this->name . ".stats");
        if (count($vals) === 0) {
            return 0;
        }

        $totalTime = 0;
        foreach ($vals as $value) {
            $info = json_decode(base64_decode(explode(";", $value)[0]), true);
            $seconds = isset($info["total_time"]) ? floatval($info["total_time"]) : 0.0;
            $totalTime += $seconds * 1000; // seconds -> milliseconds
        }

        return $totalTime / count($vals);
    }

    /**
     * Fetch a URL with the shared curl handle and remember the transfer info.
     *
     * @param string $url The URL to retrieve.
     * @return string|bool The response body, or false when curl_exec() failed.
     */
    private function retrieveUrl($url)
    {
        curl_setopt($this->ch, CURLOPT_URL, $url);
        $result = curl_exec($this->ch);
        $this->connectionInfo = curl_getinfo($this->ch);

        return $result;
    }

    /**
     * Store a fetched result in the hash "search.<hashValue>" under "<name>".
     *
     * @param string|bool $result    Response as returned by retrieveUrl().
     * @param float       $poptime   Not used by this method.
     * @param string      $hashValue Hash identifying the search.
     * @return void
     */
    private function storeResult($result, $poptime, $hashValue)
    {
        Redis::hset('search.' . $hashValue, $this->name, $result);
        // After 60 seconds the results should have been read by the MetaGer
        // process and stored in the cache instead.
        Redis::expire('search.' . $hashValue, 60);
        $this->lastTime = microtime(true);
    }

    /**
     * Remove this worker from Redis and close the curl handle.
     *
     * The key "<name>" is deleted when no other entry is left in
     * "<name>.stats". The curl handle is closed even if a Redis call fails.
     *
     * @return void
     */
    private function shutdown()
    {
        try {
            Redis::hdel($this->name . ".stats", $this->pid);
            if ((int) Redis::hlen($this->name . ".stats") === 0) {
                Redis::del($this->name);
            }
        } finally {
            if ($this->ch) {
                curl_close($this->ch);
                $this->ch = null;
            }
        }
    }

    /**
     * Create the curl handle that is reused for every request of this worker.
     *
     * @return resource|\CurlHandle The configured curl handle.
     */
    private function initCurlHandle()
    {
        $ch = curl_init();

        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_MAXCONNECTS => 500,
            CURLOPT_LOW_SPEED_LIMIT => 500,
            CURLOPT_LOW_SPEED_TIME => 5,
            CURLOPT_TIMEOUT => 10,
        ]);

        return $ch;
    }
}