Commit 9466d8e0 authored by Dominik Hebeler's avatar Dominik Hebeler

Erster Versuch für eine neue Fetcher Logik

parent a0455023
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\DispatchesJobs;
use Illuminate\Support\Facades\Redis;
use Log;
class Searcher implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels, DispatchesJobs;
protected $name, $ch;
protected $MAX_REQUESTS = 500;
/**
* Create a new job instance.
* This is our new Worker/Searcher Class
* It will take it's name from the sumas.xml as constructor argument
* Each Searcher is dedicated to one remote server from our supported Searchengines
* It will listen to a queue in the Redis Database within the handle() method and
* answer requests to this specific search engine.
* The curl handle will be left initialized and opened so that we can make full use of
* keep-alive requests.
* @return void
*/
public function __construct($name)
{
$this->name = $name;
// Submit this worker to the Redis System
Redis::set($this->name, "running");
Redis::expire($this->name, 5);
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
// This Searches is freshly called so we need to initialize the curl handle $ch
$this->ch = $this->initCurlHandle();
$this->counter = 0; // Counts the number of answered jobs
while(true){
// Update the expire
Redis::expire($this->name, 5);
// One Searcher can handle a ton of requests to the same server
// Each search to the server of this Searcher will be submitted to a queue
// stored in redis which has the same name as this searchengine appended by a ".queue"
// We will perform a blocking pop on this queue so the queue can remain empty for a while
// without killing this searcher directly.
$mission = Redis::blpop($this->name . ".queue", 4);
// The mission can be empty when blpop hit the timeout
if(empty($mission)){
// In that case it should be safe to simply exit this job
break;
}else{
$mission = $mission[1];
$this->counter++;
// A running Searcher checks whether more of it are needed to properly work on the
// Queue without delay
if(getenv("QUEUE_DRIVER") !== "sync" && intval(Redis::llen($this->name . ".queue")) > 1){
$this->dispatch(new Searcher($this->name));
}
}
// The mission is a String which can be divided to retrieve two informations:
// 1. The Hash Value where the result should be stored
// 2. The Url to Retrieve
// These two informations are divided by a ";" in the mission string
$hashValue = substr($mission, 0, strpos($mission, ";"));
$url = substr($mission, strpos($mission, ";") + 1);
Redis::hset('search.' . $hashValue, $this->name, "connected");
$result = $this->retrieveUrl($url);
$this->storeResult($result, $hashValue);
// In sync mode every Searcher may only retrieve one result because it would block
// the execution of the remaining code otherwise:
if(getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS){
if(getenv("QUEUE_DRIVER") === "sync") Redis::del($this->name);
break;
}
}
// When we reach this point, time has come for this Searcher to retire
// We should close our curl handle before we do so
curl_close($this->ch);
}
private function retrieveUrl($url){
// Set this URL to the Curl handle
curl_setopt($this->ch, CURLOPT_URL, $url);
$result = curl_exec($this->ch);
return $result;
}
private function storeResult($result, $hashValue){
Redis::hset('search.' . $hashValue, $this->name, $result);
}
private function initCurlHandle(){
$ch = curl_init();
curl_setopt_array($ch, array(
CURLOPT_RETURNTRANSFER => 1,
CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
CURLOPT_FOLLOWLOCATION => TRUE,
CURLOPT_CONNECTTIMEOUT => 10,
CURLOPT_MAXCONNECTS => 500,
CURLOPT_LOW_SPEED_LIMIT => 500,
CURLOPT_LOW_SPEED_TIME => 5,
CURLOPT_TIMEOUT => 10
));
return $ch;
}
}
......@@ -813,39 +813,41 @@ class MetaGer
public function waitForResults($enginesToLoad, $overtureEnabled, $canBreak)
{
$loadedEngines = 0;
$timeStart = microtime(true);
# Auf wie viele Suchmaschinen warten wir?
$engineCount = count($enginesToLoad);
$timeStart = microtime(true);
$results = null;
while (true) {
$time = (microtime(true) - $timeStart) * 1000;
$loadedEngines = intval(Redis::hlen('search.' . $this->getHashCode()));
if ($overtureEnabled && (Redis::hexists('search.' . $this->getHashCode(), 'overture') || Redis::hexists('search.' . $this->getHashCode(), 'overtureAds'))) {
$canBreak = true;
}
# Abbruchbedingung
if ($time < 500) {
if (($engineCount === 0 || $loadedEngines >= $engineCount) && $canBreak) {
break;
$results = Redis::hgetall('search.' . $this->getHashCode());
$ready = true;
// When every
$connected = true;
foreach($results as $key => $value){
if($value === "waiting" || $value === "connected"){
$ready = false;
}
} elseif ($time >= 500 && $time < $this->time) {
if (($engineCount === 0 || ($loadedEngines / ($engineCount * 1.0)) >= 0.8) && $canBreak) {
break;
if($value === "waiting"){
$connected = false;
}
}
} else {
// If $ready is false at this point, we're waiting for more searchengines
// But we have to check for the timeout, too
if(!$connected) $timeStart = microtime(true);
$time = (microtime(true) - $timeStart) * 1000;
// We will apply the timeout only if it's not Yahoo we're waiting for since they are one the most
// important search engines.
$canTimeout = !((isset($results["overture"]) && $results["overture"] === "waiting") || (isset($results["overtureAds"]) && $results["overtureAds"] === "waiting"));
if($time > $this->time && $canTimeout) $ready = true;
if($ready){
break;
}
usleep(50000);
}
# Wir haben nun so lange wie möglich gewartet. Wir registrieren nun noch die Suchmaschinen, die geanwortet haben.
$answered = Redis::hgetall('search.' . $this->getHashCode());
foreach ($answered as $key => $value) {
foreach ($results as $key => $value) {
$enginesToLoad[$key] = true;
}
$this->enginesToLoad = $enginesToLoad;
......@@ -921,7 +923,7 @@ class MetaGer
# Category
$this->category = $request->input('category', '');
# Request Times
$this->time = $request->input('time', 1000);
$this->time = $request->input('time', 1500);
# Page
$this->page = 1;
# Lang
......
......@@ -2,7 +2,7 @@
namespace App\Models;
use App\Jobs\Search;
use App\Jobs\Searcher;
use App\MetaGer;
use Cache;
use Illuminate\Foundation\Bus\DispatchesJobs;
......@@ -105,17 +105,41 @@ abstract class Searchengine
# Prüft, ob die Suche bereits gecached ist, ansonsted wird sie als Job dispatched
public function startSearch(\App\MetaGer $metager)
{
if ($this->canCache && Cache::has($this->hash)) {
if ($this->canCache && Cache::has($this->hash) && 0 == 1) {
$this->cached = true;
$this->retrieveResults($metager);
} else {
/* Die Anfragen an die Suchmaschinen werden nun von der Laravel-Queue bearbeitet:
* Hinweis: solange in der .env der QUEUE_DRIVER auf "sync" gestellt ist, werden die Abfragen
* nacheinander abgeschickt.
* Sollen diese Parallel verarbeitet werden, muss ein anderer QUEUE_DRIVER verwendet werden.
* siehe auch: https://laravel.com/docs/5.2/queues
*/
$this->dispatch(new Search($this->resultHash, $this->host, $this->port, $this->name, $this->getString, $this->useragent, $this->additionalHeaders));
// We will push the confirmation of the submission to the Result Hash
Redis::hset('search.' . $this->resultHash, $this->name, "waiting");
// We need to submit a action that one of our workers can understand
// The missions are submitted to a redis queue in the following string format
// <ResultHash>;<URL to fetch>
// With <ResultHash> being the Hash Value where the fetcher will store the result.
// and <URL to fetch> being the full URL to the searchengine
$url = "";
if($this->port === "443"){
$url = "https://";
}else{
$url = "http://";
}
$url .= $this->host . $this->getString;
$mission = $this->resultHash . ";" . $url;
// Submit this mission to the corresponding Redis Queue
// Since each Searcher is dedicated to one specific search engine
// each Searcher has it's own queue lying under the redis key <name>.queue
Redis::rpush($this->name . ".queue", $mission);
// If there is no Searcher process for this engine running at this time, we start one
if(Redis::get($this->name) === NULL){
Log::info("Starting Searcher");
/* Die Anfragen an die Suchmaschinen werden nun von der Laravel-Queue bearbeitet:
* Hinweis: solange in der .env der QUEUE_DRIVER auf "sync" gestellt ist, werden die Abfragen
* nacheinander abgeschickt.
* Sollen diese Parallel verarbeitet werden, muss ein anderer QUEUE_DRIVER verwendet werden.
* siehe auch: https://laravel.com/docs/5.2/queues
*/
$this->dispatch(new Searcher($this->name));
}
}
}
......@@ -172,11 +196,11 @@ abstract class Searchengine
}
$body = "";
if ($this->canCache && $this->cacheDuration > 0 && Cache::has($this->hash)) {
if ($this->canCache && $this->cacheDuration > 0 && Cache::has($this->hash) && 0 === 1) {
$body = Cache::get($this->hash);
} elseif (Redis::hexists('search.' . $this->resultHash, $this->name)) {
$body = Redis::hget('search.' . $this->resultHash, $this->name);
if ($this->canCache && $this->cacheDuration > 0) {
if ($this->canCache && $this->cacheDuration > 0 && 0 === 1) {
Cache::put($this->hash, $body, $this->cacheDuration);
}
......
......@@ -111,19 +111,19 @@ return [
'default' => [
'host' => env('REDIS_HOST', 'localhost'),
'password' => env('REDIS_PASSWORD', null),
#'password' => env('REDIS_PASSWORD', null),
'port' => env('REDIS_PORT', 6379),
'database' => 0,
],
'redisLogs' => [
'host' => env('REDIS_LOGS_HOST', 'localhost'),
'password' => env('REDIS_LOGS_PASSWORD', env('REDIS_PASSWORD', null)),
#'password' => env('REDIS_LOGS_PASSWORD', env('REDIS_PASSWORD', null)),
'port' => env('REDIS_LOGS_PORT', 6379),
'database' => 1,
],
'redisCache' => [
'host' => env('REDIS_CACHE_HOST', 'localhost'),
'password' => env('REDIS_CACHE_PASSWORD', env('REDIS_PASSWORD', null)),
#'password' => env('REDIS_CACHE_PASSWORD', env('REDIS_PASSWORD', null)),
'port' => env('REDIS_CACHE_PORT', 6379),
'database' => 2,
],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment