Commit fa392d62 authored by Dominik Hebeler
Browse files

Weitere Änderungen am Searcher

parent a80de739
...@@ -12,9 +12,9 @@ use Log; ...@@ -12,9 +12,9 @@ use Log;
class Searcher implements ShouldQueue class Searcher implements ShouldQueue
{ {
use InteractsWithQueue, Queueable, SerializesModels, DispatchesJobs; use InteractsWithQueue, Queueable, SerializesModels;
protected $name, $ch; protected $name, $ch, $pid, $counter, $lastTime;
protected $MAX_REQUESTS = 500; protected $MAX_REQUESTS = 500;
/** /**
...@@ -31,6 +31,7 @@ class Searcher implements ShouldQueue ...@@ -31,6 +31,7 @@ class Searcher implements ShouldQueue
public function __construct($name) public function __construct($name)
{ {
$this->name = $name; $this->name = $name;
$this->pid = getmypid();
// Submit this worker to the Redis System // Submit this worker to the Redis System
Redis::set($this->name, "running"); Redis::set($this->name, "running");
Redis::expire($this->name, 5); Redis::expire($this->name, 5);
...@@ -46,7 +47,7 @@ class Searcher implements ShouldQueue ...@@ -46,7 +47,7 @@ class Searcher implements ShouldQueue
// This Searcher is freshly called so we need to initialize the curl handle $ch // This Searcher is freshly called so we need to initialize the curl handle $ch
$this->ch = $this->initCurlHandle(); $this->ch = $this->initCurlHandle();
$this->counter = 0; // Counts the number of answered jobs $this->counter = 0; // Counts the number of answered jobs
$lastJob = microtime(true); $time = microtime(true);
while(true){ while(true){
// Update the expire // Update the expire
Redis::expire($this->name, 5); Redis::expire($this->name, 5);
...@@ -59,15 +60,12 @@ class Searcher implements ShouldQueue ...@@ -59,15 +60,12 @@ class Searcher implements ShouldQueue
// The mission can be empty when blpop hit the timeout // The mission can be empty when blpop hit the timeout
if(empty($mission)){ if(empty($mission)){
// In that case it should be safe to simply exit this job continue;
if(((microtime(true) - $lastJob) ) > 300)
break;
else
continue;
}else{ }else{
$mission = $mission[1]; $mission = $mission[1];
$this->counter++; $this->counter++;#
$lastJob = microtime(true); $poptime = microtime(true) - $time;
$time = microtime(true);
} }
// The mission is a String which can be divided to retrieve two informations: // The mission is a String which can be divided to retrieve two informations:
...@@ -81,18 +79,16 @@ class Searcher implements ShouldQueue ...@@ -81,18 +79,16 @@ class Searcher implements ShouldQueue
$result = $this->retrieveUrl($url); $result = $this->retrieveUrl($url);
$this->storeResult($result, $hashValue); $this->storeResult($result, $poptime, $hashValue);
// In sync mode every Searcher may only retrieve one result because it would block // In sync mode every Searcher may only retrieve one result because it would block
// the execution of the remaining code otherwise: // the execution of the remaining code otherwise:
if(getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS){ if(getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS){
if(getenv("QUEUE_DRIVER") === "sync") Redis::del($this->name);
break; break;
} }
} }
// When we reach this point, time has come for this Searcher to retire // When we reach this point, time has come for this Searcher to retire
// We should close our curl handle before we do so $this->exit();
curl_close($this->ch);
} }
private function retrieveUrl($url){ private function retrieveUrl($url){
...@@ -104,8 +100,20 @@ class Searcher implements ShouldQueue ...@@ -104,8 +100,20 @@ class Searcher implements ShouldQueue
return $result; return $result;
} }
private function storeResult($result, $hashValue){ private function storeResult($result, $poptime, $hashValue){
Redis::hset('search.' . $hashValue, $this->name, $result); Redis::hset('search.' . $hashValue, $this->name, $result);
$connectionInfo = base64_encode(json_encode(curl_getinfo($this->ch), true));
Redis::hset($this->name . ".stats", $this->pid, $connectionInfo . ";" . $poptime);
$this->lastTime = microtime(true);
}
private function exit(){
Redis::hdel($this->name . ".stats", $this->pid);
if(sizeof(Redis::hgetall($this->name . ".stats")) === 0){
Redis::del($this->name);
}
// We should close our curl handle before we do so
curl_close($this->ch);
} }
private function initCurlHandle(){ private function initCurlHandle(){
......
...@@ -509,6 +509,7 @@ class MetaGer ...@@ -509,6 +509,7 @@ class MetaGer
$sumaCount += 1; $sumaCount += 1;
} }
$enabledSearchengines[] = $suma; $enabledSearchengines[] = $suma;
break;
} }
} }
......
...@@ -129,15 +129,37 @@ abstract class Searchengine ...@@ -129,15 +129,37 @@ abstract class Searchengine
// each Searcher has its own queue lying under the redis key <name>.queue // each Searcher has its own queue lying under the redis key <name>.queue
Redis::rpush($this->name . ".queue", $mission); Redis::rpush($this->name . ".queue", $mission);
// If there is no Searcher process for this engine running at this time, we start one /**
if(Redis::get($this->name) === NULL){ * We have Searcher processes running for MetaGer
Log::info("Starting Searcher"); * Each Searcher is dedicated to one specific Searchengine and fetches it's results.
/* Die Anfragen an die Suchmaschinen werden nun von der Laravel-Queue bearbeitet: * We can have multiple Searchers for each engine, if needed.
* Hinweis: solange in der .env der QUEUE_DRIVER auf "sync" gestellt ist, werden die Abfragen * At this point we need to decide, whether we need to start a new Searcher process or
* nacheinander abgeschickt. * if we have enough of them running.
* Sollen diese Parallel verarbeitet werden, muss ein anderer QUEUE_DRIVER verwendet werden. * The information for that is provided through the redis system. Each running searcher
* siehe auch: https://laravel.com/docs/5.2/queues * gives information how long it has waited to be given the last fetcher job.
*/ * The longer this time value is, the less frequent the search engine is used and the less
* searcher of that type we need.
* But if it's too low, e.g. 100ms, then the searcher is near to its full workload and needs assistance.
**/
$needSearcher = false;
$searcherData = Redis::hgetall($this->name . ".stats");
// We now have an array of statistical data from the searchers
// Each searcher has one entry in it.
// So if it's empty, then we have currently no searcher running and
// of course need to spawn a new one.
if(sizeof($searcherData) === 0){
$needSearcher = true;
}else{
// There we go:
// There's at least one Fetcher running for this search engine.
// Now we have to check if the current count is enough to fetch all the
// searches or if it needs help.
// Let's hardcode a minimum of 100ms between every search job.
die(var_dump($searcherData));
}
if($needSearcher){
$this->dispatch(new Searcher($this->name)); $this->dispatch(new Searcher($this->name));
} }
} }
......
...@@ -111,19 +111,19 @@ return [ ...@@ -111,19 +111,19 @@ return [
'default' => [ 'default' => [
'host' => env('REDIS_HOST', 'localhost'), 'host' => env('REDIS_HOST', 'localhost'),
#'password' => env('REDIS_PASSWORD', null), 'password' => env('REDIS_PASSWORD', null),
'port' => env('REDIS_PORT', 6379), 'port' => env('REDIS_PORT', 6379),
'database' => 0, 'database' => 0,
], ],
'redisLogs' => [ 'redisLogs' => [
'host' => env('REDIS_LOGS_HOST', 'localhost'), 'host' => env('REDIS_LOGS_HOST', 'localhost'),
#'password' => env('REDIS_LOGS_PASSWORD', env('REDIS_PASSWORD', null)), 'password' => env('REDIS_LOGS_PASSWORD', env('REDIS_PASSWORD', null)),
'port' => env('REDIS_LOGS_PORT', 6379), 'port' => env('REDIS_LOGS_PORT', 6379),
'database' => 1, 'database' => 1,
], ],
'redisCache' => [ 'redisCache' => [
'host' => env('REDIS_CACHE_HOST', 'localhost'), 'host' => env('REDIS_CACHE_HOST', 'localhost'),
#'password' => env('REDIS_CACHE_PASSWORD', env('REDIS_PASSWORD', null)), 'password' => env('REDIS_CACHE_PASSWORD', env('REDIS_PASSWORD', null)),
'port' => env('REDIS_CACHE_PORT', 6379), 'port' => env('REDIS_CACHE_PORT', 6379),
'database' => 2, 'database' => 2,
], ],
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment