Commit 2b99a53e authored by Dominik Hebeler's avatar Dominik Hebeler
Browse files

Wenn keine Verbindung zum Cache Server aufgebaut werden kann (z.B. bei einem...

Wenn keine Verbindung zum Cache Server aufgebaut werden kann (z.B. bei einem Update), benutzen wir den Cache nun einfach nicht mehr anstatt abzustürzen.
parent 82ac5f45
......@@ -3,16 +3,16 @@
namespace App\Jobs;
use App\Jobs\Job;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Http\Request;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Redis;
class Search extends Job implements ShouldQueue
{
use InteractsWithQueue, SerializesModels;
protected $hash, $host, $port, $name, $getString, $useragent, $fp;
protected $buffer_length = 8192;
......@@ -23,10 +23,10 @@ class Search extends Job implements ShouldQueue
*/
public function __construct($hash, $host, $port, $name, $getString, $useragent)
{
$this->hash = $hash;
$this->host = $host;
$this->port = $port;
$this->name = $name;
$this->hash = $hash;
$this->host = $host;
$this->port = $port;
$this->name = $name;
$this->getString = $getString;
$this->useragent = $useragent;
}
......@@ -39,72 +39,66 @@ class Search extends Job implements ShouldQueue
public function handle(Request $request)
{
$this->fp = $this->getFreeSocket();
if($this->fp)
{
if($this->writeRequest())
{
if ($this->fp) {
if ($this->writeRequest()) {
$this->readAnswer();
}
}
}
private function readAnswer ()
private function readAnswer()
{
$time = microtime(true);
$time = microtime(true);
$headers = '';
$body = '';
$length = 0;
$body = '';
$length = 0;
if(!$this->fp)
{
if (!$this->fp) {
return;
}
// get headers FIRST
$c = 0;
stream_set_blocking($this->fp, 1);
do
{
do {
// use fgets() not fread(), fgets stops reading at first newline
// or buffer which ever one is reached first
$data = fgets($this->fp, 8192);
// a sincle CRLF indicates end of headers
if ($data === false || $data == "\r\n" || feof($this->fp) ) {
if ($data === false || $data == "\r\n" || feof($this->fp)) {
// break BEFORE OUTPUT
break;
}
if( sizeof(($tmp = explode(": ", $data))) === 2 )
if (sizeof(($tmp = explode(": ", $data))) === 2) {
$headers[strtolower(trim($tmp[0]))] = trim($tmp[1]);
}
$c++;
}
while (true);
} while (true);
// end of headers
if(sizeof($headers) > 1){
if (sizeof($headers) > 1) {
$bodySize = 0;
if( isset($headers["transfer-encoding"]) && $headers["transfer-encoding"] === "chunked" )
{
if (isset($headers["transfer-encoding"]) && $headers["transfer-encoding"] === "chunked") {
$body = $this->readChunked();
}elseif( isset($headers['content-length']) )
{
} elseif (isset($headers['content-length'])) {
$length = trim($headers['content-length']);
if(is_numeric($length) && $length >= 1)
if (is_numeric($length) && $length >= 1) {
$body = $this->readBody($length);
}
$bodySize = strlen($body);
}else
{
} else {
exit;
}
}else
{
} else {
return;
}
Redis::del($this->host . "." . $this->socketNumber);
if( isset($headers["content-encoding"]) && $headers['content-encoding'] === "gzip")
{
if (isset($headers["content-encoding"]) && $headers['content-encoding'] === "gzip") {
$body = $this->gunzip($body);
}
Redis::hset('search.' . $this->hash, $this->name, $body);
......@@ -114,12 +108,11 @@ class Search extends Job implements ShouldQueue
private function readBody($length)
{
$theData = '';
$done = false;
$done = false;
stream_set_blocking($this->fp, 0);
$startTime = time();
$lastTime = $startTime;
while (!feof($this->fp) && !$done && (($startTime + 1) > time()) && $length !== 0)
{
$lastTime = $startTime;
while (!feof($this->fp) && !$done && (($startTime + 1) > time()) && $length !== 0) {
usleep(100);
$theNewData = fgets($this->fp, 8192);
$theData .= $theNewData;
......@@ -135,8 +128,7 @@ class Search extends Job implements ShouldQueue
$body = '';
// read from chunked stream
// loop though the stream
do
{
do {
// NOTE: for chunked encoding to work properly make sure
// there is NOTHING (besides newlines) before the first hexlength
......@@ -162,15 +154,16 @@ class Search extends Job implements ShouldQueue
// zero is sent when at the end of the chunks
// or the end of the stream or error
if ($line === false || $length < 1 || feof($this->fp)) {
if($length <= 0)
fgets($this->fp, 8192);
if ($length <= 0) {
fgets($this->fp, 8192);
}
// break out of the streams loop
break;
}
// loop though the chunk
do
{
do {
// read $length amount of data
// (use fread here)
$data = fread($this->fp, $length);
......@@ -186,42 +179,42 @@ class Search extends Job implements ShouldQueue
$body .= $data;
// zero or less or end of connection break
if ($length <= 0 || feof($this->fp))
{
if ($length <= 0 || feof($this->fp)) {
// break out of the chunk loop
if($length <= 0)
if ($length <= 0) {
fgets($this->fp, 8192);
}
break;
}
}
while (true);
} while (true);
// end of chunk loop
}
while (true);
} while (true);
// end of stream loop
return $body;
}
private function gunzip($zipped) {
$offset = 0;
if (substr($zipped,0,2) == "\x1f\x8b")
$offset = 2;
if (substr($zipped,$offset,1) == "\x08")
{
try
{
return gzinflate(substr($zipped, $offset + 8));
} catch (\Exception $e)
{
abort(500, "Fehler beim unzip des Ergebnisses von folgendem Anbieter: " . $this->name);
private function gunzip($zipped)
{
$offset = 0;
if (substr($zipped, 0, 2) == "\x1f\x8b") {
$offset = 2;
}
if (substr($zipped, $offset, 1) == "\x08") {
try
{
return gzinflate(substr($zipped, $offset + 8));
} catch (\Exception $e) {
abort(500, "Fehler beim unzip des Ergebnisses von folgendem Anbieter: " . $this->name);
}
}
}
return "Unknown Format";
}
return "Unknown Format";
}
private function writeRequest ()
private function writeRequest()
{
$out = "GET " . $this->getString . " HTTP/1.1\r\n";
$out .= "Host: " . $this->host . "\r\n";
$out .= "User-Agent: " . $this->useragent . "\r\n";
......@@ -230,51 +223,69 @@ class Search extends Job implements ShouldQueue
$out .= "Accept-Encoding: gzip, deflate, br\r\n";
$out .= "Connection: keep-alive\r\n\r\n";
# Anfrage senden:
$sent = 0; $string = $out; $time = microtime(true);
while(true)
{
try{
$sent = 0;
$string = $out;
$time = microtime(true);
while (true) {
$timeElapsed = microtime(true) - $time;
if ($timeElapsed > 0.5) {
# Irgendwas ist mit unserem Socket passiert. Wir brauchen einen neuen:
if ($this->fp) {
fclose($this->fp);
}
Redis::del($this->name . "." . $this->socketNumber);
$this->fp = $this->getFreeSocket();
$sent = 0;
$string = $out;
break;
}
try {
$tmp = fwrite($this->fp, $string);
}catch(\ErrorException $e)
{
} catch (\ErrorException $e) {
# Irgendwas ist mit unserem Socket passiert. Wir brauchen einen neuen:
fclose($this->fp);
try {
fclose($this->fp);
} catch (\ErrorException $e) {
}
Redis::del($this->name . "." . $this->socketNumber);
$this->fp = $this->getFreeSocket();
$sent = 0;
$string = $out;
$sent = 0;
$string = $out;
continue;
}
if($tmp){
if ($tmp) {
$sent += $tmp;
$string = substr($string, $tmp);
}
if($sent >= strlen($out))
if ($sent >= strlen($out)) {
break;
}
}
if( $sent === strlen($out) )
{
if ($sent === strlen($out)) {
return true;
}
return false;
}
private function getFreeSocket()
public function getFreeSocket()
{
# Je nach Auslastung des Servers ( gleichzeitige Abfragen ), kann es sein, dass wir mehrere Sockets benötigen um die Abfragen ohne Wartezeit beantworten zu können.
# pfsockopen öffnet dabei einen persistenten Socket, der also auch zwischen den verschiedenen php Prozessen geteilt werden kann.
# pfsockopen öffnet dabei einen persistenten Socket, der also auch zwischen den verschiedenen php Prozessen geteilt werden kann.
# Wenn der Hostname mit einem bereits erstellten Socket übereinstimmt, wird die Verbindung also aufgegriffen und fortgeführt.
# Allerdings dürfen wir diesen nur verwenden, wenn er nicht bereits von einem anderen Prozess zur Kommunikation verwendet wird.
# Wenn dem so ist, probieren wir den nächsten Socket zu verwenden.
# Dies festzustellen ist komplizierter, als man sich das vorstellt. Folgendes System sollte funktionieren:
# 1. Stelle fest, ob dieser Socket neu erstellt wurde, oder ob ein existierender geöffnet wurde.
$counter = 0; $fp = null;
do
{
if( intval(Redis::exists($this->host . ".$counter")) === 0 )
{
$counter = 0;
$fp = null;
do {
if (intval(Redis::exists($this->host . ".$counter")) === 0) {
Redis::set($this->host . ".$counter", 1);
Redis::expire($this->host . ".$counter", 5);
$this->socketNumber = $counter;
......@@ -282,33 +293,29 @@ class Search extends Job implements ShouldQueue
try
{
$fp = pfsockopen($this->getHost() . ":" . $this->port . "/$counter", $this->port, $errstr, $errno, 1);
}catch(\ErrorException $e)
{
} catch (\ErrorException $e) {
break;
}
# Wir gucken, ob der Lesepuffer leer ist:
stream_set_blocking($fp, 0);
$string = fgets($fp, 8192);
if( $string !== false || feof($fp) )
{
if ($string !== false || feof($fp)) {
fclose($fp);
continue;
}
break;
}
$counter++;
}while(true);
} while (true);
return $fp;
}
private function getHost()
public function getHost()
{
$return = "";
if( $this->port === "443" )
{
if ($this->port === "443") {
$return .= "tls://";
}else
{
} else {
$return .= "tcp://";
}
$return .= $this->host;
......
......@@ -8,6 +8,7 @@ use Illuminate\Http\Request;
use Jenssegers\Agent\Agent;
use LaravelLocalization;
use Log;
use Predis\Connection\ConnectionException;
use Redis;
class MetaGer
......@@ -33,6 +34,7 @@ class MetaGer
protected $errors = [];
protected $addedHosts = [];
protected $startCount = 0;
protected $canCache = false;
# Daten über die Abfrage
protected $ip;
protected $language;
......@@ -70,6 +72,13 @@ class MetaGer
$this->languageDetect = new TextLanguageDetect();
$this->languageDetect->setNameMode("2");
try {
Cache::has('test');
$this->canCache = true;
} catch (ConnectionException $e) {
$this->canCache = false;
}
}
public function getHashCode()
......@@ -287,17 +296,7 @@ class MetaGer
$this->errors[] = "Leider konnten wir zu Ihrer Sucheingabe keine passenden Ergebnisse finden.";
}
if (isset($this->last) && count($this->last) > 0) {
$page = $this->page - 1;
$this->last = [
'page' => $page,
'startBackwards' => $this->results[0]->number,
'engines' => $this->last,
];
Cache::put(md5(serialize($this->last)), serialize($this->last), 60);
}
if (isset($this->next) && count($this->next) > 0 && count($this->results) > 0) {
if ($this->canCache() && isset($this->next) && count($this->next) > 0 && count($this->results) > 0) {
$page = $this->page + 1;
$this->next = [
'page' => $page,
......@@ -305,6 +304,8 @@ class MetaGer
'engines' => $this->next,
];
Cache::put(md5(serialize($this->next)), serialize($this->next), 60);
} else {
$this->next = [];
}
}
......@@ -973,6 +974,10 @@ class MetaGer
$this->addedHosts[$hash] = 1;
}
}
public function canCache()
{
return $this->canCache;
}
public function getSite()
{
return $this->site;
......
......@@ -71,6 +71,7 @@ abstract class Searchengine
$this->getString = $this->generateGetString($q, $metager->getUrl(), $metager->getLanguage(), $metager->getCategory());
$this->hash = md5($this->host . $this->getString . $this->port . $this->name);
$this->resultHash = $metager->getHashCode();
$this->canCache = $metager->canCache();
}
abstract public function loadResults($result);
......@@ -82,7 +83,7 @@ abstract class Searchengine
public function startSearch(\App\MetaGer $metager)
{
if (Cache::has($this->hash)) {
if ($this->canCache && Cache::has($this->hash)) {
$this->cached = true;
$this->retrieveResults($metager);
} else {
......@@ -142,11 +143,11 @@ abstract class Searchengine
}
$body = "";
if ($this->cacheDuration > 0 && Cache::has($this->hash)) {
if ($this->canCache && $this->cacheDuration > 0 && Cache::has($this->hash)) {
$body = Cache::get($this->hash);
} elseif (Redis::hexists('search.' . $this->resultHash, $this->name)) {
$body = Redis::hget('search.' . $this->resultHash, $this->name);
if ($this->cacheDuration > 0) {
if ($this->canCache && $this->cacheDuration > 0) {
Cache::put($this->hash, $body, $this->cacheDuration);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment