Searcher.php 9.94 KB
Newer Older
1
2
3
4
5
6
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
Dominik Hebeler's avatar
Dominik Hebeler committed
7
8
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
9
10
11
12
use Illuminate\Support\Facades\Redis;

class Searcher implements ShouldQueue
{
13
    use InteractsWithQueue, Queueable, SerializesModels;
14

15
    /** @var mixed Name of the search engine; also used as prefix for this Searcher's Redis keys. */
    protected $name;

    /** @var mixed Curl handle created in handle() and reused for every request. */
    protected $ch;

    /** @var mixed Process id, used as field name in the "<name>.stats" Redis hash. */
    protected $pid;

    /** @var mixed Number of loop iterations performed by handle(). */
    protected $counter;

    /** @var mixed Timestamp (microtime) of the last stored result. */
    protected $lastTime;

    /** @var mixed Result of curl_getinfo() for the most recent request; null until the first fetch. */
    protected $connectionInfo;

    /** @var mixed Optional user name for HTTP authentication. */
    protected $user;

    /** @var mixed Optional password for HTTP authentication. */
    protected $password;

    /** @var mixed Optional additional request headers (iterated as key => value). */
    protected $headers;

    # Each Searcher shuts down after a maximum run time (in seconds) or a maximum number of requests
    protected $MAX_REQUESTS = 100;

    # This value should always be below the retry_after value in config/queue.php
    protected $MAX_TIME = 240;

    /** @var mixed Timestamp (microtime) against which MAX_TIME is measured. */
    protected $startTime = null;

    /** @var array Engine names regarded as important; not read anywhere in the visible code. */
    protected $importantEngines = ["Fastbot", "overture", "overtureAds"];

    /** @var mixed Flag that is only ever reset to false in the visible code. */
    protected $recheck;
23
24
25
26
27
28
29
30
31
32
33
34

    /**
     * Create a new job instance.
     *
     * This is the Worker/Searcher class. It takes its name (as defined in the sumas.xml)
     * as constructor argument. Each Searcher is dedicated to one remote server of the
     * supported search engines: inside handle() it listens to a Redis queue and answers
     * the requests for this specific engine. The curl handle is kept open between
     * requests so that keep-alive connections can be used.
     *
     * NOTE(review): the pid and the start time are captured here, i.e. in whichever
     * process constructs the object - confirm that this is the process that later
     * runs handle().
     *
     * @param mixed $name     Name of the search engine (presumably a string; it is
     *                        concatenated into Redis key names).
     * @param mixed $user     Optional user name for HTTP authentication.
     * @param mixed $password Optional password for HTTP authentication.
     * @param mixed $headers  Optional additional request headers, iterated as key => value.
     * @return void
     */
    public function __construct($name, $user = null, $password = null, $headers = null)
    {
        // Identity of the engine and the optional credentials/headers for its requests
        $this->name = $name;
        $this->user = $user;
        $this->password = $password;
        $this->headers = $headers;

        // Bookkeeping of this worker
        $this->recheck = false;
        $this->pid = getmypid();
        $this->startTime = microtime(true);

        // Give the Redis key of this Searcher a time-to-live of 5 seconds
        Redis::expire($this->name, 5);
    }

    /**
     * Execute the job.
     *
     * Opens one curl handle and then serves fetch requests for this search engine in a
     * loop. Every iteration refreshes the expiry of this Searcher's Redis keys, performs
     * a blocking pop on "<name>.queue" and - if a mission was received - fetches the
     * requested URL and stores the result.
     * The loop ends after one iteration in sync mode, once more than MAX_REQUESTS
     * iterations were performed, or once more than MAX_TIME seconds have passed.
     * shutdown() is always executed afterwards, even when an exception is thrown.
     *
     * @return void
     */
    public function handle()
    {
        // A queued job is constructed in the dispatching process and serialized, so the
        // values captured by the constructor describe the dispatcher. Re-read them here
        // so that MAX_TIME is measured from the actual start of the work and the stats
        // entry is keyed by the pid of the process that really runs this Searcher.
        $this->pid = getmypid();
        $this->startTime = microtime(true);

        // This Searcher is freshly called so we need to initialize the curl handle $ch
        $this->ch = $this->initCurlHandle();
        try {
            // Counts the loop iterations. It is also incremented when blpop ran into its
            // timeout, so it is an upper bound for the number of answered jobs.
            $this->counter = 0;
            $time = microtime(true);
            while (true) {
                // Update the expire
                Redis::expire($this->name, 5);
                Redis::expire($this->name . ".stats", 5);
                // One Searcher can handle a ton of requests to the same server.
                // Each search for the server of this Searcher is submitted to a queue
                // stored in Redis which has the name of this search engine with ".queue" appended.
                // We perform a blocking pop (timeout 4 seconds) on this queue so the queue
                // can remain empty for a while without killing this Searcher directly.
                $mission = Redis::blpop($this->name . ".queue", 4);
                $this->counter++;
                $this->updateStats(microtime(true) - $time);
                $this->switchToRunning();
                // The mission can be empty when blpop hit the timeout
                if (!empty($mission)) {
                    // blpop answers with [queue name, value]; we only need the value
                    $mission = $mission[1];
                    $poptime = microtime(true) - $time;

                    // The mission is a string of fields separated by ";":
                    // 1. The hash value under which the result should be stored
                    // 2. The base64 encoded URL to retrieve
                    // 3. The maximum time to take (not evaluated by this method)
                    $mission = explode(";", $mission);
                    $hashValue = $mission[0]; // The hash value for redis to store the results under
                    $url = base64_decode($mission[1]); // The url to fetch

                    // NOTE(review): this status is written through the default Redis connection
                    // while storeResult() uses the REDIS_RESULT_CONNECTION one - confirm that
                    // both point to the same server.
                    Redis::hset('search.' . $hashValue . ".results." . $this->name, "status", "connected");
                    $result = $this->retrieveUrl($url);

                    $this->storeResult($result, $poptime, $hashValue);

                    // Reset the time of the last job so we can calculate
                    // the time we have spent waiting for a new job.
                    // That value is submitted to Redis in updateStats().
                    $time = microtime(true);
                }

                // In sync mode every Searcher may only retrieve one result because it would block
                // the execution of the remaining code otherwise:
                if (getenv("QUEUE_CONNECTION") === "sync"
                    || $this->counter > $this->MAX_REQUESTS
                    || (microtime(true) - $this->startTime) > $this->MAX_TIME) {
                    break;
                }
            }
        } finally {
            // When we reach this point, time has come for this Searcher to retire
            $this->shutdown();
        }
    }

Dominik Hebeler's avatar
Dominik Hebeler committed
113
114
    /**
     * Marks this Searcher as "running" in Redis once it has proven to work.
     *
     * When a Searcher is initially started the Redis value for $this->name is set to "locked",
     * which prevents new Searchers of this type from being started (the value is checked by the
     * MetaGer process which starts the Searchers). This is done so the MetaGer processes won't
     * start hundreds of Searchers in parallel under high work load; Searchers can only be
     * started one after the other.
     * Once the loop counter of this Searcher reaches exactly three there is enough data to decide
     * whether even more Searchers are needed, so the value is switched to "running".
     * In sync mode the value is switched on every call.
     *
     * The sync mode is detected through the QUEUE_CONNECTION environment variable (the name
     * handle() checks as well); the older name QUEUE_DRIVER is still honoured so existing
     * environments keep working.
     *
     * @return void
     */
    private function switchToRunning()
    {
        $syncMode = getenv("QUEUE_CONNECTION") === "sync" || getenv("QUEUE_DRIVER") === "sync";

        if ($this->counter === 3 || $syncMode) {
            // Allow the MetaGer process to start further Searchers for this engine
            Redis::set($this->name, "running");
            $this->recheck = false;
        }
    }
Dominik Hebeler's avatar
Dominik Hebeler committed
133
134
135
    /**
     * Publishes the statistics of this Searcher in the "<name>.stats" Redis hash.
     *
     * The entry is stored under the pid of this Searcher and has the form
     * "<base64 encoded JSON of the last curl connection info>;<poptime>".
     * Nothing is written as long as no request has been made yet.
     *
     * @param mixed $poptime Time value handed in by the caller; it is appended to the entry as is.
     * @return void
     */
    private function updateStats($poptime)
    {
        // Without a finished request there is no connection info to report
        if ($this->connectionInfo === null) {
            return;
        }

        $encodedInfo = base64_encode(json_encode($this->connectionInfo));
        $statsEntry = $encodedInfo . ";" . $poptime;
        Redis::hset($this->name . ".stats", $this->pid, $statsEntry);
    }

Dominik Hebeler's avatar
Dominik Hebeler committed
141
142
    /**
     * Calculates the average fetch time of this search engine.
     *
     * Reads every entry of the "<name>.stats" Redis hash, takes the "total_time" of the
     * connection info stored in it and returns the arithmetic mean of those values.
     *
     * @return int|float Mean fetch time in milliseconds, or 0 when there are no stats entries.
     */
    private function getFetchTime()
    {
        $stats = Redis::hgetall($this->name . ".stats");
        $entryCount = sizeof($stats);
        if ($entryCount === 0) {
            return 0;
        }

        $sumMs = 0;
        foreach ($stats as $entry) {
            // An entry looks like "<base64 encoded JSON>;<poptime>" - only the first part is needed
            $encodedInfo = explode(";", $entry)[0];
            $info = json_decode(base64_decode($encodedInfo), true);
            // total_time is given in seconds, we need milliseconds
            $sumMs += floatval($info["total_time"]) * 1000;
        }

        return $sumMs / $entryCount;
    }

Dominik Hebeler's avatar
Dominik Hebeler committed
158
159
    /**
     * Fetches the given URL with the curl handle of this Searcher.
     *
     * The connection info of the transfer is remembered in $this->connectionInfo.
     *
     * @param mixed $url The URL to fetch.
     * @return mixed Whatever curl_exec() returns for the transfer.
     */
    private function retrieveUrl($url)
    {
        $handle = $this->ch;

        // Point the handle to the requested URL and run the transfer
        curl_setopt($handle, CURLOPT_URL, $url);
        $response = curl_exec($handle);

        // Keep the transfer information for the statistics
        $this->connectionInfo = curl_getinfo($handle);

        return $response;
    }

Dominik Hebeler's avatar
Dominik Hebeler committed
167
168
    /**
     * Stores the response of the search engine in Redis and announces it as ready.
     *
     * All commands are sent in one pipeline over the connection named by the
     * REDIS_RESULT_CONNECTION environment value. The keys expire after the number of
     * seconds given by REDIS_RESULT_CACHE_DURATION.
     *
     * NOTE(review): env() is called outside of a config file here - confirm that this
     * still yields values when the Laravel configuration is cached.
     *
     * @param mixed $result    The response to store.
     * @param mixed $poptime   Not used by this method.
     * @param mixed $hashValue Hash value of the search; part of every key name.
     * @return void
     */
    private function storeResult($result, $poptime, $hashValue)
    {
        $searchPrefix = 'search.' . $hashValue;
        $resultKey = $searchPrefix . ".results." . $this->name;
        $readyKey = $searchPrefix . ".ready";
        $enginesKey = $searchPrefix . ".engines";

        $connection = Redis::connection(env('REDIS_RESULT_CONNECTION'));
        $ttl = env('REDIS_RESULT_CACHE_DURATION');

        $batch = $connection->pipeline();
        // The response itself, marked as not yet delivered
        $batch->hset($resultKey, "response", $result);
        $batch->hset($resultKey, "delivered", "0");
        // One more engine has answered this search
        $batch->hincrby($searchPrefix . ".results.status", "engineAnswered", 1);
        // The results should be read by the MetaGer process before they expire
        $batch->expire($resultKey, $ttl);
        // Announce this engine in the list of ready results and in the set of engines
        $batch->rpush($readyKey, $this->name);
        $batch->expire($readyKey, $ttl);
        $batch->sadd($enginesKey, $this->name);
        $batch->expire($enginesKey, $ttl);
        $batch->execute();

        $this->lastTime = microtime(true);
    }

Dominik Hebeler's avatar
Dominik Hebeler committed
184
185
    /**
     * Retires this Searcher.
     *
     * Removes the stats entry of this Searcher, deletes the Redis key of the engine
     * when no stats entries are left and finally closes the curl handle.
     *
     * @return void
     */
    private function shutdown()
    {
        $statsKey = $this->name . ".stats";

        // Remove our own statistics
        Redis::hdel($statsKey, $this->pid);

        // No stats entry left: remove the key of this engine as well
        $remainingStats = Redis::hgetall($statsKey);
        if (count($remainingStats) === 0) {
            Redis::del($this->name);
        }

        // The curl handle is not needed anymore
        curl_close($this->ch);
    }

Dominik Hebeler's avatar
Dominik Hebeler committed
194
195
    /**
     * Creates and configures the curl handle of this Searcher.
     *
     * Besides the fixed transfer options, HTTP authentication is configured when both a
     * user and a password are set, and additional request headers are sent when headers
     * are set.
     *
     * @return mixed The configured curl handle.
     */
    private function initCurlHandle()
    {
        $options = [
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_MAXCONNECTS => 500,
            CURLOPT_LOW_SPEED_LIMIT => 500,
            CURLOPT_LOW_SPEED_TIME => 5,
            CURLOPT_TIMEOUT => 10,
        ];

        // Credentials are only used when they are complete
        if ($this->user !== null && $this->password !== null) {
            $options[CURLOPT_USERPWD] = $this->user . ":" . $this->password;
        }

        if ($this->headers !== null) {
            // Every key => value pair becomes one header line of the form "<key>:<value>"
            $headerLines = [];
            foreach ($this->headers as $headerName => $headerValue) {
                $headerLines[] = $headerName . ":" . $headerValue;
            }
            $options[CURLOPT_HTTPHEADER] = $headerLines;
        }

        $handle = curl_init();
        curl_setopt_array($handle, $options);

        return $handle;
    }
}