Search.php 11.3 KB
Newer Older
1
2
3
4
<?php

namespace App\Jobs;

5
use Illuminate\Bus\Queueable;
6
use Illuminate\Contracts\Queue\ShouldQueue;
7
8
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
9
use Illuminate\Support\Facades\Redis;
10

11
class Search implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $hash, $host, $port, $name, $getString, $useragent, $fp, $additionalHeaders;

    /**
     * Index n of the Redis slot key "<host>.<n>" currently reserved by this job,
     * or null while no slot is held.
     */
    protected $socketNumber = null;

    /** Maximum number of bytes requested per read call. */
    protected $buffer_length = 8192;

    /**
     * Create a new job instance.
     *
     * @param string     $hash              Suffix of the Redis hash "search.<hash>" the result is written to.
     * @param string     $host              Host to connect to; also sent as the Host header.
     * @param int|string $port              Port to connect to; 443 selects TLS (see getHost()).
     * @param string     $name              Field name under which the body is stored in the Redis hash.
     * @param string     $getString         Request target placed in the GET request line.
     * @param string     $useragent         Value of the User-Agent header.
     * @param string     $additionalHeaders Extra header lines, each terminated by the token '$#!#$'
     *                                      (replaced by CRLF); "Connection: keep-alive" is appended
     *                                      directly after them.
     * @return void
     */
    public function __construct($hash, $host, $port, $name, $getString, $useragent, $additionalHeaders)
    {
        $this->hash              = $hash;
        $this->host              = $host;
        $this->port              = $port;
        $this->name              = $name;
        $this->getString         = $getString;
        $this->useragent         = $useragent;
        $this->additionalHeaders = $additionalHeaders;
    }

    /**
     * Execute the job: reserve a socket, send the request and store the answer in Redis.
     *
     * @return void
     */
    public function handle()
    {
        $this->fp = $this->getFreeSocket();
        if (!$this->fp) {
            return;
        }

        if ($this->writeRequest()) {
            $this->readAnswer();
        } else {
            // The request could not be sent completely, so the connection is in an
            // unknown state and must not be reused.
            $this->closeSocket();
        }
    }

    /**
     * Read the HTTP response and store its (gzip-decoded) body in the Redis hash
     * "search.<hash>" under the field <name>; the hash expires after 5 seconds.
     *
     * The socket slot is released on every path.
     *
     * @return void
     */
    private function readAnswer()
    {
        if (!$this->fp) {
            return;
        }

        stream_set_blocking($this->fp, true);
        $headers = $this->readHeaders();

        // Fewer than two parsed header fields: the answer is treated as unusable.
        if (count($headers) <= 1) {
            $this->closeSocket();
            return;
        }

        if (isset($headers['transfer-encoding']) && strtolower($headers['transfer-encoding']) === 'chunked') {
            $body = $this->readChunked();
        } elseif (isset($headers['content-length'])) {
            $length = trim($headers['content-length']);
            $body   = (is_numeric($length) && $length >= 1) ? $this->readBody((int) $length) : '';
        } elseif (isset($headers['connection']) && strtolower($headers['connection']) === 'close') {
            $body = $this->readUntilClose();
        } else {
            // Without framing information the end of the body cannot be determined.
            // Drop the connection and this answer instead of terminating the worker process.
            $this->closeSocket();
            return;
        }

        // Reading is finished: hand the socket slot back to other processes.
        $this->releaseSocket();

        if (isset($headers['content-encoding']) && strtolower($headers['content-encoding']) === 'gzip') {
            $body = $this->gunzip($body);
        }

        Redis::hset('search.' . $this->hash, $this->name, $body);
        Redis::expire('search.' . $this->hash, 5);
    }

    /**
     * Read header lines up to the empty line that terminates the header section.
     *
     * The status line and lines without a ": " separator are skipped. Names are
     * lower-cased, names and values are trimmed, and a repeated name keeps its last
     * value. Reading also stops when the stream reports end-of-file.
     *
     * @return array Map of lower-cased header name => value.
     */
    private function readHeaders()
    {
        $headers = [];

        while (true) {
            // fgets() stops at the first newline, so at most one header line is read per call.
            $line = fgets($this->fp, $this->buffer_length);
            if ($line === false || $line === "\r\n" || feof($this->fp)) {
                break;
            }

            // Limit 2 keeps values that themselves contain ": " intact.
            $parts = explode(': ', $line, 2);
            if (count($parts) === 2) {
                $headers[strtolower(trim($parts[0]))] = trim($parts[1]);
            }
        }

        return $headers;
    }

    /**
     * Read a body that ends when the server closes the connection.
     *
     * The connection cannot be reused afterwards, so it is closed and its slot released.
     * Reading also stops when a blocking read fails (e.g. the stream's read timeout expires),
     * so a server that never closes the connection cannot make this loop spin forever.
     *
     * @return string The bytes received.
     */
    private function readUntilClose()
    {
        $data = '';
        stream_set_blocking($this->fp, true);

        while (!feof($this->fp)) {
            $line = fgets($this->fp, $this->buffer_length);
            if ($line === false) {
                break;
            }
            $data .= $line;
        }

        $this->closeSocket();
        return $data;
    }

    /**
     * Read a body of known length (Content-Length).
     *
     * Reads non-blocking for at most one second in total and returns whatever arrived by
     * then, so a slow server yields a truncated body. Never requests more than $length bytes.
     *
     * @param int $length Number of body bytes announced by the server.
     * @return string
     */
    private function readBody($length)
    {
        $body     = '';
        $deadline = microtime(true) + 1.0;
        stream_set_blocking($this->fp, false);

        while ($length > 0 && !feof($this->fp) && microtime(true) < $deadline) {
            $chunk = fread($this->fp, min($length, $this->buffer_length));
            if ($chunk === false || $chunk === '') {
                // Nothing available yet: back off briefly instead of busy-looping.
                usleep(100);
                continue;
            }
            $body   .= $chunk;
            $length -= strlen($chunk);
        }

        return $body;
    }

    /**
     * Read a body sent with "Transfer-Encoding: chunked" and return it de-chunked.
     *
     * Nothing but (optional) empty lines may precede the first chunk-size line.
     *
     * @return string
     * @throws \UnexpectedValueException If a chunk size is too large to be an integer.
     */
    private function readChunked()
    {
        $body = '';

        while (true) {
            // Chunk-size line in hex, e.g. "1a3f\r\n"; stray empty lines are skipped.
            $line = fgets($this->fp, $this->buffer_length);
            if ($line === "\r\n") {
                continue;
            }
            if ($line === false) {
                break;
            }

            // Drop optional chunk extensions (";name=value") and the line ending before
            // decoding: hexdec() ignores non-hex characters, so hex letters inside an
            // extension would otherwise be folded into the size.
            $sizeField = strstr($line, ';', true);
            $length    = hexdec(trim($sizeField === false ? $line : $sizeField));

            if (!is_int($length)) {
                // hexdec() returns a float when the value exceeds PHP_INT_MAX.
                throw new \UnexpectedValueException('Most likely not chunked encoding');
            }

            // A size of zero marks the last chunk.
            if ($length < 1 || feof($this->fp)) {
                if ($length <= 0) {
                    // Consume the line after the last-chunk marker (the final CRLF when no trailers are sent).
                    fgets($this->fp, $this->buffer_length);
                }
                break;
            }

            // Read the chunk data; fread() may return fewer bytes than requested.
            while (true) {
                $data = fread($this->fp, $length);
                if ($data === false || $data === '') {
                    // Nothing could be read: the stream timed out or was closed.
                    break 2;
                }

                $body   .= $data;
                $length -= strlen($data);

                if ($length <= 0 || feof($this->fp)) {
                    if ($length <= 0) {
                        // Consume the CRLF that terminates the chunk data.
                        fgets($this->fp, $this->buffer_length);
                    }
                    break;
                }
            }
        }

        return $body;
    }

    /**
     * Decompress a gzip-encoded body.
     *
     * Data starting with the gzip magic bytes is decoded with gzdecode(), which also handles
     * optional gzip header fields (file name, comment, extra). Data starting with byte 0x08
     * is inflated from offset 8, as before. Anything else yields "Unknown Format".
     *
     * @param string $zipped
     * @return string
     */
    private function gunzip($zipped)
    {
        if (strncmp($zipped, "\x1f\x8b", 2) === 0) {
            try {
                $decoded = gzdecode($zipped);
            } catch (\Exception $e) {
                $decoded = false;
            }
            if ($decoded === false) {
                abort(500, "Fehler beim unzip des Ergebnisses von folgendem Anbieter: " . $this->name);
            }
            return $decoded;
        }

        if (substr($zipped, 0, 1) === "\x08") {
            try {
                return gzinflate(substr($zipped, 8));
            } catch (\Exception $e) {
                abort(500, "Fehler beim unzip des Ergebnisses von folgendem Anbieter: " . $this->name);
            }
        }

        return "Unknown Format";
    }

    /**
     * Send the GET request on the reserved socket.
     *
     * If the request cannot be written within one second, or writing raises an
     * \ErrorException, the socket is dropped. A fresh one is then reserved and the request
     * is resent from the start. At most three attempts are made.
     *
     * @return bool true if the whole request was written, false otherwise.
     */
    private function writeRequest()
    {
        $request  = "GET " . $this->getString . " HTTP/1.1\r\n";
        $request .= "Host: " . $this->host . "\r\n";
        $request .= "User-Agent: " . $this->useragent . "\r\n";
        $request .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
        $request .= "Accept-Language: de,en-US;q=0.7,en;q=0.3\r\n";
        // Only advertise what readAnswer() can decode; deflate/br bodies would be stored compressed.
        $request .= "Accept-Encoding: gzip\r\n";
        $request .= str_replace('$#!#$', "\r\n", $this->additionalHeaders);
        $request .= "Connection: keep-alive\r\n\r\n";

        $total       = strlen($request);
        $maxAttempts = 3;
        $attempt     = 1;
        $sent        = 0;
        $start       = microtime(true);

        while ($sent < $total) {
            $written = 0;
            $failed  = (microtime(true) - $start) > 1.0;

            if (!$failed) {
                try {
                    $written = fwrite($this->fp, substr($request, $sent));
                } catch (\ErrorException $e) {
                    $failed = true;
                }
            }

            if ($failed) {
                // Something is wrong with this socket: drop it and start over on a new one.
                if ($attempt >= $maxAttempts || !$this->reconnect()) {
                    return false;
                }
                $attempt++;
                $sent  = 0;
                $start = microtime(true);
                continue;
            }

            if ($written) {
                $sent += $written;
            } else {
                // The socket is non-blocking; wait briefly when it accepted nothing.
                usleep(100);
            }
        }

        return true;
    }

    /**
     * Drop the current socket together with its slot and reserve a fresh one.
     *
     * @return bool true if a new socket could be obtained.
     */
    private function reconnect()
    {
        $this->closeSocket();
        $this->fp = $this->getFreeSocket();
        return (bool) $this->fp;
    }

    /**
     * Close the current socket (if open) and release its Redis slot.
     *
     * @return void
     */
    private function closeSocket()
    {
        if ($this->fp && is_resource($this->fp)) {
            fclose($this->fp);
        }
        $this->fp = null;
        $this->releaseSocket();
    }

    /**
     * Release the Redis slot key held by this job, if any.
     *
     * The slot number is cleared so that a second call cannot delete a key that
     * another process has reserved in the meantime.
     *
     * @return void
     */
    private function releaseSocket()
    {
        if ($this->socketNumber !== null) {
            Redis::del($this->host . '.' . $this->socketNumber);
            $this->socketNumber = null;
        }
    }

    /**
     * Reserve a socket slot and open (or reuse) a persistent connection to the host.
     *
     * Under load (concurrent searches) several sockets may be needed to answer requests
     * without waiting. pfsockopen() opens a persistent socket that is not closed when the
     * job finishes and is picked up again when the same host/port is opened later. Such a
     * socket may only be used while nobody else is communicating over it. Slots are
     * therefore tracked in Redis: the key "<host>.<n>" marks slot n as busy and expires
     * after 5 seconds. The first free slot is reserved.
     *
     * A reused connection must be idle. If it still has unread data, or the peer has closed
     * it, it is closed and a new connection is opened (at most three times).
     *
     * @return resource|null A connected socket in non-blocking mode, or null on failure.
     */
    public function getFreeSocket()
    {
        $counter       = 0;
        $reconnects    = 0;
        $maxReconnects = 3;

        while (true) {
            $key = $this->host . '.' . $counter;

            if (intval(Redis::exists($key)) !== 0) {
                // Slot is busy: try the next one.
                $counter++;
                continue;
            }

            Redis::set($key, 1);
            Redis::expire($key, 5);
            $this->socketNumber = $counter;

            try {
                $fp = pfsockopen($this->getHost(), $this->port, $errno, $errstr, 1);
            } catch (\ErrorException $e) {
                $fp = false;
            }

            if (!$fp) {
                $this->releaseSocket();
                return null;
            }

            // Check that the read buffer of a possibly reused connection is empty.
            stream_set_blocking($fp, false);
            $leftover = fgets($fp, $this->buffer_length);
            if ($leftover === false && !feof($fp)) {
                return $fp;
            }

            // Stale connection: close it so the next pfsockopen() creates a fresh one.
            fclose($fp);
            $this->releaseSocket();
            if (++$reconnects > $maxReconnects) {
                return null;
            }
        }
    }

    /**
     * Build the socket address for pfsockopen(): TLS for port 443, plain TCP otherwise.
     *
     * @return string e.g. "tls://example.com" or "tcp://example.com".
     */
    public function getHost()
    {
        // Cast so that an integer port 443 selects TLS as well as the string "443".
        $scheme = ((string) $this->port === '443') ? 'tls://' : 'tcp://';
        return $scheme . $this->host;
    }
}