Commit 45619a43 authored by Karl Hasselbring's avatar Karl Hasselbring
Browse files

Merge branch 'development' into 493-styles-aktualisieren-vereinfachen-uberarbeiten

parents 1c97bdbb 218093ec
......@@ -9,12 +9,6 @@ use Response;
class AdminInterface extends Controller
{
public function index(Request $request)
{
$localFetcher = file_get_contents(action("AdminInterface@getFetcherStatus"));
die(var_dump($localFetcher));
}
public function getFetcherStatus()
{
// Let's get the stats for this server.
// First we need to check, which Fetcher could be available by parsing the sumas.xml
......@@ -42,9 +36,11 @@ class AdminInterface extends Controller
}
// So now we can generate Median Times for every Fetcher
$fetcherCount = 0;
foreach($stati as $engineName => $engineStats){
$connection = array();
$poptime = 0;
$fetcherCount += sizeof($engineStats["fetcher"]);
foreach($engineStats["fetcher"] as $pid => $stats){
foreach($stats["connection"] as $key => $value){
if(!isset($connection[$key])){
......@@ -66,7 +62,8 @@ class AdminInterface extends Controller
return view('admin.admin')
->with('title', 'Fetcher Status')
->with('stati', $stati);
->with('stati', $stati)
->with('fetcherCount', $fetcherCount);
$stati = json_encode($stati);
$response = Response::make($stati, 200);
$response->header("Content-Type", "application/json");
......
......@@ -11,6 +11,7 @@ class ImageController extends Controller
{
public function generateImage(Request $request)
{
/*
#Piwik Code
PiwikTracker::$URL = 'http://piwik.metager3.de';
$piwikTracker = new PiwikTracker($idSite = 1);
......@@ -23,7 +24,7 @@ class ImageController extends Controller
// Sendet Tracker request per http
$piwikTracker->doTrackPageView($site);
*/
$path = public_path() . '/img/1px.png';
$fileType = File::type($path);
$response = Response::make(File::get($path), 200);
......
......@@ -169,10 +169,15 @@ class MetaGerSearch extends Controller
}
}
/*
# Wikipedia Quicktip
$url = "https://" . APP::getLocale() . ".wikipedia.org/w/api.php?action=opensearch&search=" . urlencode($q) . "&limit=10&namespace=0&format=json&redirects=resolve";
$decodedResponse = json_decode($this->get($url), true);
try{
$content = $this->get($url);
}catch(\ErrorException $e){
$content = "";
}
$decodedResponse = json_decode($content, true);
if (isset($decodedResponse[1][0]) && isset($decodedResponse[2][0]) && isset($decodedResponse[3][0])) {
$quicktip = [];
$firstSummary = $decodedResponse[2][0];
......@@ -198,7 +203,7 @@ class MetaGerSearch extends Controller
$quicktips[] = $quicktip;
}
$mquicktips = array_merge($mquicktips, $quicktips);
*/
if (APP::getLocale() === "de") {
# Dict.cc Quicktip
if (count(explode(' ', $q)) < 3) {
......@@ -261,7 +266,8 @@ class MetaGerSearch extends Controller
public function get($url)
{
return file_get_contents($url);
$ctx = stream_context_create(array('http'=>array('timeout' => 2,)));
return file_get_contents($url, false, $ctx);
}
private function startsWith($haystack, $needle)
......
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Redis;
class Search implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $hash, $host, $port, $name, $getString, $useragent, $fp, $additionalHeaders;
protected $buffer_length = 8192;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($hash, $host, $port, $name, $getString, $useragent, $additionalHeaders)
{
$this->hash = $hash;
$this->host = $host;
$this->port = $port;
$this->name = $name;
$this->getString = $getString;
$this->useragent = $useragent;
$this->additionalHeaders = $additionalHeaders;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$url = "";
if($this->port === "443"){
$url = "https://";
}else{
$url = "http://";
}
$url .= $this->host . $this->getString;
$ch = curl_init($url);
curl_setopt_array($ch, array(
CURLOPT_RETURNTRANSFER => 1,
CURLOPT_URL => $url,
CURLOPT_USERAGENT => $this->useragent,
CURLOPT_FOLLOWLOCATION => TRUE,
CURLOPT_CONNECTTIMEOUT => 10,
CURLOPT_MAXCONNECTS => 50,
CURLOPT_LOW_SPEED_LIMIT => 500,
CURLOPT_LOW_SPEED_TIME => 5,
CURLOPT_TIMEOUT => 10
));
$result = curl_exec($ch);
curl_close($ch);
Redis::hset('search.' . $this->hash, $this->name, $result);
}
private function readAnswer()
{
$time = microtime(true);
$headers = '';
$body = '';
$length = 0;
if (!$this->fp) {
return;
}
// get headers FIRST
$c = 0;
stream_set_blocking($this->fp, 1);
do {
// use fgets() not fread(), fgets stops reading at first newline
// or buffer which ever one is reached first
$data = fgets($this->fp, 8192);
// a sincle CRLF indicates end of headers
if ($data === false || $data == "\r\n" || feof($this->fp)) {
// break BEFORE OUTPUT
break;
}
if (sizeof(($tmp = explode(": ", $data))) === 2) {
$headers[strtolower(trim($tmp[0]))] = trim($tmp[1]);
}
$c++;
} while (true);
// end of headers
if (sizeof($headers) > 1) {
$bodySize = 0;
if (isset($headers["transfer-encoding"]) && $headers["transfer-encoding"] === "chunked") {
$body = $this->readChunked();
} elseif (isset($headers['content-length'])) {
$length = trim($headers['content-length']);
if (is_numeric($length) && $length >= 1) {
$body = $this->readBody($length);
}
$bodySize = strlen($body);
} elseif (isset($headers["connection"]) && strtolower($headers["connection"]) === "close") {
$body = $this->readUntilClose();
}else {
exit;
}
} else {
return;
}
Redis::del($this->host . "." . $this->socketNumber);
if (isset($headers["content-encoding"]) && $headers['content-encoding'] === "gzip") {
$body = $this->gunzip($body);
}
Redis::hset('search.' . $this->hash, $this->name, $body);
Redis::expire('search.' . $this->hash, 5);
}
private function readUntilClose()
{
$data = '';
stream_set_blocking($this->fp, 1);
while (!feof($this->fp)) {
$data .= fgets($this->fp, 8192);
}
# Bei dieser Funktion unterstützt der Host kein Keep-Alive:
# Wir beenden die Verbindung:
fclose($this->fp);
Redis::del($this->host . "." . $this->socketNumber);
return $data;
}
private function readBody($length)
{
$theData = '';
$done = false;
stream_set_blocking($this->fp, 0);
$startTime = time();
$lastTime = $startTime;
while (!feof($this->fp) && !$done && (($startTime + 1) > time()) && $length !== 0) {
usleep(100);
$theNewData = fgets($this->fp, 8192);
$theData .= $theNewData;
$length -= strlen($theNewData);
$done = (trim($theNewData) === '0');
}
return $theData;
}
private function readChunked()
{
$body = '';
// read from chunked stream
// loop though the stream
do {
// NOTE: for chunked encoding to work properly make sure
// there is NOTHING (besides newlines) before the first hexlength
// get the line which has the length of this chunk (use fgets here)
$line = fgets($this->fp, 8192);
// if it's only a newline this normally means it's read
// the total amount of data requested minus the newline
// continue to next loop to make sure we're done
if ($line == "\r\n") {
continue;
}
// the length of the block is sent in hex decode it then loop through
// that much data get the length
// NOTE: hexdec() ignores all non hexadecimal chars it finds
$length = hexdec($line);
if (!is_int($length)) {
trigger_error('Most likely not chunked encoding', E_USER_ERROR);
}
// zero is sent when at the end of the chunks
// or the end of the stream or error
if ($line === false || $length < 1 || feof($this->fp)) {
if ($length <= 0) {
fgets($this->fp, 8192);
}
// break out of the streams loop
break;
}
// loop though the chunk
do {
// read $length amount of data
// (use fread here)
$data = fread($this->fp, $length);
// remove the amount received from the total length on the next loop
// it'll attempt to read that much less data
$length -= strlen($data);
// PRINT out directly
// you could also save it directly to a file here
// store in string for later use
$body .= $data;
// zero or less or end of connection break
if ($length <= 0 || feof($this->fp)) {
// break out of the chunk loop
if ($length <= 0) {
fgets($this->fp, 8192);
}
break;
}
} while (true);
// end of chunk loop
} while (true);
// end of stream loop
return $body;
}
private function gunzip($zipped)
{
$offset = 0;
if (substr($zipped, 0, 2) == "\x1f\x8b") {
$offset = 2;
}
if (substr($zipped, $offset, 1) == "\x08") {
try
{
return gzinflate(substr($zipped, $offset + 8));
} catch (\Exception $e) {
abort(500, "Fehler beim unzip des Ergebnisses von folgendem Anbieter: " . $this->name);
}
}
return "Unknown Format";
}
private function writeRequest()
{
$out = "GET " . $this->getString . " HTTP/1.1\r\n";
$out .= "Host: " . $this->host . "\r\n";
$out .= "User-Agent: " . $this->useragent . "\r\n";
$out .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
$out .= "Accept-Language: de,en-US;q=0.7,en;q=0.3\r\n";
$out .= "Accept-Encoding: gzip, deflate, br\r\n";
$out .= str_replace("$#!#$", "\r\n", $this->additionalHeaders);
$out .= "Connection: keep-alive\r\n\r\n";
# Anfrage senden:
$sent = 0;
$string = $out;
$time = microtime(true);
while (true) {
$timeElapsed = microtime(true) - $time;
if ($timeElapsed > 1.0) {
# Irgendwas ist mit unserem Socket passiert. Wir brauchen einen neuen:
if ($this->fp && is_resource($this->fp)) {
fclose($this->fp);
}
Redis::del($this->name . "." . $this->socketNumber);
$this->fp = $this->getFreeSocket();
$sent = 0;
$string = $out;
continue;
}
try {
$tmp = fwrite($this->fp, $string);
} catch (\ErrorException $e) {
# Irgendwas ist mit unserem Socket passiert. Wir brauchen einen neuen:
if ($this->fp && is_resource($this->fp)) {
fclose($this->fp);
}
Redis::del($this->name . "." . $this->socketNumber);
$this->fp = $this->getFreeSocket();
$sent = 0;
$string = $out;
continue;
}
if ($tmp) {
$sent += $tmp;
$string = substr($string, $tmp);
}
if ($sent >= strlen($out)) {
break;
}
}
if ($sent === strlen($out)) {
return true;
}
return false;
}
public function getFreeSocket()
{
# Je nach Auslastung des Servers ( gleichzeitige Abfragen ), kann es sein, dass wir mehrere Sockets benötigen um die Abfragen ohne Wartezeit beantworten zu können.
# pfsockopen öffnet dabei einen persistenten Socket, der also auch zwischen den verschiedenen php Prozessen geteilt werden kann.
# Wenn der Hostname mit einem bereits erstellten Socket übereinstimmt, wird die Verbindung also aufgegriffen und fortgeführt.
# Allerdings dürfen wir diesen nur verwenden, wenn er nicht bereits von einem anderen Prozess zur Kommunikation verwendet wird.
# Wenn dem so ist, probieren wir den nächsten Socket zu verwenden.
# Dies festzustellen ist komplizierter, als man sich das vorstellt. Folgendes System sollte funktionieren:
# 1. Stelle fest, ob dieser Socket neu erstellt wurde, oder ob ein existierender geöffnet wurde.
$counter = 0;
$fp = null;
$time = microtime(true);
do {
if (intval(Redis::exists($this->host . ".$counter")) === 0) {
Redis::set($this->host . ".$counter", 1);
Redis::expire($this->host . ".$counter", 5);
$this->socketNumber = $counter;
try
{
$fp = pfsockopen($this->getHost(), $this->port, $errstr, $errno, 1);
} catch (\ErrorException $e) {
break;
}
# Wir gucken, ob der Lesepuffer leer ist:
stream_set_blocking($fp, 0);
$string = fgets($fp, 8192);
if ($string !== false || feof($fp)) {
if ($this->fp && is_resource($this->fp)) {
fclose($fp);
}
$this->socketNumber = null;
Redis::del($this->host . ".$counter");
continue;
}
break;
}
$counter++;
} while (true);
return $fp;
}
public function getHost()
{
$return = "";
if ($this->port === "443") {
$return .= "tls://";
} else {
$return .= "tcp://";
}
$return .= $this->host;
return $return;
}
}
......@@ -14,8 +14,10 @@ class Searcher implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $name, $ch, $pid, $counter, $lastTime;
protected $MAX_REQUESTS = 500;
protected $name, $ch, $pid, $counter, $lastTime, $connectionInfo;
protected $MAX_REQUESTS = 100;
protected $importantEngines = array("Fastbot", "overture", "overtureAds");
protected $recheck;
/**
* Create a new job instance.
......@@ -32,6 +34,7 @@ class Searcher implements ShouldQueue
{
$this->name = $name;
$this->pid = getmypid();
$this->recheck = false;
// Submit this worker to the Redis System
Redis::expire($this->name, 5);
}
......@@ -57,62 +60,97 @@ class Searcher implements ShouldQueue
// We will perform a blocking pop on this queue so the queue can remain empty for a while
// without killing this searcher directly.
$mission = Redis::blpop($this->name . ".queue", 4);
$this->counter++;
$this->updateStats(microtime(true) - $time);
$this->switchToRunning();
// The mission can be empty when blpop hit the timeout
if(empty($mission)){
continue;
}else{
if(!empty($mission)){
$mission = $mission[1];
$this->counter++;#
$poptime = microtime(true) - $time;
}
// The mission is a String which can be divided to retrieve two informations:
// 1. The Hash Value where the result should be stored
// 2. The Url to Retrieve
// These two informations are divided by a ";" in the mission string
$hashValue = substr($mission, 0, strpos($mission, ";"));
$url = substr($mission, strpos($mission, ";") + 1);
Redis::hset('search.' . $hashValue, $this->name, "connected");
$result = $this->retrieveUrl($url);
$this->storeResult($result, $poptime, $hashValue);
if($this->counter === 3){
Redis::set($this->name, "running");
// The mission is a String which can be divided to retrieve two informations:
// 1. The Hash Value where the result should be stored
// 2. The Url to Retrieve
// These two informations are divided by a ";" in the mission string
$mission = explode(";", $mission);
$hashValue = $mission[0];
$url = base64_decode($mission[1]);
$timeout = $mission[2]; // Timeout from the MetaGer process in ms
$medianFetchTime = $this->getFetchTime(); // The median Fetch time of the search engine in ms
Redis::hset('search.' . $hashValue, $this->name, "connected");
$result = $this->retrieveUrl($url);
$this->storeResult($result, $poptime, $hashValue);
// Reset the time of the last Job so we can calculate
// the time we have spend waiting for a new job
// We submit that calculation to the Redis systemin the method
$time = microtime(true);
}
// Reset the time of the last Job so we can calculate
// the time we have spend waiting for a new job
// We submit that calculation to the Redis systemin the method
// storeResult()
$time = microtime(true);
// In sync mode every Searcher may only retrieve one result because it would block
// the execution of the remaining code otherwise:
if(getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS){
break;
break;
}
}
// When we reach this point, time has come for this Searcher to retire
$this->shutdown();
}
private function switchToRunning(){
/**
* When a Searcher is initially started the redis value for $this->name is set to "locked"
* which effectively will prevent new Searchers of this type to be started. (Value is checked by the MetaGer process which starts the Searchers)
* This is done so the MetaGer processes won't start hundreds of Searchers parallely when under high work load.
* It will force that Searchers can only be started one after the other.
* When a new Searcher has served a minimum of three requests we have enough data to decide whether we need even more Searchers.
* To do so we will then set the redis value for $this->name to "running".
* There is a case where we don't want new Searchers to be started even if we would need to do so to serve every Request:
* When a search engine needs more time to produce search results than the timeout of the MetaGer process, we won't even bother of spawning
* more and more Searchers because they would just block free worker processes from serving the important engines which will give results in time.
**/
if($this->counter === 3){
# If the MetaGer process waits longer for the results than this Fetcher will probably need to fetch
# Or if this engine is in the array of important engines which we will always try to serve
Redis::set($this->name, "running");
$this->recheck = false;
}
}
private function updateStats($poptime){
if($this->connectionInfo !== NULL){
$connectionInfo = base64_encode(json_encode($this->connectionInfo));
Redis::hset($this->name . ".stats", $this->pid, $connectionInfo . ";" . $poptime);
}
}
private function getFetchTime(){
$vals = Redis::hgetall($this->name . ".stats");
if(sizeof($vals) === 0){
return 0;
}else{
$totalTime = 0;
foreach ($vals as $pid => $value) {
$time = floatval(json_decode(base64_decode(explode(";", $value)[0]), true)["total_time"]);
$time *= 1000; // Transform from seconds to milliseconds
$totalTime += $time;
}
$totalTime /= sizeof($vals);
return $totalTime;
}
}
private function retrieveUrl($url){
// Set this URL to the Curl handle
curl_setopt($this->ch, CURLOPT_URL, $url);
$result = curl_exec($this->ch);
$this->connectionInfo = curl_getinfo($this->ch);
return $result;
}
private function storeResult($result, $poptime, $hashValue){
Redis::hset('search.' . $hashValue, $this->name, $result);
$connectionInfo = base64_encode(json_encode(curl_getinfo($this->ch), true));
Redis::hset($this->name . ".stats", $this->pid, $connectionInfo . ";" . $poptime);
$this->lastTime = microtime(true);
}
......
......@@ -105,7 +105,7 @@ abstract class Searchengine
# Prüft, ob die Suche bereits gecached ist, ansonsted wird sie als Job dispatched
public function startSearch(\App\MetaGer $metager)
{
if ($this->canCache && Cache::has($this->hash) && 0 == 1) {
if ($this->canCache && Cache::has($this->hash)) {
$this->cached = true;
$this->retrieveResults($metager);
} else {
......@@ -122,8 +122,13 @@ abstract class Searchengine
}else{
$url = "http://";