<?php

namespace App\Jobs;

use App\Jobs\Job;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Http\Request;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Redis;

/**
 * Queue job that sends one HTTP GET request to a search provider over a
 * persistent socket and stores the response body in Redis.
 *
 * Sockets are opened with pfsockopen() and numbered per host. A Redis key
 * "<host>.<n>" (5 second TTL) marks socket number n as in use, so that
 * concurrent workers do not talk over the same connection.
 *
 * The response body is written to the Redis hash "search.<hash>" under the
 * field "<name>"; that hash is set to expire after 5 seconds.
 */
class Search extends Job implements ShouldQueue
{
    use InteractsWithQueue, SerializesModels;

    protected $hash, $host, $port, $name, $getString, $useragent, $fp;

    /**
     * Maximum number of bytes requested from the socket per read call.
     *
     * @var int
     */
    protected $buffer_length = 8192;

    /**
     * Number of the socket slot currently locked by this job, or null when
     * no slot is held. Declared explicitly; it used to be created as a
     * dynamic property inside getFreeSocket().
     *
     * @var int|null
     */
    protected $socketNumber = null;

    /**
     * Create a new job instance.
     *
     * @param  string      $hash      Suffix of the Redis hash ("search.<hash>") that receives the result.
     * @param  string      $host      Host name to connect to; also sent as the Host header.
     * @param  string|int  $port      TCP port; 443 selects a TLS connection.
     * @param  string      $name      Field name under which the body is stored in the Redis hash.
     * @param  string      $getString Request target placed in the GET request line.
     * @param  string      $useragent Value of the User-Agent header.
     * @return void
     */
    public function __construct($hash, $host, $port, $name, $getString, $useragent)
    {
        $this->hash      = $hash;
        $this->host      = $host;
        $this->port      = $port;
        $this->name      = $name;
        $this->getString = $getString;
        $this->useragent = $useragent;
    }

    /**
     * Execute the job: acquire a socket, send the request, read the answer.
     *
     * @param  Request  $request  Not used by this method; kept so the signature is unchanged.
     * @return void
     */
    public function handle(Request $request)
    {
        $this->fp = $this->getFreeSocket();

        if ($this->fp && $this->writeRequest()) {
            $this->readAnswer();
        }
    }

    /**
     * Read the HTTP response from the socket and store its body in Redis.
     *
     * Returns without storing anything when no socket is open, when fewer
     * than two headers could be parsed, or when the response carries neither
     * chunked transfer encoding nor a Content-Length header.
     *
     * @return void
     */
    private function readAnswer()
    {
        if (!$this->fp) {
            return;
        }

        // Header lines are read in blocking mode.
        stream_set_blocking($this->fp, 1);
        $headers = $this->readHeaders();

        if (count($headers) <= 1) {
            return;
        }

        $transferEncoding = isset($headers['transfer-encoding'])
            ? strtolower($headers['transfer-encoding'])
            : '';

        $body = '';
        if ($transferEncoding === 'chunked') {
            $body = $this->readChunked();
        } elseif (isset($headers['content-length'])) {
            $length = $headers['content-length'];
            if (preg_match('/^\d+$/', $length) === 1 && (int) $length >= 1) {
                $body = $this->readBody((int) $length);
            }
        } else {
            // The body length is unknown. This used to call exit, which
            // terminates the whole queue worker process; give up on this
            // response only.
            return;
        }

        // We are done with the socket, so free its slot for other workers.
        $this->releaseSocketLock();

        $contentEncoding = isset($headers['content-encoding'])
            ? strtolower($headers['content-encoding'])
            : '';
        if ($contentEncoding === 'gzip') {
            $body = $this->gunzip($body);
        }

        Redis::hset('search.' . $this->hash, $this->name, $body);
        Redis::expire('search.' . $this->hash, 5);
    }

    /**
     * Read response header lines up to the blank line that ends them.
     *
     * Lines without a colon (such as the status line) are skipped.
     *
     * @return array Header values keyed by lower-cased header name.
     */
    private function readHeaders()
    {
        // Must be an array: it is indexed by header name and counted by the caller.
        $headers = [];

        while (true) {
            // fgets() stops at the first newline or when the buffer is full.
            $line = fgets($this->fp, $this->buffer_length);

            // An empty line marks the end of the headers.
            if ($line === false || rtrim($line, "\r\n") === '' || feof($this->fp)) {
                break;
            }

            // Split on the first colon only, so values that themselves
            // contain ": " are not discarded.
            $parts = explode(':', $line, 2);
            if (count($parts) === 2) {
                $headers[strtolower(trim($parts[0]))] = trim($parts[1]);
            }
        }

        return $headers;
    }

    /**
     * Read a body of known length from the socket.
     *
     * Reading stops when $length bytes were received, when the peer closes
     * the connection, on a read error, or one second after the call started;
     * in the latter cases the returned string is shorter than $length.
     *
     * @param  int|string  $length  Number of body bytes announced by Content-Length.
     * @return string
     */
    private function readBody($length)
    {
        $remaining = (int) $length;
        $body      = '';

        stream_set_blocking($this->fp, 0);
        // microtime() gives a real one second budget; comparing whole
        // seconds from time() could expire almost immediately.
        $deadline = microtime(true) + 1.0;

        while ($remaining > 0 && !feof($this->fp) && microtime(true) < $deadline) {
            // fread() with an upper bound never reads past the announced
            // length and, unlike fgets(), does not stop at newlines.
            $data = fread($this->fp, min($remaining, $this->buffer_length));

            if ($data === false) {
                break;
            }
            if ($data === '') {
                // Nothing available yet on the non-blocking stream.
                usleep(100);
                continue;
            }

            $body .= $data;
            $remaining -= strlen($data);
        }

        return $body;
    }

    /**
     * Read a body sent with "Transfer-Encoding: chunked".
     *
     * Each chunk starts with a line holding its size in hexadecimal,
     * followed by that many bytes of data and a line break. A size of zero
     * ends the body. Whatever was read so far is returned when the stream
     * ends or stalls early.
     *
     * @return string The concatenated chunk data.
     */
    private function readChunked()
    {
        $body = '';

        while (true) {
            // The line holding the size of the next chunk.
            $line = fgets($this->fp, $this->buffer_length);
            if ($line === false) {
                break;
            }

            $size = trim($line);
            if ($size === '') {
                // Only a line break: try the next line.
                continue;
            }

            // Ignore optional chunk extensions after the size ("1a;name=value").
            $extensionStart = strpos($size, ';');
            if ($extensionStart !== false) {
                $size = rtrim(substr($size, 0, $extensionStart));
            }

            // hexdec() skips non-hexadecimal characters (and reports them as
            // deprecated since PHP 7.4), so validate the field first instead
            // of decoding garbage into a bogus length.
            $remaining = preg_match('/^[0-9a-fA-F]+$/', $size) === 1 ? hexdec($size) : null;
            if (!is_int($remaining)) {
                trigger_error('Most likely not chunked encoding', E_USER_WARNING);
                break;
            }

            if ($remaining === 0) {
                // Last chunk: consume optional trailer lines up to and
                // including the terminating empty line.
                do {
                    $trailer = fgets($this->fp, $this->buffer_length);
                } while ($trailer !== false && rtrim($trailer, "\r\n") !== '');
                break;
            }

            while ($remaining > 0) {
                // Bounded reads avoid asking for a server-chosen size in one call.
                $data = fread($this->fp, min($remaining, $this->buffer_length));
                if ($data === false || $data === '') {
                    // Timeout or closed connection: without this check the
                    // loop would spin on an empty read.
                    return $body;
                }

                $body .= $data;
                $remaining -= strlen($data);
            }

            // Consume the line break that follows the chunk data.
            fgets($this->fp, $this->buffer_length);
        }

        return $body;
    }

    /**
     * Inflate a gzip encoded response body.
     *
     * NOTE(review): the header is assumed to be exactly 10 bytes long, i.e.
     * without optional fields such as a file name - confirm that providers
     * never send those.
     *
     * @param  string  $zipped  The raw body.
     * @return string|false The inflated data, the result of gzinflate(), or
     *                      the literal "Unknown Format" when the compression
     *                      method byte is not 0x08.
     */
    private function gunzip($zipped)
    {
        // Skip the two magic bytes when they are present.
        $offset = (substr($zipped, 0, 2) === "\x1f\x8b") ? 2 : 0;

        if (substr($zipped, $offset, 1) !== "\x08") {
            return "Unknown Format";
        }

        try {
            // Skip the rest of the header.
            return gzinflate(substr($zipped, $offset + 8));
        } catch (\Exception $e) {
            abort(500, "Fehler beim unzip des Ergebnisses von folgendem Anbieter: " . $this->name);
        }

        return "Unknown Format";
    }

    /**
     * Send the GET request over the current socket.
     *
     * When writing raises an ErrorException the socket is dropped, a new one
     * is acquired and the request is sent again from the start. The whole
     * attempt is limited to half a second.
     *
     * @return bool True when the complete request was written.
     */
    private function writeRequest()
    {
        $out = "GET " . $this->stripLineBreaks($this->getString) . " HTTP/1.1\r\n";
        $out .= "Host: " . $this->stripLineBreaks($this->host) . "\r\n";
        $out .= "User-Agent: " . $this->stripLineBreaks($this->useragent) . "\r\n";
        $out .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
        $out .= "Accept-Language: de,en-US;q=0.7,en;q=0.3\r\n";
        // Only advertise what readAnswer() can decode. "deflate" and "br"
        // used to be offered as well, but such a body would have been stored
        // still compressed.
        $out .= "Accept-Encoding: gzip\r\n";
        $out .= "Connection: keep-alive\r\n\r\n";

        // Send the request:
        $pending  = $out;
        $deadline = microtime(true) + 0.5;

        while ($pending !== '') {
            if (!$this->fp || microtime(true) > $deadline) {
                // No usable socket, or the write did not finish in time.
                // Drop the socket and free its slot. A replacement socket
                // is deliberately not opened here: the caller stops after a
                // failed write, so it would only stay locked unused.
                $this->discardSocket();
                return false;
            }

            try {
                $written = fwrite($this->fp, $pending);
            } catch (\ErrorException $e) {
                // Something happened to our socket. We need a new one:
                $this->discardSocket();
                $this->fp = $this->getFreeSocket();
                $pending  = $out;
                continue;
            }

            if ($written) {
                // substr() returns false instead of '' on older PHP versions.
                $pending = (string) substr($pending, $written);
            }
        }

        return true;
    }

    /**
     * Remove CR and LF so a value cannot inject additional request lines.
     *
     * @param  mixed  $value
     * @return string
     */
    private function stripLineBreaks($value)
    {
        return str_replace(["\r", "\n"], '', (string) $value);
    }

    /**
     * Close the current socket, if any, and free its slot.
     *
     * @return void
     */
    private function discardSocket()
    {
        if (is_resource($this->fp)) {
            try {
                fclose($this->fp);
            } catch (\ErrorException $e) {
                // Best effort: the socket is being thrown away anyway.
            }
        }

        $this->fp = null;
        $this->releaseSocketLock();
    }

    /**
     * Delete the Redis key that marks our socket slot as in use.
     *
     * The key is built from the host, exactly as in getFreeSocket().
     * Clearing the slot number afterwards prevents a second call from
     * deleting a lock that another worker has taken in the meantime.
     *
     * @return void
     */
    private function releaseSocketLock()
    {
        if ($this->socketNumber === null) {
            return;
        }

        Redis::del($this->host . "." . $this->socketNumber);
        $this->socketNumber = null;
    }

    /**
     * Open a persistent socket to the host that no other worker is using.
     *
     * Depending on the server load (simultaneous requests) several sockets
     * may be needed to answer requests without waiting. pfsockopen() opens
     * a persistent socket which can be shared between PHP processes: when
     * the host string matches an existing socket, that connection is picked
     * up and continued. It may only be used while no other process is
     * communicating over it, so each socket number is guarded by the Redis
     * key "<host>.<number>"; if a number is taken, the next one is tried.
     *
     * @return resource|null The socket, or null when the connection could not be opened.
     */
    public function getFreeSocket()
    {
        $counter = 0;

        while (true) {
            $lockKey = $this->host . ".$counter";

            // SETNX claims the slot in a single step, so two workers cannot
            // both find the key missing and take the same socket.
            if (!Redis::setnx($lockKey, 1)) {
                $counter++;
                continue;
            }
            Redis::expire($lockKey, 5);
            $this->socketNumber = $counter;

            try {
                // The "/<number>" suffix is part of the string passed to
                // pfsockopen(), giving each slot its own host string.
                $fp = pfsockopen(
                    $this->getHost() . ":" . $this->port . "/$counter",
                    $this->port,
                    $errno,
                    $errstr,
                    1
                );
            } catch (\ErrorException $e) {
                $fp = false;
            }

            if (!$fp) {
                // Connecting failed: do not keep the slot locked.
                $this->releaseSocketLock();
                return null;
            }

            // Check whether the read buffer is empty:
            stream_set_blocking($fp, 0);
            $leftover = fgets($fp, $this->buffer_length);
            if ($leftover === false && !feof($fp)) {
                return $fp;
            }

            // Unread data or a closed connection: this socket is not in a
            // clean state. Close it and try the next number; its lock is
            // left to expire on its own.
            fclose($fp);
            $counter++;
        }
    }

    /**
     * Build the host string including the transport scheme.
     *
     * @return string "tls://<host>" for port 443, otherwise "tcp://<host>".
     */
    public function getHost()
    {
        // Compare numerically so that an integer port 443 selects TLS too.
        $scheme = ((int) $this->port === 443) ? "tls://" : "tcp://";

        return $scheme . $this->host;
    }
}