RequestFetcher.php 7.64 KB
Newer Older
1
2
3
4
<?php

namespace App\Console\Commands;

Dominik Hebeler's avatar
Dominik Hebeler committed
5
use Cache;
6
7
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
Dominik Hebeler's avatar
Dominik Hebeler committed
8
use Log;
Dominik Hebeler's avatar
Dominik Hebeler committed
9
use Carbon;
10

Dominik Hebeler's avatar
Dominik Hebeler committed
11
/**
 * Long-running worker that executes HTTP fetch jobs.
 *
 * Jobs are JSON documents read from the Redis list \App\MetaGer::FETCHQUEUE_KEY.
 * Each job is turned into a curl easy handle and run through one shared curl
 * multi handle. When a transfer finishes, its body (or the placeholder
 * "no-result" for any response code other than 200/201) is pushed onto the
 * Redis list named by the job's "resulthash" and, if the job asks for it,
 * stored in the application cache as well.
 *
 * On every loop iteration the current timestamp is written to
 * self::HEALTHCHECK_KEY so the liveness of the worker can be checked.
 */
class RequestFetcher extends Command
{
    const HEALTHCHECK_KEY = "fetcher_healthcheck";
    const HEALTHCHECK_FORMAT = "Y-m-d H:i:s";

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'requests:fetcher';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'This commands fetches requests to the installed search engines';

    /**
     * Main loop flag. Cleared by sig_handler() and when a handle cannot be
     * added to the multi handle.
     *
     * @var bool
     */
    protected $shouldRun = true;

    /**
     * The curl multi handle all transfers are attached to.
     */
    protected $multicurl = null;

    /**
     * Optional HTTP proxy settings, read from the
     * "metager.metager.fetcher.proxy.*" configuration keys.
     */
    protected $proxyhost;
    protected $proxyport;
    protected $proxyuser;
    protected $proxypassword;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
        $this->multicurl = curl_multi_init();
        $this->proxyhost = config("metager.metager.fetcher.proxy.host");
        $this->proxyport = config("metager.metager.fetcher.proxy.port");
        $this->proxyuser = config("metager.metager.fetcher.proxy.user");
        $this->proxypassword = config("metager.metager.fetcher.proxy.password");
    }

    /**
     * Execute the console command.
     *
     * Runs until $shouldRun is cleared. The multi handle is closed when the
     * loop ends, whether it ended normally or through an exception.
     *
     * @return void
     */
    public function handle()
    {
        // Registered handlers are only invoked once signal dispatching is
        // enabled; make sure it is, independent of what the framework does.
        pcntl_async_signals(true);
        pcntl_signal(SIGINT, [$this, "sig_handler"]);
        pcntl_signal(SIGTERM, [$this, "sig_handler"]);
        pcntl_signal(SIGHUP, [$this, "sig_handler"]);

        // Redis might not be available now
        for ($count = 0; $count < 10; $count++) {
            try {
                Redis::connection();
                break;
            } catch (\Predis\Connection\ConnectionException $e) {
                if ($count >= 9) {
                    // Still not available after 10 attempts (one second apart): give up
                    return;
                }
                sleep(1);
            }
        }

        try {
            while ($this->shouldRun) {
                Redis::set(self::HEALTHCHECK_KEY, Carbon::now()->format(self::HEALTHCHECK_FORMAT));

                // Overwritten by curl_multi_exec() with the number of running transfers
                $operationsRunning = 0;
                curl_multi_exec($this->multicurl, $operationsRunning);

                $status = $this->readMultiCurl($this->multicurl);
                $answersRead = $status[0];
                $messagesLeft = $status[1];

                $newJobs = $this->checkNewJobs($operationsRunning, $messagesLeft);

                // Nothing happened in this round: back off for 10ms instead of spinning
                if ($newJobs === 0 && $answersRead === 0) {
                    usleep(10 * 1000);
                }
            }
        } finally {
            curl_multi_close($this->multicurl);
        }
    }

    /**
     * Checks the Redis queue for newly submitted fetch jobs and adds them to
     * the multi handle.
     *
     * If no transfer is running and no message was read in this round, a
     * blocking pop (1 second timeout) is used. Otherwise the complete queue
     * is drained without blocking.
     *
     * @param mixed $operationsRunning Running transfer count as reported by curl_multi_exec()
     * @param int   $messagesLeft      Second element of readMultiCurl()'s result; -1 is its initial value
     *
     * @return int Number of jobs that were added to the multi handle
     */
    private function checkNewJobs($operationsRunning, $messagesLeft)
    {
        $newJobs = [];
        if ($operationsRunning === 0 && $messagesLeft === -1) {
            $newJob = Redis::blpop(\App\MetaGer::FETCHQUEUE_KEY, 1);
            if (!empty($newJob)) {
                $newJobs[] = $newJob[1];
            }
        } else {
            // Read and delete inside MULTI/EXEC. With a plain pipeline a job
            // pushed between LRANGE and DEL would be deleted unread.
            $elements = Redis::transaction(function ($redis) {
                $redis->lrange(\App\MetaGer::FETCHQUEUE_KEY, 0, -1);
                $redis->del(\App\MetaGer::FETCHQUEUE_KEY);
            });
            if (!empty($elements[0]) && is_array($elements[0])) {
                $newJobs = $elements[0];
            }
        }

        $addedJobs = 0;
        foreach ($newJobs as $newJob) {
            $job = json_decode($newJob, true);
            if (!is_array($job)) {
                // A malformed payload must not take the whole worker down
                Log::error("Couldn't decode fetch job: " . json_last_error_msg());
                continue;
            }

            $ch = $this->getCurlHandle($job);
            if (curl_multi_add_handle($this->multicurl, $ch) !== 0) {
                $this->shouldRun = false;
                Log::error("Couldn't add Handle to multicurl");
                break;
            }
            $addedJobs++;
        }

        return $addedJobs;
    }

    /**
     * Processes all transfers that are reported as finished by the multi handle.
     *
     * For each finished transfer the duration is reported to
     * \App\PrometheusExporter, the result is pushed onto the Redis list named
     * by the job's resulthash (expiring after 60 seconds) and, for a positive
     * cache duration, stored in the cache. The handle is always removed from
     * the multi handle afterwards.
     *
     * @param mixed $mc The curl multi handle to read from
     *
     * @return array [number of answers read, $messagesLeft as left behind by curl_multi_info_read()]
     */
    private function readMultiCurl($mc)
    {
        $messagesLeft = -1;
        $answersRead = 0;
        while (($info = curl_multi_info_read($mc, $messagesLeft)) !== false) {
            try {
                $answersRead++;

                // Job metadata as packed into CURLOPT_PRIVATE by getCurlHandle():
                // "<resulthash>;<cacheDuration>;<name>"
                $infos = curl_getinfo($info["handle"], CURLINFO_PRIVATE);
                $infos = explode(";", $infos);
                $resulthash = $infos[0];
                $cacheDurationMinutes = intval($infos[1]);
                $name = $infos[2];

                $responseCode = curl_getinfo($info["handle"], CURLINFO_HTTP_CODE);
                $body = "no-result";

                $totalTime = curl_getinfo($info["handle"], CURLINFO_TOTAL_TIME);
                \App\PrometheusExporter::Duration($totalTime, $name);

                $error = curl_error($info["handle"]);
                if (!empty($error)) {
                    Log::error($error);
                }

                if ($responseCode !== 200 && $responseCode !== 201) {
                    Log::debug($resulthash);
                    Log::debug("Got responsecode " . $responseCode . " fetching \"" . curl_getinfo($info["handle"], CURLINFO_EFFECTIVE_URL) . "\"\n");
                } else {
                    $body = \curl_multi_getcontent($info["handle"]);
                }

                Redis::pipeline(function ($pipe) use ($resulthash, $body) {
                    $pipe->lpush($resulthash, $body);
                    $pipe->expire($resulthash, 60);
                });

                if ($cacheDurationMinutes > 0) {
                    try {
                        // Cache::put() is given seconds here
                        Cache::put($resulthash, $body, $cacheDurationMinutes * 60);
                    } catch (\Exception $e) {
                        // Caching is best effort; the result was already delivered via Redis
                        Log::error($e->getMessage());
                    }
                }
            } finally {
                \curl_multi_remove_handle($mc, $info["handle"]);
            }
        }

        return [$answersRead, $messagesLeft];
    }

    /**
     * Builds a configured curl easy handle for a decoded fetch job.
     *
     * Keys read from $job: "url", "resulthash", "cacheDuration", "name" and
     * "useragent", plus the optional "curlopts", "username"/"password" and
     * "headers".
     *
     * @param array $job The decoded fetch job
     *
     * @return mixed The curl handle, not yet attached to the multi handle
     */
    private function getCurlHandle($job)
    {
        $ch = curl_init();

        curl_setopt_array($ch, [
            CURLOPT_URL => $job["url"],
            // Carried along so readMultiCurl() can map the finished transfer back to its job
            CURLOPT_PRIVATE => $job["resulthash"] . ";" . $job["cacheDuration"] . ";" . $job["name"],
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_USERAGENT => $job["useragent"],
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_CONNECTTIMEOUT => 8,
            CURLOPT_MAXCONNECTS => 500,
            CURLOPT_LOW_SPEED_LIMIT => 50000,
            CURLOPT_LOW_SPEED_TIME => 10,
            CURLOPT_TIMEOUT => 10,
        ]);

        // Job specific options are applied after the defaults and may override them
        if (!empty($job["curlopts"])) {
            curl_setopt_array($ch, $job["curlopts"]);
        }

        if (!empty($this->proxyhost) && !empty($this->proxyport)) {
            curl_setopt($ch, CURLOPT_PROXY, $this->proxyhost);
            if (!empty($this->proxyuser) && !empty($this->proxypassword)) {
                curl_setopt($ch, CURLOPT_PROXYUSERPWD, $this->proxyuser . ":" . $this->proxypassword);
            }
            curl_setopt($ch, CURLOPT_PROXYPORT, $this->proxyport);
            curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_HTTP);
        }

        if (!empty($job["username"]) && !empty($job["password"])) {
            curl_setopt($ch, CURLOPT_USERPWD, $job["username"] . ":" . $job["password"]);
        }

        if (!empty($job["headers"])) {
            // The job carries headers as a <key> => <value> map;
            // curl expects a list of "<key>:<value>" lines
            $headers = [];
            foreach ($job["headers"] as $key => $value) {
                $headers[] = $key . ":" . $value;
            }
            curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
        }

        return $ch;
    }

    /**
     * Signal handler registered for SIGINT, SIGTERM and SIGHUP.
     *
     * Only clears the loop flag, so handle() leaves its loop after the
     * current iteration.
     *
     * @param int $sig The received signal number
     *
     * @return void
     */
    public function sig_handler($sig)
    {
        $this->shouldRun = false;
        echo ("Terminating Process\n");
    }
}