From 77a3e52e0950f02cfbfab8b3549974a5fd4cc5cc Mon Sep 17 00:00:00 2001
From: Dominik Pfennig <>
Date: Wed, 10 May 2017 08:46:38 +0200
Subject: [PATCH] =?UTF-8?q?Fetcher=20Logik=20ver=C3=A4ndert?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

laufende Fetcher beenden sich nun nach einer gewissen Zeit.
 app/Http/Controllers/AdminInterface.php |  5 +-
 app/Jobs/Searcher.php                   | 91 +++++++++++++++++--------
 app/Models/Searchengine.php             |  3 +-
 resources/views/admin/admin.blade.php   |  2 +
 4 files changed, 72 insertions(+), 29 deletions(-)

diff --git a/app/Http/Controllers/AdminInterface.php b/app/Http/Controllers/AdminInterface.php
index fc25267e2..17503273e 100644
--- a/app/Http/Controllers/AdminInterface.php
+++ b/app/Http/Controllers/AdminInterface.php
@@ -42,9 +42,11 @@ class AdminInterface extends Controller
         // So now we can generate Median Times for every Fetcher
+        $fetcherCount = 0;
         foreach($stati as $engineName => $engineStats){
             $connection = array();
             $poptime = 0;
+            $fetcherCount += sizeof($engineStats["fetcher"]);
             foreach($engineStats["fetcher"] as $pid => $stats){
                 foreach($stats["connection"] as $key => $value){
@@ -66,7 +68,8 @@ class AdminInterface extends Controller
         return view('admin.admin')
             ->with('title', 'Fetcher Status')
-            ->with('stati', $stati);
+            ->with('stati', $stati)
+            ->with('fetcherCount', $fetcherCount);
         $stati = json_encode($stati);
         $response = Response::make($stati, 200);
         $response->header("Content-Type", "application/json");
diff --git a/app/Jobs/Searcher.php b/app/Jobs/Searcher.php
index 194a07536..abee4e0bd 100644
--- a/app/Jobs/Searcher.php
+++ b/app/Jobs/Searcher.php
@@ -15,7 +15,9 @@ class Searcher implements ShouldQueue
     use InteractsWithQueue, Queueable, SerializesModels;
     protected $name, $ch, $pid, $counter, $lastTime;
-    protected $MAX_REQUESTS = 500;
+    protected $MAX_REQUESTS = 100;
+    protected $importantEngines = array("Fastbot", "overture", "overtureAds");
+    protected $recheck;
      * Create a new job instance.
@@ -32,6 +34,7 @@ class Searcher implements ShouldQueue
         $this->name = $name;
         $this->pid = getmypid();
+        $this->recheck = false;
         // Submit this worker to the Redis System
         Redis::expire($this->name, 5);
@@ -57,55 +60,89 @@ class Searcher implements ShouldQueue
             // We will perform a blocking pop on this queue so the queue can remain empty for a while 
             // without killing this searcher directly.
             $mission = Redis::blpop($this->name . ".queue", 4);
+            $this->counter++;
             // The mission can be empty when blpop hit the timeout
                 $mission = $mission[1];
-                $this->counter++;#
                 $poptime = microtime(true) - $time;
-            }
-            // The mission is a String which can be divided to retrieve two informations:
-            // 1. The Hash Value where the result should be stored
-            // 2. The Url to Retrieve
-            // These two informations are divided by a ";" in the mission string
-            $hashValue = substr($mission, 0, strpos($mission, ";"));
-            $url = substr($mission, strpos($mission, ";") + 1);
-            Redis::hset('search.' . $hashValue, $this->name, "connected");
-            $result = $this->retrieveUrl($url);
-            $this->storeResult($result, $poptime, $hashValue);
-            if($this->counter === 3){
-                Redis::set($this->name, "running");
+                // The mission is a String which can be divided to retrieve two informations:
+                // 1. The Hash Value where the result should be stored
+                // 2. The Url to Retrieve
+                // These two informations are divided by a ";" in the mission string
+                $mission = explode(";", $mission);
+                $hashValue = $mission[0];
+                $url = base64_decode($mission[1]);
+                $timeout = $mission[2]; // Timeout from the MetaGer process in ms
+                $medianFetchTime = $this->getFetchTime();   // The median Fetch time of the search engine in ms
+                Redis::hset('search.' . $hashValue, $this->name, "connected");
+                $result = $this->retrieveUrl($url);
+                $this->storeResult($result, $poptime, $hashValue);
+                /**
+                * When a Searcher is initially started the redis value for $this->name is set to "locked"
+                * which effectively will prevent new Searchers of this type to be started. (Value is checked by the MetaGer process which starts the Searchers)
+                * This is done so the MetaGer processes won't start hundreds of Searchers in parallel when under high workload.
+                * It will force that Searchers can only be started one after the other.
+                * When a new Searcher has served a minimum of three requests we have enough data to decide whether we need even more Searchers.
+                * To do so we will then set the redis value for $this->name to "running".
+                * There is a case where we don't want new Searchers to be started even if we would need to do so to serve every Request:
+                *   When a search engine needs more time to produce search results than the timeout of the MetaGer process, we won't bother spawning
+                *   more and more Searchers because they would just block free worker processes from serving the important engines which will give results in time.
+                **/
+                if($this->counter === 3 || $this->recheck){
+                    # If the MetaGer process waits longer for the results than this Fetcher will probably need to fetch
+                    # Or if this engine is in the array of important engines which we will always try to serve
+                    if($timeout >= $medianFetchTime || in_array($this->name, $this->importantEngines)){
+                        Redis::set($this->name, "running");
+                        $this->recheck = false;
+                    }else{
+                        $this->recheck = true;
+                    }
+                }
+                // Reset the time of the last Job so we can calculate
+                // the time we have spent waiting for a new job
+                // We submit that calculation to the Redis system in the method storeResult()
+                $time = microtime(true);
-            // Reset the time of the last Job so we can calculate
-            // the time we have spend waiting for a new job
-            // We submit that calculation to the Redis systemin the method
-            // storeResult()
-            $time = microtime(true);
             // In sync mode every Searcher may only retrieve one result because it would block
             // the execution of the remaining code otherwise:
             if(getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS){
-                break;
+               break;
         // When we reach this point, time has come for this Searcher to retire
+    /**
+     * Mean fetch time (ms) over the entries of the Redis hash "<name>.stats"; 0 if it is empty.
+     */
+    private function getFetchTime(){
+        $vals = Redis::hgetall($this->name . ".stats");
+        if(sizeof($vals) === 0){
+            return 0;
+        }
+        $totalTime = 0;
+        foreach ($vals as $value) {
+            // Each entry is "<base64 JSON>;..."; the JSON's "total_time" is in seconds.
+            $stats = json_decode(base64_decode(explode(";", $value)[0]), true);
+            $totalTime += floatval($stats["total_time"]) * 1000;
+        }
+        return $totalTime / sizeof($vals);
+    }
     private function retrieveUrl($url){
         // Set this URL to the Curl handle
         curl_setopt($this->ch, CURLOPT_URL, $url);
         $result = curl_exec($this->ch);
         return $result;
diff --git a/app/Models/Searchengine.php b/app/Models/Searchengine.php
index 070ee3aaf..036b0fedb 100644
--- a/app/Models/Searchengine.php
+++ b/app/Models/Searchengine.php
@@ -123,7 +123,8 @@ abstract class Searchengine
                 $url = "http://";
             $url .= $this->host . $this->getString;
-            $mission = $this->resultHash . ";" . $url;
+            $url = base64_encode($url);
+            $mission = $this->resultHash . ";" . $url . ";" . $metager->getTime();
             // Submit this mission to the corresponding Redis Queue
             // Since each Searcher is dedicated to one specific search engine
             // each Searcher has it's own queue lying under the redis key <name>.queue
diff --git a/resources/views/admin/admin.blade.php b/resources/views/admin/admin.blade.php
index 7e4cc3eef..9e21b7452 100644
--- a/resources/views/admin/admin.blade.php
+++ b/resources/views/admin/admin.blade.php
@@ -3,6 +3,8 @@
 @section('title', $title )
+<p>Es laufen insgesamt <code>{{$fetcherCount}}/50</code> Fetcher.</p>
 <table class="table table-bordered">