Skip to content
Snippets Groups Projects
Commit 77a3e52e authored by Dominik Hebeler's avatar Dominik Hebeler
Browse files

Fetcher Logik verändert

laufende Fetcher beenden sich nun nach einer gewissen Zeit.
parent de2cc25b
No related branches found
No related tags found
1 merge request!1365Resolve "Filter Options for MetaGer"
...@@ -42,9 +42,11 @@ class AdminInterface extends Controller ...@@ -42,9 +42,11 @@ class AdminInterface extends Controller
} }
// So now we can generate Median Times for every Fetcher // So now we can generate Median Times for every Fetcher
$fetcherCount = 0;
foreach($stati as $engineName => $engineStats){ foreach($stati as $engineName => $engineStats){
$connection = array(); $connection = array();
$poptime = 0; $poptime = 0;
$fetcherCount += sizeof($engineStats["fetcher"]);
foreach($engineStats["fetcher"] as $pid => $stats){ foreach($engineStats["fetcher"] as $pid => $stats){
foreach($stats["connection"] as $key => $value){ foreach($stats["connection"] as $key => $value){
if(!isset($connection[$key])){ if(!isset($connection[$key])){
...@@ -66,7 +68,8 @@ class AdminInterface extends Controller ...@@ -66,7 +68,8 @@ class AdminInterface extends Controller
return view('admin.admin') return view('admin.admin')
->with('title', 'Fetcher Status') ->with('title', 'Fetcher Status')
->with('stati', $stati); ->with('stati', $stati)
->with('fetcherCount', $fetcherCount);
$stati = json_encode($stati); $stati = json_encode($stati);
$response = Response::make($stati, 200); $response = Response::make($stati, 200);
$response->header("Content-Type", "application/json"); $response->header("Content-Type", "application/json");
......
...@@ -15,7 +15,9 @@ class Searcher implements ShouldQueue ...@@ -15,7 +15,9 @@ class Searcher implements ShouldQueue
use InteractsWithQueue, Queueable, SerializesModels; use InteractsWithQueue, Queueable, SerializesModels;
protected $name, $ch, $pid, $counter, $lastTime; protected $name, $ch, $pid, $counter, $lastTime;
protected $MAX_REQUESTS = 500; protected $MAX_REQUESTS = 100;
protected $importantEngines = array("Fastbot", "overture", "overtureAds");
protected $recheck;
/** /**
* Create a new job instance. * Create a new job instance.
...@@ -32,6 +34,7 @@ class Searcher implements ShouldQueue ...@@ -32,6 +34,7 @@ class Searcher implements ShouldQueue
{ {
$this->name = $name; $this->name = $name;
$this->pid = getmypid(); $this->pid = getmypid();
$this->recheck = false;
// Submit this worker to the Redis System // Submit this worker to the Redis System
Redis::expire($this->name, 5); Redis::expire($this->name, 5);
} }
...@@ -57,55 +60,89 @@ class Searcher implements ShouldQueue ...@@ -57,55 +60,89 @@ class Searcher implements ShouldQueue
// We will perform a blocking pop on this queue so the queue can remain empty for a while // We will perform a blocking pop on this queue so the queue can remain empty for a while
// without killing this searcher directly. // without killing this searcher directly.
$mission = Redis::blpop($this->name . ".queue", 4); $mission = Redis::blpop($this->name . ".queue", 4);
$this->counter++;
// The mission can be empty when blpop hit the timeout // The mission can be empty when blpop hit the timeout
if(empty($mission)){ if(empty($mission)){
continue; continue;
}else{ }else{
$mission = $mission[1]; $mission = $mission[1];
$this->counter++;#
$poptime = microtime(true) - $time; $poptime = microtime(true) - $time;
}
// The mission is a String which can be divided to retrieve two informations:
// 1. The Hash Value where the result should be stored
// 2. The Url to Retrieve
// These two informations are divided by a ";" in the mission string
$hashValue = substr($mission, 0, strpos($mission, ";"));
$url = substr($mission, strpos($mission, ";") + 1);
Redis::hset('search.' . $hashValue, $this->name, "connected");
$result = $this->retrieveUrl($url);
$this->storeResult($result, $poptime, $hashValue);
if($this->counter === 3){ // The mission is a String which can be divided to retrieve two informations:
Redis::set($this->name, "running"); // 1. The Hash Value where the result should be stored
// 2. The Url to Retrieve
// These two informations are divided by a ";" in the mission string
$mission = explode(";", $mission);
$hashValue = $mission[0];
$url = base64_decode($mission[1]);
$timeout = $mission[2]; // Timeout from the MetaGer process in ms
$medianFetchTime = $this->getFetchTime(); // The median Fetch time of the search engine in ms
Redis::hset('search.' . $hashValue, $this->name, "connected");
$result = $this->retrieveUrl($url);
$this->storeResult($result, $poptime, $hashValue);
/**
* When a Searcher is initially started, the redis value for $this->name is set to "locked",
* which effectively prevents new Searchers of this type from being started. (The value is checked by the MetaGer process which starts the Searchers.)
* This is done so the MetaGer processes won't start hundreds of Searchers in parallel when under high work load.
* It forces Searchers to be started one after the other.
* Once a new Searcher has served a minimum of three requests we have enough data to decide whether we need even more Searchers.
* To signal that, we then set the redis value for $this->name to "running".
* There is one case where we don't want new Searchers to be started even if we would need them to serve every request:
* When a search engine needs more time to produce search results than the timeout of the MetaGer process, we won't bother spawning
* more and more Searchers, because they would just block free worker processes from serving the important engines which do deliver results in time.
**/
if($this->counter === 3 || $this->recheck){
# If the MetaGer process waits longer for the results than this Fetcher will probably need to fetch
# Or if this engine is in the array of important engines which we will always try to serve
if($timeout >= $medianFetchTime || in_array($this->name, $this->importantEngines)){
Redis::set($this->name, "running");
$this->recheck = false;
}else{
$this->recheck = true;
}
}
// Reset the time of the last Job so we can calculate
// the time we have spend waiting for a new job
// We submit that calculation to the Redis system in the method storeResult()
$time = microtime(true);
} }
// Reset the time of the last Job so we can calculate
// the time we have spend waiting for a new job
// We submit that calculation to the Redis systemin the method
// storeResult()
$time = microtime(true);
// In sync mode every Searcher may only retrieve one result because it would block // In sync mode every Searcher may only retrieve one result because it would block
// the execution of the remaining code otherwise: // the execution of the remaining code otherwise:
if(getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS){ if(getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS){
break; break;
} }
} }
// When we reach this point, time has come for this Searcher to retire // When we reach this point, time has come for this Searcher to retire
$this->shutdown(); $this->shutdown();
} }
/**
 * Calculates the average fetch time of this search engine in milliseconds.
 *
 * Reads every entry of the Redis hash "<name>.stats". The part of each value
 * before the first ";" is base64 decoded and parsed as JSON; its "total_time"
 * field (treated as seconds) is converted to milliseconds.
 * Note: the result is the arithmetic mean over all entries, not a true median,
 * even though the caller stores it in a variable named $medianFetchTime.
 *
 * @return int|float 0 when no stats are stored, otherwise the mean fetch time in ms.
 */
private function getFetchTime(){
    $vals = Redis::hgetall($this->name . ".stats");
    if(sizeof($vals) === 0){
        return 0;
    }

    $totalTime = 0;
    foreach ($vals as $value) {
        $connection = json_decode(base64_decode(explode(";", $value)[0]), true);
        // Entries that cannot be decoded or that lack "total_time" count as 0,
        // which is what floatval(null) yielded before, but without raising a notice.
        $seconds = 0.0;
        if(is_array($connection) && isset($connection["total_time"])){
            $seconds = floatval($connection["total_time"]);
        }
        $totalTime += $seconds * 1000; // Transform from seconds to milliseconds
    }

    // The former debug statement die(var_dump(...)) terminated the whole worker here
    // and made the return unreachable; it has been removed.
    return $totalTime / sizeof($vals);
}
private function retrieveUrl($url){ private function retrieveUrl($url){
// Set this URL to the Curl handle // Set this URL to the Curl handle
curl_setopt($this->ch, CURLOPT_URL, $url); curl_setopt($this->ch, CURLOPT_URL, $url);
$result = curl_exec($this->ch); $result = curl_exec($this->ch);
return $result; return $result;
} }
......
...@@ -123,7 +123,8 @@ abstract class Searchengine ...@@ -123,7 +123,8 @@ abstract class Searchengine
$url = "http://"; $url = "http://";
} }
$url .= $this->host . $this->getString; $url .= $this->host . $this->getString;
$mission = $this->resultHash . ";" . $url; $url = base64_encode($url);
$mission = $this->resultHash . ";" . $url . ";" . $metager->getTime();
// Submit this mission to the corresponding Redis Queue // Submit this mission to the corresponding Redis Queue
// Since each Searcher is dedicated to one specific search engine // Since each Searcher is dedicated to one specific search engine
// each Searcher has it's own queue lying under the redis key <name>.queue // each Searcher has it's own queue lying under the redis key <name>.queue
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
@section('title', $title ) @section('title', $title )
@section('content') @section('content')
<h1>https://metager3.de</h1>
<p>Es laufen insgesamt <code>{{$fetcherCount}}/50</code> Fetcher.</p>
<table class="table table-bordered"> <table class="table table-bordered">
<thead> <thead>
<th>Name</th> <th>Name</th>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment