Commit 63a7b9aa authored by Dominik Hebeler's avatar Dominik Hebeler
Browse files

Merge branch '71-suche-beschleunigen' into 'master'

Resolve "Suche beschleunigen"

Closes #71

See merge request !84
parents da6ea61f c24d4d26
<?php
namespace App\Jobs;
use App\Jobs\Job;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Http\Request;
use Redis;
use Log;
class Search extends Job implements ShouldQueue
{
use InteractsWithQueue, SerializesModels;
protected $hash, $host, $port, $name, $getString, $useragent, $fp, $sumaFile;
protected $buffer_length = 8192;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($hash, $host, $port, $name, $getString, $useragent, $sumaFile)
{
$this->hash = $hash;
$this->host = $host;
$this->port = $port;
$this->name = $name;
$this->getString = $getString;
$this->useragent = $useragent;
$this->sumaFile = $sumaFile;
}
/**
* Execute the job.
*
* @return void
*/
public function handle(Request $request)
{
$this->fp = $this->getFreeSocket();
if(!$this->fp)
{
$this->disable($this->sumaFile, "Die Suchmaschine " . $this->name . " wurde für 1h deaktiviert, weil keine Verbindung aufgebaut werden konnte");
}else
{
if($this->writeRequest())
{
$this->readAnswer();
}
}
}
public function disable($sumaFile, $message)
{
$xml = simplexml_load_file($sumaFile);
$xml->xpath("//sumas/suma[@name='" . $this->name . "']")['0']['disabled'] = date(DATE_RFC822, mktime(date("H")+1,date("i"), date("s"), date("m"), date("d"), date("Y")));
$xml->saveXML($sumaFile);
}
private function readAnswer ()
{
$time = microtime(true);
$headers = '';
$body = '';
$length = 0;
if(!$this->fp)
{
return;
}
// get headers FIRST
$c = 0;
stream_set_blocking($this->fp, 1);
do
{
// use fgets() not fread(), fgets stops reading at first newline
// or buffer which ever one is reached first
$data = fgets($this->fp, 8192);
// a sincle CRLF indicates end of headers
if ($data === false || $data == "\r\n" || feof($this->fp) ) {
// break BEFORE OUTPUT
break;
}
if( sizeof(($tmp = explode(": ", $data))) === 2 )
$headers[trim($tmp[0])] = trim($tmp[1]);
$c++;
}
while (true);
// end of headers
if(sizeof($headers) > 1){
$bodySize = 0;
if( isset($headers["Transfer-Encoding"]) && $headers["Transfer-Encoding"] === "chunked" )
{
$body = $this->readChunked();
}elseif( isset($headers['Content-Length']) )
{
$length = trim($headers['Content-Length']);
if(is_numeric($length) && $length >= 1)
$body = $this->readBody($length);
$bodySize = strlen($body);
}else
{
Log::error("Konnte nicht herausfinden, wie ich die Serverantwort von: " . $this->name . " auslesen soll. Header war: " . print_r($headers));
exit;
}
}else
{
return;
}
Redis::del($this->host . "." . $this->socketNumber);
if( isset($headers["Content-Encoding"]) && $headers['Content-Encoding'] === "gzip")
{
$body = $this->gunzip($body);
}
Redis::hset('search.' . $this->hash, $this->name, $body);
Redis::expire('search.' . $this->hash, 5);
}
private function readBody($length)
{
$theData = '';
$done = false;
stream_set_blocking($this->fp, 0);
$startTime = time();
$lastTime = $startTime;
while (!feof($this->fp) && !$done && (($startTime + 1) > time()) && $length !== 0)
{
usleep(100);
$theNewData = fgets($this->fp, 8192);
$theData .= $theNewData;
$length -= strlen($theNewData);
$done = (trim($theNewData) === '0');
}
return $theData;
}
private function readChunked()
{
$body = '';
// read from chunked stream
// loop though the stream
do
{
// NOTE: for chunked encoding to work properly make sure
// there is NOTHING (besides newlines) before the first hexlength
// get the line which has the length of this chunk (use fgets here)
$line = fgets($this->fp, 8192);
// if it's only a newline this normally means it's read
// the total amount of data requested minus the newline
// continue to next loop to make sure we're done
if ($line == "\r\n") {
continue;
}
// the length of the block is sent in hex decode it then loop through
// that much data get the length
// NOTE: hexdec() ignores all non hexadecimal chars it finds
$length = hexdec($line);
if (!is_int($length)) {
trigger_error('Most likely not chunked encoding', E_USER_ERROR);
}
// zero is sent when at the end of the chunks
// or the end of the stream or error
if ($line === false || $length < 1 || feof($this->fp)) {
if($length <= 0)
fgets($this->fp, 8192);
// break out of the streams loop
break;
}
// loop though the chunk
do
{
// read $length amount of data
// (use fread here)
$data = fread($this->fp, $length);
// remove the amount received from the total length on the next loop
// it'll attempt to read that much less data
$length -= strlen($data);
// PRINT out directly
// you could also save it directly to a file here
// store in string for later use
$body .= $data;
// zero or less or end of connection break
if ($length <= 0 || feof($this->fp))
{
// break out of the chunk loop
if($length <= 0)
fgets($this->fp, 8192);
break;
}
}
while (true);
// end of chunk loop
}
while (true);
// end of stream loop
return $body;
}
private function gunzip($zipped) {
$offset = 0;
if (substr($zipped,0,2) == "\x1f\x8b")
$offset = 2;
if (substr($zipped,$offset,1) == "\x08")
{
try
{
return gzinflate(substr($zipped, $offset + 8));
} catch (\Exception $e)
{
abort(500, "Fehler beim unzip des Ergebnisses von folgendem Anbieter: " . $this->name);
}
}
return "Unknown Format";
}
private function writeRequest ()
{
$out = "GET " . $this->getString . " HTTP/1.1\r\n";
$out .= "Host: " . $this->host . "\r\n";
$out .= "User-Agent: " . $this->useragent . "\r\n";
$out .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
$out .= "Accept-Language: de,en-US;q=0.7,en;q=0.3\r\n";
$out .= "Accept-Encoding: gzip, deflate, br\r\n";
$out .= "Connection: keep-alive\r\n\r\n";
# Anfrage senden:
$sent = 0; $string = $out; $time = microtime(true);
while(true)
{
try{
$tmp = fwrite($this->fp, $string);
}catch(\ErrorException $e)
{
# Irgendwas ist mit unserem Socket passiert. Wir brauchen einen neuen:
fclose($this->fp);
Redis::del($this->name . "." . $this->socketNumber);
$this->fp = $this->getFreeSocket();
$sent = 0;
$string = $out;
continue;
}
if($tmp){
$sent += $tmp;
$string = substr($string, $tmp);
}else
Log::error("Fehler beim schreiben.");
if(((microtime(true) - $time) / 1000000) >= 500)
{
Log::error("Konnte die Request Daten nicht an: " . $this->name . " senden");
}
if($sent >= strlen($out))
break;
}
if( $sent === strlen($out) )
{
return true;
}
return false;
}
private function getFreeSocket()
{
# Je nach Auslastung des Servers ( gleichzeitige Abfragen ), kann es sein, dass wir mehrere Sockets benötigen um die Abfragen ohne Wartezeit beantworten zu können.
# pfsockopen öffnet dabei einen persistenten Socket, der also auch zwischen den verschiedenen php Prozessen geteilt werden kann.
# Wenn der Hostname mit einem bereits erstellten Socket übereinstimmt, wird die Verbindung also aufgegriffen und fortgeführt.
# Allerdings dürfen wir diesen nur verwenden, wenn er nicht bereits von einem anderen Prozess zur Kommunikation verwendet wird.
# Wenn dem so ist, probieren wir den nächsten Socket zu verwenden.
# Dies festzustellen ist komplizierter, als man sich das vorstellt. Folgendes System sollte funktionieren:
# 1. Stelle fest, ob dieser Socket neu erstellt wurde, oder ob ein existierender geöffnet wurde.
$counter = 0; $fp = null;
do
{
if( intval(Redis::exists($this->host . ".$counter")) === 0 )
{
Redis::set($this->host . ".$counter", 1);
Redis::expire($this->host . ".$counter", 5);
$this->socketNumber = $counter;
try
{
$fp = pfsockopen($this->getHost() . ":" . $this->port . "/$counter", $this->port, $errstr, $errno, 1);
if(!$fp)
Log::error("lkajng");
}catch(\ErrorException $e)
{
Log::error('Fehler beim erstellen des Sockets zu: ' . $this->getHost());
break;
}
# Wir gucken, ob der Lesepuffer leer ist:
stream_set_blocking($fp, 0);
$string = fgets($fp, 8192);
if( $string !== false || feof($fp) )
{
Log::error("Der Lesepuffer von: " . $this->name . " war nach dem Erstellen nicht leer. Musste den Socket neu starten.");
fclose($fp);
continue;
}
Log::info($this->getHost() . ":" . $this->port . "/$counter");
break;
}
$counter++;
}while(true);
return $fp;
}
private function getHost()
{
$return = "";
if( $this->port === "443" )
{
$return .= "tls://";
}else
{
$return .= "tcp://";
}
$return .= $this->host;
return $return;
}
}
......@@ -52,8 +52,6 @@ class MetaGer
function __construct()
{
$this->starttime = microtime(true);
define('CRLF', "\r\n");
define('BUFFER_LENGTH', 8192);
if( file_exists(config_path() . "/blacklistDomains.txt") && file_exists(config_path() . "/blacklistUrl.txt") )
{
# Blacklists einlesen:
......@@ -70,6 +68,12 @@ class MetaGer
$this->languageDetect->setNameMode("2");
}
public function getHashCode ()
{
$string = url()->full();
return md5($string);
}
public function rankAll ()
{
foreach( $this->engines as $engine )
......@@ -278,7 +282,6 @@ class MetaGer
public function createSearchEngines (Request $request)
{
#die(SocketRocket::get("tls", "dominik-pfennig.de", "", 443));
# Überprüfe, welche Sumas eingeschaltet sind
......@@ -312,7 +315,7 @@ class MetaGer
if(!(isset($suma['disabled']) && $suma['disabled']->__toString() === "1"))
{
if($suma["name"]->__toString() === "overture")
if($suma["name"]->__toString() === "overture" || $suma["name"]->__toString() === "overtureAds")
{
$overtureEnabled = TRUE;
}
......@@ -336,7 +339,7 @@ class MetaGer
){
if(!(isset($suma['disabled']) && $suma['disabled']->__toString() === "1"))
{
if($suma["name"]->__toString() === "overture")
if($suma["name"]->__toString() === "overture" || $suma["name"]->__toString() === "overtureAds")
{
$overtureEnabled = TRUE;
}
......@@ -353,6 +356,8 @@ class MetaGer
$this->errors[] = "Achtung: Sie haben in ihren Einstellungen keine Suchmaschine ausgewählt.";
}
$engines = [];
foreach($enabledSearchengines as $engine){
......@@ -360,8 +365,8 @@ class MetaGer
{
continue;
}
# Wenn diese Suchmaschine gar nicht eingeschaltet sein soll
# Wenn diese Suchmaschine gar nicht eingeschaltet sein soll
$path = "App\Models\parserSkripte\\" . ucfirst($engine["package"]->__toString());
$time = microtime();
......@@ -378,43 +383,51 @@ class MetaGer
$this->sockets[$tmp->name] = $tmp->fp;
}
}
# Nun passiert ein elementarer Schritt.
# Wir warten auf die Antwort der Suchmaschinen, da wir vorher nicht weiter machen können.
# aber natürlich nicht ewig.
# Die Verbindung steht zu diesem Zeitpunkt und auch unsere Request wurde schon gesendet.
# Wir geben der Suchmaschine nun bis zu 500ms Zeit zu antworten.
# Jetzt lesen wir alles aus, was da ist und verwerfen den Rest:
$enginesToLoad = count($engines);
$loadedEngines = 0;
$time = 0;
$timeStart = microtime(true);
while( true )
{
$time = (microtime(true) - $timeStart) * 1000;
$loadedEngines = intval(Redis::hlen('search.' . $this->getHashCode()));
$canBreak = true;
if( $overtureEnabled && !Redis::hexists('search.' . $this->getHashCode(), 'overture') && !Redis::hexists('search.' . $this->getHashCode(), 'overtureAds'))
$canBreak = false;
# Abbruchbedingung
if($time < 500)
{
if($loadedEngines >= $enginesToLoad)
if($loadedEngines >= $enginesToLoad && $canBreak)
break;
}elseif( $time >= 500 && $time < $this->time)
{
if( ($loadedEngines / ($enginesToLoad * 1.0)) >= 0.8 )
if( ($loadedEngines / ($enginesToLoad * 1.0)) >= 0.8 && $canBreak )
break;
}else
{
break;
}
foreach($engines as $engine)
usleep(50000);
}
foreach($engines as $engine)
{
if(!$engine->loaded)
{
if(!$engine->loaded)
{
$success = $engine->retrieveResults();
if($engine->loaded)
$loadedEngines += 1;
}
$engine->retrieveResults();
}
usleep(50000);
$time += 50;
}
# und verwerfen den Rest:
foreach( $engines as $engine )
{
if( !$engine->loaded )
......
......@@ -4,9 +4,13 @@ namespace App\Models;
use App\MetaGer;
use Log;
use Redis;
use App\Jobs\Search;
use Illuminate\Foundation\Bus\DispatchesJobs;
abstract class Searchengine
abstract class Searchengine
{
use DispatchesJobs;
protected $ch; # Curl Handle zum erhalten der Ergebnisse
public $fp;
......@@ -58,67 +62,20 @@ abstract class Searchengine
$this->startTime = microtime();
$this->getString = $this->generateGetString($metager->getQ(), $metager->getUrl(), $metager->getLanguage(), $metager->getCategory());
$counter = 0;
# Wir benötigen einen verfügbaren Socket, über den wir kommunizieren können:
$time = microtime(true);
$this->fp = $this->getFreeSocket();
$this->setStatistic("connection_time", ((microtime(true)-$time) / 1000000));
if(!$this->fp)
{
$this->disable($metager->getSumaFile(), "Die Suchmaschine " . $this->name . " wurde für 1h deaktiviert, weil keine Verbindung aufgebaut werden konnte");
}else
{
$time = microtime(true);
$this->writeRequest();
$this->setStatistic("write_time", ((microtime(true)-$time) / 1000000));
}
$this->hash = $metager->getHashCode();
# Die Anfragen an die Suchmaschinen werden nun von der Laravel-Queue bearbeitet:
# Hinweis: solange in der .env der QUEUE_DRIVER auf "sync" gestellt ist, werden die Abfragen
# nacheinander abgeschickt.
# Sollen diese Parallel verarbeitet werden, muss ein anderer QUEUE_DRIVER verwendet werden.
# siehe auch: https://laravel.com/docs/5.2/queues
$this->dispatch(new Search($this->hash, $this->host, $this->port, $this->name, $this->getString, $this->useragent, $metager->getSumaFile()));
}
public abstract function loadResults($result);
private function writeRequest ()
{
$out = "GET " . $this->getString . " HTTP/1.1\r\n";
$out .= "Host: " . $this->host . "\r\n";
$out .= "User-Agent: " . $this->useragent . "\r\n";
$out .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
$out .= "Accept-Language: de,en-US;q=0.7,en;q=0.3\r\n";
$out .= "Accept-Encoding: gzip, deflate, br\r\n";
$out .= "Connection: keep-alive\r\n\r\n";
# Anfrage senden:
$sent = 0; $string = $out; $time = microtime(true);
while(true)
{
try{
$tmp = fwrite($this->fp, $string);
}catch(\ErrorException $e)
{
# Irgendwas ist mit unserem Socket passiert. Wir brauchen einen neuen:
fclose($this->fp);
Redis::del($this->name . "." . $this->socketNumber);
$this->fp = $this->getFreeSocket();
$sent = 0;
$string = $out;
continue;
}
if($tmp){
$sent += $tmp;
$string = substr($string, $tmp);
}else
abort(500, "Fehler beim schreiben.");
if(((microtime(true) - $time) / 1000000) >= 500)
{
abort(500, "Konnte die Request Daten nicht an: " . $this->name . " senden");
}
if($sent >= strlen($out))
break;
}
}
public function rank (\App\MetaGer $metager)
{
......@@ -128,47 +85,7 @@ abstract class Searchengine
}
}
private function getFreeSocket()
{
# Je nach Auslastung des Servers ( gleichzeitige Abfragen ), kann es sein, dass wir mehrere Sockets benötigen um die Abfragen ohne Wartezeit beantworten zu können.
# pfsockopen öffnet dabei einen persistenten Socket, der also auch zwischen den verschiedenen php Prozessen geteilt werden kann.
# Wenn der Hostname mit einem bereits erstellten Socket übereinstimmt, wird die Verbindung also aufgegriffen und fortgeführt.
# Allerdings dürfen wir diesen nur verwenden, wenn er nicht bereits von einem anderen Prozess zur Kommunikation verwendet wird.
# Wenn dem so ist, probieren wir den nächsten Socket zu verwenden.
# Dies festzustellen ist komplizierter, als man sich das vorstellt. Folgendes System sollte funktionieren:
# 1. Stelle fest, ob dieser Socket neu erstellt wurde, oder ob ein existierender geöffnet wurde.
$counter = 0; $fp = null;
do
{
if( intval(Redis::exists($this->host . ".$counter")) === 0 )
{
Redis::set($this->host . ".$counter", 1);
Redis::expire($this->host . ".$counter", 5);
$this->socketNumber = $counter;
try
{
$fp = pfsockopen($this->getHost() . ":" . $this->port . "/$counter", $this->port, $errstr, $errno, 1);
}catch(\ErrorException $e)
{
break;
}
# Wir gucken, ob der Lesepuffer leer ist:
stream_set_blocking($fp, 0);
if(fgets($fp, BUFFER_LENGTH) !== false)
{
Log::error("Der Lesepuffer von: " . $this->name . " war nach dem Erstellen nicht leer. Musste den Socket neu starten.");
fclose($fp);
$fp = pfsockopen($this->getHost() . ":" . $this->port . "/$counter", $this->port, $errstr, $errno, 1);
}
break;
}
$counter++;
}while(true);
return $fp;
}
private function setStatistic($key, $val)
{
......@@ -179,13 +96,7 @@ abstract class Searchengine
$this->$key = $newVal;
}
public function disable($sumaFile, $message)
{
Log::info($message);
$xml = simplexml_load_file($sumaFile);
$xml->xpath("//sumas/suma[@name='" . $this->name . "']")['0']['disabled'] = date(DATE_RFC822, mktime(date("H")+1,date("i"), date("s"), date("m"), date("d"), date("Y")));
$xml->saveXML($sumaFile);
}