Commit d989f5ad authored by Dominik Hebeler

Merge branch '485-ux-bei-such-timeout' into 'development'

Cache re-enabled

Closes #485

See merge request !860
parents 1f8eae28 1b68945c
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Redis;
class Search implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $hash, $host, $port, $name, $getString, $useragent, $fp, $additionalHeaders, $socketNumber;
protected $buffer_length = 8192;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($hash, $host, $port, $name, $getString, $useragent, $additionalHeaders)
{
$this->hash = $hash;
$this->host = $host;
$this->port = $port;
$this->name = $name;
$this->getString = $getString;
$this->useragent = $useragent;
$this->additionalHeaders = $additionalHeaders;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$url = "";
if($this->port === "443"){
$url = "https://";
}else{
$url = "http://";
}
$url .= $this->host . $this->getString;
$ch = curl_init($url);
curl_setopt_array($ch, array(
CURLOPT_RETURNTRANSFER => 1,
CURLOPT_URL => $url,
CURLOPT_USERAGENT => $this->useragent,
CURLOPT_FOLLOWLOCATION => TRUE,
CURLOPT_CONNECTTIMEOUT => 10,
CURLOPT_MAXCONNECTS => 50,
CURLOPT_LOW_SPEED_LIMIT => 500,
CURLOPT_LOW_SPEED_TIME => 5,
CURLOPT_TIMEOUT => 10
));
$result = curl_exec($ch);
curl_close($ch);
Redis::hset('search.' . $this->hash, $this->name, $result);
}
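/**
 * Read the HTTP response from the persistent socket ($this->fp):
 * parse the headers, read the body (chunked, Content-Length or
 * connection-close), gunzip it if necessary and store it in Redis.
 *
 * @return void
 */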
private function readAnswer()
{
$time = microtime(true);
$headers = []; // parsed response headers, keyed by lowercased header name
$body = '';
$length = 0;
if (!$this->fp) {
return;
}
// get headers FIRST
$c = 0;
stream_set_blocking($this->fp, 1);
do {
// use fgets() rather than fread(): fgets() stops reading at the first newline
// or when the buffer is full, whichever comes first
$data = fgets($this->fp, 8192);
// a single empty CRLF line marks the end of the headers
if ($data === false || $data == "\r\n" || feof($this->fp)) {
// break BEFORE OUTPUT
break;
}
if (sizeof(($tmp = explode(": ", $data))) === 2) {
$headers[strtolower(trim($tmp[0]))] = trim($tmp[1]);
}
$c++;
} while (true);
// end of headers
if (sizeof($headers) > 1) {
$bodySize = 0;
if (isset($headers["transfer-encoding"]) && $headers["transfer-encoding"] === "chunked") {
$body = $this->readChunked();
} elseif (isset($headers['content-length'])) {
$length = trim($headers['content-length']);
if (is_numeric($length) && $length >= 1) {
$body = $this->readBody($length);
}
$bodySize = strlen($body);
} elseif (isset($headers["connection"]) && strtolower($headers["connection"]) === "close") {
$body = $this->readUntilClose();
} else {
exit;
}
} else {
return;
}
Redis::del($this->host . "." . $this->socketNumber);
if (isset($headers["content-encoding"]) && $headers['content-encoding'] === "gzip") {
$body = $this->gunzip($body);
}
Redis::hset('search.' . $this->hash, $this->name, $body);
Redis::expire('search.' . $this->hash, 5);
}
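/**
 * Read from the socket until the remote host closes the connection
 * (used when the response has neither Content-Length nor chunked encoding).
 *
 * @return string the raw response body
 */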
private function readUntilClose()
{
$data = '';
stream_set_blocking($this->fp, 1);
while (!feof($this->fp)) {
$data .= fgets($this->fp, 8192);
}
# On this code path the host does not support keep-alive,
# so we close the connection:
fclose($this->fp);
Redis::del($this->host . "." . $this->socketNumber);
return $data;
}
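/**
 * Read up to $length bytes of the response body from the socket,
 * giving up after roughly one second.
 *
 * @param  mixed $length expected body size taken from the Content-Length header
 * @return string
 */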
private function readBody($length)
{
$theData = '';
$done = false;
stream_set_blocking($this->fp, 0);
$startTime = time();
$lastTime = $startTime;
while (!feof($this->fp) && !$done && (($startTime + 1) > time()) && $length !== 0) {
usleep(100);
$theNewData = fgets($this->fp, 8192);
$theData .= $theNewData;
$length -= strlen($theNewData);
$done = (trim($theNewData) === '0');
}
return $theData;
}
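/**
 * Read a body that uses chunked transfer encoding: read each chunk's
 * hex length line, then the chunk itself, until the terminating 0-length chunk.
 *
 * @return string the reassembled response body
 */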
private function readChunked()
{
$body = '';
// read from chunked stream
// loop through the stream
do {
// NOTE: for chunked encoding to work properly make sure
// there is NOTHING (besides newlines) before the first hexlength
// get the line which has the length of this chunk (use fgets here)
$line = fgets($this->fp, 8192);
// if it's only a newline this normally means it's read
// the total amount of data requested minus the newline
// continue to next loop to make sure we're done
if ($line == "\r\n") {
continue;
}
// the length of each block is sent as a hex number; decode it, then read
// that much data
// NOTE: hexdec() ignores any non-hexadecimal characters it finds
$length = hexdec($line);
if (!is_int($length)) {
trigger_error('Most likely not chunked encoding', E_USER_ERROR);
}
// zero is sent when at the end of the chunks
// or the end of the stream or error
if ($line === false || $length < 1 || feof($this->fp)) {
if ($length <= 0) {
fgets($this->fp, 8192);
}
// break out of the streams loop
break;
}
// loop through the chunk
do {
// read $length amount of data
// (use fread here)
$data = fread($this->fp, $length);
// remove the amount received from the total length on the next loop
// it'll attempt to read that much less data
$length -= strlen($data);
// append the received data to the body string for later use
// (it could also be streamed directly to a file here)
$body .= $data;
// zero or less or end of connection break
if ($length <= 0 || feof($this->fp)) {
// break out of the chunk loop
if ($length <= 0) {
fgets($this->fp, 8192);
}
break;
}
} while (true);
// end of chunk loop
} while (true);
// end of stream loop
return $body;
}
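/**
 * Decompress a gzip-encoded response body.
 *
 * @param  string $zipped the raw, compressed body
 * @return string the decompressed body, or "Unknown Format" if it does not look like gzip/deflate data
 */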
private function gunzip($zipped)
{
$offset = 0;
if (substr($zipped, 0, 2) == "\x1f\x8b") {
$offset = 2;
}
if (substr($zipped, $offset, 1) == "\x08") {
try
{
return gzinflate(substr($zipped, $offset + 8));
} catch (\Exception $e) {
abort(500, "Error while unzipping the result of the following provider: " . $this->name);
}
}
return "Unknown Format";
}
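/**
 * Write the HTTP GET request to the persistent socket, replacing the
 * socket with a fresh one if writing fails or stalls for more than a second.
 *
 * @return bool true if the complete request was sent
 */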
private function writeRequest()
{
$out = "GET " . $this->getString . " HTTP/1.1\r\n";
$out .= "Host: " . $this->host . "\r\n";
$out .= "User-Agent: " . $this->useragent . "\r\n";
$out .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
$out .= "Accept-Language: de,en-US;q=0.7,en;q=0.3\r\n";
$out .= "Accept-Encoding: gzip, deflate, br\r\n";
$out .= str_replace("$#!#$", "\r\n", $this->additionalHeaders);
$out .= "Connection: keep-alive\r\n\r\n";
# Send the request:
$sent = 0;
$string = $out;
$time = microtime(true);
while (true) {
$timeElapsed = microtime(true) - $time;
if ($timeElapsed > 1.0) {
# Something went wrong with our socket; we need a new one:
if ($this->fp && is_resource($this->fp)) {
fclose($this->fp);
}
Redis::del($this->host . "." . $this->socketNumber);
$this->fp = $this->getFreeSocket();
$sent = 0;
$string = $out;
continue;
}
try {
$tmp = fwrite($this->fp, $string);
} catch (\ErrorException $e) {
# Something went wrong with our socket; we need a new one:
if ($this->fp && is_resource($this->fp)) {
fclose($this->fp);
}
Redis::del($this->host . "." . $this->socketNumber);
$this->fp = $this->getFreeSocket();
$sent = 0;
$string = $out;
continue;
}
if ($tmp) {
$sent += $tmp;
$string = substr($string, $tmp);
}
if ($sent >= strlen($out)) {
break;
}
}
if ($sent === strlen($out)) {
return true;
}
return false;
}
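/**
 * Acquire a persistent socket to the target host that is not currently
 * in use by another process (see the comments inside the method).
 *
 * @return resource|null
 */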
public function getFreeSocket()
{
# Depending on server load (concurrent queries) we may need several sockets to answer the queries without delay.
# pfsockopen opens a persistent socket that can also be shared between different PHP processes.
# If the hostname matches an already created socket, that connection is picked up and continued.
# However, we may only use it if it is not already being used by another process for its own communication.
# If it is, we try the next socket.
# Detecting this is trickier than one might expect. The following scheme should work:
# 1. Determine whether this socket was newly created or whether an existing one was reopened.
$counter = 0;
$fp = null;
$time = microtime(true);
do {
if (intval(Redis::exists($this->host . ".$counter")) === 0) {
Redis::set($this->host . ".$counter", 1);
Redis::expire($this->host . ".$counter", 5);
$this->socketNumber = $counter;
try
{
$fp = pfsockopen($this->getHost(), $this->port, $errno, $errstr, 1);
} catch (\ErrorException $e) {
break;
}
# Check whether the read buffer is empty:
stream_set_blocking($fp, 0);
$string = fgets($fp, 8192);
if ($string !== false || feof($fp)) {
if ($fp && is_resource($fp)) {
fclose($fp);
}
$this->socketNumber = null;
Redis::del($this->host . ".$counter");
continue;
}
break;
}
$counter++;
} while (true);
return $fp;
}
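/**
 * Build the host string for pfsockopen: tls:// for port 443, tcp:// otherwise.
 *
 * @return string
 */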
public function getHost()
{
$return = "";
if ($this->port === "443") {
$return .= "tls://";
} else {
$return .= "tcp://";
}
$return .= $this->host;
return $return;
}
}
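For context, the sketch below shows one way such a job could be dispatched and its result read back from Redis. The hash, engine name and request parameters are made-up placeholders; the actual dispatch logic lives in the Searchengine class whose diff appears below.

<?php
// Hypothetical caller sketch -- concrete values are illustrative only.
use App\Jobs\Search;
use Illuminate\Support\Facades\Redis;

$hash = md5('example query');      // assumed: unique key for this search
$name = 'examplengine';            // assumed: engine name used as the hash field

// Queue the fetch; the worker's handle() method writes the raw response
// into the Redis hash "search.<hash>" under the field <name>.
dispatch(new Search($hash, 'www.example.org', '443', $name,
    '/search?q=example', 'Mozilla/5.0 (compatible)', ''));

// Poll for the result for up to roughly two seconds.
$body = null;
for ($i = 0; $i < 20 && $body === null; $i++) {
    usleep(100 * 1000);
    if (Redis::hexists('search.' . $hash, $name)) {
        $body = Redis::hget('search.' . $hash, $name);
    }
}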
@@ -161,7 +161,6 @@ class Searcher implements ShouldQueue
 }
 // We should close our curl handle before we do so
 curl_close($this->ch);
-Log::info("Exiting here!");
 }
 private function initCurlHandle(){
...
@@ -105,7 +105,7 @@ abstract class Searchengine
 # Checks whether the search is already cached; otherwise it is dispatched as a job
 public function startSearch(\App\MetaGer $metager)
 {
-if ($this->canCache && Cache::has($this->hash) && 0 == 1) {
+if ($this->canCache && Cache::has($this->hash)) {
 $this->cached = true;
 $this->retrieveResults($metager);
 } else {
@@ -155,7 +155,6 @@ abstract class Searchengine
 // of course need to spawn a new one.
 if(sizeof($searcherData) === 0){
 $needSearcher = true;
-Log::info("Request new Searcher beacause none is running");
 }else{
 // There we go:
 // There's at least one Fetcher running for this search engine.
@@ -171,7 +170,6 @@ abstract class Searchengine
 $median /= sizeof($searcherData);
 if($median < .1){
 $needSearcher = true;
-Log::info("Requesting new Searcher because: $median \n" . print_r($searcherData, true));
 }
 }
 if($needSearcher && Redis::get($this->name) !== "locked"){
@@ -234,14 +232,13 @@ abstract class Searchengine
 }
 $body = "";
-if ($this->canCache && $this->cacheDuration > 0 && Cache::has($this->hash) && 0 === 1) {
+if ($this->canCache && $this->cacheDuration > 0 && Cache::has($this->hash)) {
 $body = Cache::get($this->hash);
 } elseif (Redis::hexists('search.' . $this->resultHash, $this->name)) {
 $body = Redis::hget('search.' . $this->resultHash, $this->name);
-if ($this->canCache && $this->cacheDuration > 0 && 0 === 1) {
+if ($this->canCache && $this->cacheDuration > 0) {
 Cache::put($this->hash, $body, $this->cacheDuration);
 }
 }
 if ($body !== "") {
 $this->loadResults($body);
...
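The guards removed above (`&& 0 == 1`, `&& 0 === 1`) are what had kept the cache switched off; with them gone, the engine falls back to the usual cache-or-Redis pattern when fetching its result body. A minimal sketch of that pattern, with made-up key names and duration, assuming the same Laravel facades:

<?php
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Redis;

// Illustrative values only; in Searchengine these are instance properties.
$name = 'examplengine';
$resultHash = md5('example query');
$hash = md5($name . 'example query');
$cacheDuration = 60; // lifetime unit depends on the Laravel version (minutes in older releases)

$body = "";
if (Cache::has($hash)) {
    // A previous identical search already left its raw response in the cache.
    $body = Cache::get($hash);
} elseif (Redis::hexists('search.' . $resultHash, $name)) {
    // Otherwise take the fresh result that the Search job wrote to Redis ...
    $body = Redis::hget('search.' . $resultHash, $name);
    // ... and cache it for subsequent identical searches.
    Cache::put($hash, $body, $cacheDuration);
}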