Searcher.php 10.8 KB
Newer Older
1
2
3
4
5
6
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
Dominik Hebeler's avatar
Dominik Hebeler committed
7
8
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
9
10
11
12
use Illuminate\Support\Facades\Redis;

/**
 * Queue job that answers the search requests for exactly one search engine.
 *
 * handle() performs blocking pops on the Redis list "<name>.queue", fetches every
 * requested URL through one shared curl handle and writes the response back into
 * Redis. The job ends after more than MAX_REQUESTS loop iterations, after MAX_TIME
 * seconds, or after a single iteration when QUEUE_CONNECTION is "sync".
 */
class Searcher implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    public $tries = 1;

    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 300;

    protected $name, $ch, $pid, $counter, $lastTime, $connectionInfo, $user, $password, $headers;
    # $proxyport has to be declared like the other proxy settings: SerializesModels only
    # serializes declared properties, so an undeclared one is lost once the job is queued.
    protected $proxyhost, $proxyport, $proxyuser, $proxypassword;
    # Each Searcher will shut down after a specified time (in seconds) or number of loop iterations
    protected $MAX_REQUESTS = 100;
    # This value should always be below the retry_after value in config/queue.php
    protected $MAX_TIME = 240;
    protected $startTime = null;
    protected $importantEngines = ["Fastbot", "overture", "overtureAds"];
    protected $recheck;

    /**
     * Create a new job instance.
     * This is our new Worker/Searcher Class
     * It will take its name from the sumas.xml as constructor argument
     * Each Searcher is dedicated to one remote server from our supported Searchengines
     * It will listen to a queue in the Redis Database within the handle() method and
     * answer requests to this specific search engine.
     * The curl handle will be left initialized and opened so that we can make full use of
     * keep-alive requests.
     *
     * @param string      $name     Name of the search engine; prefix of all Redis keys used here
     * @param string|null $user     User for HTTP authentication (only used together with $password)
     * @param string|null $password Password for HTTP authentication
     * @param array|null  $headers  Additional request headers as header name => value
     * @return void
     */
    public function __construct($name, $user = null, $password = null, $headers = null)
    {
        $this->name = $name;
        $this->user = $user;
        $this->password = $password;
        $this->headers = $headers;
        $this->recheck = false;

        // Preliminary values; handle() takes both again in the process that runs the job
        $this->pid = getmypid();
        $this->startTime = microtime(true);

        $this->proxyhost = env("PROXY_HOST", "");
        $this->proxyport = env("PROXY_PORT", "");
        $this->proxyuser = env("PROXY_USER", "");
        $this->proxypassword = env("PROXY_PASSWORD", "");

        // Submit this worker to the Redis System
        Redis::expire($this->name, 5);
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        // A queued job is constructed in the dispatching process but executed by a queue
        // worker. Take the process id (key of our entry in "<name>.stats") and the start
        // time (basis of the MAX_TIME check) again so both describe the running process.
        $this->pid = getmypid();
        $this->startTime = microtime(true);

        // This Searcher is freshly called so we need to initialize the curl handle $ch
        $this->ch = $this->initCurlHandle();
        try {
            // Counts the loop iterations, including those in which blpop timed out
            $this->counter = 0;
            $time = microtime(true);
            while (true) {
                // Update the expire
                Redis::expire($this->name, 5);
                Redis::expire($this->name . ".stats", 5);
                // One Searcher can handle a ton of requests to the same server
                // Each search to the server of this Searcher will be submitted to a queue
                // stored in redis which has the same name as this searchengine appended by a ".queue"
                // We will perform a blocking pop on this queue so the queue can remain empty for a while
                // without killing this searcher directly.
                $mission = Redis::blpop($this->name . ".queue", 4);
                $this->counter++;
                $this->updateStats(microtime(true) - $time);
                $this->switchToRunning();
                // The mission can be empty when blpop hit the timeout
                if (!empty($mission)) {
                    $poptime = microtime(true) - $time;

                    // blpop answers with [queue name, value]. The value is a string of fields
                    // divided by ";":
                    // 1. The Hash Value where the result should be stored
                    // 2. The Url to Retrieve (base64 encoded)
                    // A third field (the maximum time to take) may follow; it is not evaluated here.
                    $fields = explode(";", $mission[1]);
                    $hashValue = $fields[0];
                    $url = base64_decode($fields[1]);

                    // NOTE(review): this status goes through the default Redis connection while
                    // storeResult() uses REDIS_RESULT_CONNECTION - confirm that this is intended.
                    Redis::hset('search.' . $hashValue . ".results." . $this->name, "status", "connected");

                    $result = $this->retrieveUrl($url);

                    $this->storeResult($result, $poptime, $hashValue);

                    // Reset the time of the last Job so we can calculate
                    // the time we have spent waiting for a new job
                    $time = microtime(true);
                }

                // In sync mode every Searcher may only retrieve one result because it would block
                // the execution of the remaining code otherwise:
                if (getenv("QUEUE_CONNECTION") === "sync"
                    || $this->counter > $this->MAX_REQUESTS
                    || (microtime(true) - $this->startTime) > $this->MAX_TIME) {
                    break;
                }
            }
        } finally {
            // When we reach this point, time has come for this Searcher to retire
            $this->shutdown();
        }
    }

    /**
     * Sets the Redis value of this Searcher to "running" when the time has come.
     *
     * When a Searcher is initially started the redis value for $this->name is set to "locked"
     * which effectively will prevent new Searchers of this type to be started. (Value is checked
     * by the MetaGer process which starts the Searchers)
     * This is done so the MetaGer processes won't start hundreds of Searchers in parallel when
     * under high work load. It will force that Searchers can only be started one after the other.
     *
     * The switch happens in the third loop iteration of handle() or, in sync mode, in every
     * iteration. Neither a fetch time nor $importantEngines is evaluated here.
     *
     * @return void
     */
    private function switchToRunning()
    {
        if ($this->counter !== 3 && getenv("QUEUE_CONNECTION") !== "sync") {
            return;
        }

        Redis::set($this->name, "running");
        $this->recheck = false;
    }

    /**
     * Publishes the curl info of the last request in the hash "<name>.stats".
     *
     * The entry is stored under the process id as "<base64(json(curl info))>;<poptime>".
     * Nothing is written as long as no request has been made.
     *
     * @param float $poptime Seconds since the last job was finished (or since handle() started looping)
     * @return void
     */
    private function updateStats($poptime)
    {
        if ($this->connectionInfo === null) {
            return;
        }

        $encodedInfo = base64_encode(json_encode($this->connectionInfo));
        Redis::hset($this->name . ".stats", $this->pid, $encodedInfo . ";" . $poptime);
    }

    /**
     * Calculates the arithmetic mean of the "total_time" values of all entries in "<name>.stats".
     *
     * This method is currently not called from within this class.
     *
     * @return float|int Mean fetch time in milliseconds, 0 when there are no entries
     */
    private function getFetchTime()
    {
        $stats = Redis::hgetall($this->name . ".stats");
        if (count($stats) === 0) {
            return 0;
        }

        $totalTime = 0;
        foreach ($stats as $entry) {
            // Each entry has the format written by updateStats()
            $connectionInfo = json_decode(base64_decode(explode(";", $entry)[0]), true);
            // Transform from seconds to milliseconds
            $totalTime += floatval($connectionInfo["total_time"]) * 1000;
        }

        return $totalTime / count($stats);
    }

    /**
     * Fetches the given URL with the shared curl handle and remembers the curl info
     * of that request for updateStats().
     *
     * @param string $url The URL to fetch
     * @return string|bool The value curl_exec() returned
     */
    private function retrieveUrl($url)
    {
        // Set this URL to the Curl handle
        curl_setopt($this->ch, CURLOPT_URL, $url);
        $response = curl_exec($this->ch);
        $this->connectionInfo = curl_getinfo($this->ch);

        return $response;
    }

    /**
     * Writes the response for one search into Redis using a single pipeline.
     *
     * Every key that gets an expire here lives for REDIS_RESULT_CACHE_DURATION seconds.
     *
     * @param string|bool $result    The value retrieveUrl() returned
     * @param float       $poptime   Not used by this method
     * @param string      $hashValue Hash value of the search the result belongs to
     * @return void
     */
    private function storeResult($result, $poptime, $hashValue)
    {
        $resultKey = 'search.' . $hashValue . ".results." . $this->name;
        $readyKey = 'search.' . $hashValue . ".ready";
        $enginesKey = 'search.' . $hashValue . ".engines";
        $cacheDuration = env('REDIS_RESULT_CACHE_DURATION');

        $pipeline = Redis::connection(env('REDIS_RESULT_CONNECTION'))->pipeline();
        $pipeline->hset($resultKey, "response", $result);
        $pipeline->hset($resultKey, "delivered", "0");
        $pipeline->hincrby('search.' . $hashValue . ".results.status", "engineAnswered", 1);
        $pipeline->expire($resultKey, $cacheDuration);
        $pipeline->rpush($readyKey, $this->name);
        $pipeline->expire($readyKey, $cacheDuration);
        $pipeline->sadd($enginesKey, $this->name);
        $pipeline->expire($enginesKey, $cacheDuration);
        $pipeline->execute();

        $this->lastTime = microtime(true);
    }

    /**
     * Removes this Searcher from "<name>.stats", deletes the key <name> when no other
     * entry is left in that hash, and closes the curl handle.
     *
     * @return void
     */
    private function shutdown()
    {
        $statsKey = $this->name . ".stats";

        Redis::hdel($statsKey, $this->pid);
        if (count(Redis::hgetall($statsKey)) === 0) {
            Redis::del($this->name);
        }

        curl_close($this->ch);
    }

    /**
     * Creates the curl handle that is reused for all requests of this Searcher.
     *
     * A SOCKS5 proxy is configured when host, port, user and password of the proxy are all
     * non-empty. HTTP authentication is configured when both user and password were given.
     *
     * @return resource|\CurlHandle The configured curl handle
     */
    private function initCurlHandle()
    {
        $ch = curl_init();

        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_MAXCONNECTS => 500,
            CURLOPT_LOW_SPEED_LIMIT => 500,
            CURLOPT_LOW_SPEED_TIME => 5,
            CURLOPT_TIMEOUT => 10,
        ]);

        if (!empty($this->proxyhost) && !empty($this->proxyport) && !empty($this->proxyuser) && !empty($this->proxypassword)) {
            // curl_setopt() needs the handle as first argument
            curl_setopt($ch, CURLOPT_PROXY, $this->proxyhost);
            curl_setopt($ch, CURLOPT_PROXYUSERPWD, $this->proxyuser . ":" . $this->proxypassword);
            curl_setopt($ch, CURLOPT_PROXYPORT, (int) $this->proxyport);
            curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5);
        }

        if ($this->user !== null && $this->password !== null) {
            curl_setopt($ch, CURLOPT_USERPWD, $this->user . ":" . $this->password);
        }

        if ($this->headers !== null) {
            // Turn the name => value pairs into the "<name>:<value>" lines curl expects
            $headerLines = [];
            foreach ($this->headers as $key => $value) {
                $headerLines[] = $key . ":" . $value;
            }
            curl_setopt($ch, CURLOPT_HTTPHEADER, $headerLines);
        }

        return $ch;
    }
}