Commit abf7782c authored by Dominik Hebeler

Removed Searcher Job

parent 68d0815c
Dockerfile
@@ -41,4 +41,4 @@ CMD /etc/init.d/cron start && \
     /etc/init.d/redis-server start && \
     chmod -R 0777 /html/storage && \
     chmod -R 0777 /html/bootstrap/cache && \
-    php artisan worker:spawner
+    php artisan requests:fetcher
app/Console/Commands/WorkerSpawner.php
@@ -2,8 +2,10 @@
 namespace App\Console\Commands;
 
+use Cache;
 use Illuminate\Console\Command;
 use Illuminate\Support\Facades\Redis;
+use Log;
 
 class WorkerSpawner extends Command
 {
@@ -12,7 +14,7 @@ class WorkerSpawner extends Command
      *
      * @var string
      */
-    protected $signature = 'worker:spawner';
+    protected $signature = 'requests:fetcher';
 
     /**
      * The console command description.
@@ -22,7 +24,8 @@ class WorkerSpawner extends Command
     protected $description = 'This command makes sure that enough worker processes are spawned';
 
     protected $shouldRun = true;
-    protected $processes = [];
+    protected $multicurl = null;
+    protected $proxyhost, $proxyport, $proxyuser, $proxypassword;
 
     /**
      * Create a new command instance.
@@ -32,6 +35,12 @@ class WorkerSpawner extends Command
     public function __construct()
     {
         parent::__construct();
+        $this->multicurl = curl_multi_init();
+        $this->proxyhost = env("PROXY_HOST", "");
+        $this->proxyport = env("PROXY_PORT", "");
+        $this->proxyuser = env("PROXY_USER", "");
+        $this->proxypassword = env("PROXY_PASSWORD", "");
     }
 
     /**
@@ -47,100 +56,104 @@ class WorkerSpawner extends Command
         pcntl_signal(SIGHUP, [$this, "sig_handler"]);
         try {
-            $counter = 0;
+            $blocking = false;
             while ($this->shouldRun) {
-                $counter++;
-                $counter = $counter % 10;
-                $length = Redis::llen("queues:default");
-                if ($length > 0) {
-                    while (true) {
-                        usleep(50 * 1000);
-                        if (Redis::llen("queues:default") !== $length) {
-                            $length = Redis::llen("queues:default");
-                        } else {
-                            break;
-                        }
-                    }
-                    $jobs = Redis::lrange("queues:default", 0, -1);
-                    $length = sizeof($jobs) + 5;
-                    $ids = $this->getJobIds($jobs);
-                    for ($i = 0; $i <= $length; $i++) {
-                        $this->processes[] = $this->spawnWorker();
-                    }
-                    while (sizeof($ids) > 0) {
-                        $jobs = Redis::lrange("queues:default", 0, -1);
-                        $newIds = $this->getJobIds($jobs);
-                        foreach ($ids as $index => $id) {
-                            foreach ($newIds as $newId) {
-                                if ($id === $newId) {
-                                    continue 2;
-                                }
-                            }
-                            unset($ids[$index]);
-                            break;
-                        }
-                    }
-                } else {
-                    usleep(100 * 1000); // Sleep for 100ms
-                }
-                if ($counter === 0) {
-                    $newProcs = [];
-                    foreach ($this->processes as $process) {
-                        $infos = proc_get_status($process["process"]);
-                        if (!$infos["running"]) {
-                            fclose($process["pipes"][1]);
-                            proc_close($process["process"]);
-                        } else {
-                            $newProcs[] = $process;
-                        }
-                    }
-                    $this->processes = $newProcs;
-                }
+                $status = curl_multi_exec($this->multicurl, $active);
+                $currentJob = null;
+                if (!$blocking) {
+                    $currentJob = Redis::lpop(\App\MetaGer::FETCHQUEUE_KEY);
+                } else {
+                    $currentJob = Redis::blpop(\App\MetaGer::FETCHQUEUE_KEY, 10);
+                    if (!empty($currentJob)) {
+                        $currentJob = $currentJob[1];
+                    }
+                }
+
+                if (!empty($currentJob)) {
+                    $currentJob = json_decode($currentJob, true);
+                    $ch = $this->getCurlHandle($currentJob);
+                    curl_multi_add_handle($this->multicurl, $ch);
+                    $blocking = false;
+                    $active = true;
+                }
+                $answerRead = false;
+                while (($info = curl_multi_info_read($this->multicurl)) !== false) {
+                    $answerRead = true;
+                    $infos = curl_getinfo($info["handle"], CURLINFO_PRIVATE);
+                    $infos = explode(";", $infos);
+                    $resulthash = $infos[0];
+                    $cacheDuration = intval($infos[1]);
+                    $responseCode = curl_getinfo($info["handle"], CURLINFO_HTTP_CODE);
+                    $body = "";
+                    $error = curl_error($info["handle"]);
+                    if (!empty($error)) {
+                        Log::error($error);
+                    }
+
+                    if ($responseCode !== 200) {
+                        Log::debug("Got responsecode " . $responseCode . " fetching \"" . curl_getinfo($info["handle"], CURLINFO_EFFECTIVE_URL) . "\"\n");
+                    } else {
+                        $body = \curl_multi_getcontent($info["handle"]);
+                    }
+                    Cache::put($resulthash, $body, now()->addMinutes($cacheDuration));
+                    \curl_multi_remove_handle($this->multicurl, $info["handle"]);
+                }
+                if (!$active && !$answerRead) {
+                    $blocking = true;
+                }
             }
         } finally {
-            foreach ($this->processes as $process) {
-                fclose($process["pipes"][1]);
-                proc_close($process["process"]);
-            }
+            curl_multi_close($this->multicurl);
         }
     }
 
-    private function getJobIds($jobs)
+    private function getCurlHandle($job)
     {
-        $result = [];
-        foreach ($jobs as $job) {
-            $result[] = json_decode($job, true)["id"];
-        }
-        return $result;
+        $ch = curl_init();
+
+        curl_setopt_array($ch, array(
+            CURLOPT_URL => $job["url"],
+            CURLOPT_PRIVATE => $job["resulthash"] . ";" . $job["cacheDuration"],
+            CURLOPT_RETURNTRANSFER => 1,
+            CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
+            CURLOPT_FOLLOWLOCATION => true,
+            CURLOPT_CONNECTTIMEOUT => 10,
+            CURLOPT_MAXCONNECTS => 500,
+            CURLOPT_LOW_SPEED_LIMIT => 500,
+            CURLOPT_LOW_SPEED_TIME => 5,
+            CURLOPT_TIMEOUT => 10,
+        ));
+        if (!empty($this->proxyhost) && !empty($this->proxyport) && !empty($this->proxyuser) && !empty($this->proxypassword)) {
+            curl_setopt($ch, CURLOPT_PROXY, $this->proxyhost);
+            curl_setopt($ch, CURLOPT_PROXYUSERPWD, $this->proxyuser . ":" . $this->proxypassword);
+            curl_setopt($ch, CURLOPT_PROXYPORT, $this->proxyport);
+            curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5);
+        }
+        if (!empty($job["username"]) && !empty($job["password"])) {
+            curl_setopt($ch, CURLOPT_USERPWD, $job["username"] . ":" . $job["password"]);
+        }
+        if (!empty($job["headers"])) {
+            $headers = [];
+            foreach ($job["headers"] as $key => $value) {
+                $headers[] = $key . ":" . $value;
+            }
+            # Headers are in the form:
+            # <key>:<value>;<key>:<value>
+            curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
+        }
+
+        return $ch;
     }
 
-    private function sig_handler($sig)
+    public function sig_handler($sig)
     {
         $this->shouldRun = false;
         echo ("Terminating Process\n");
     }
-
-    private function spawnWorker()
-    {
-        $descriptorspec = array(
-            0 => array("pipe", "r"), // STDIN is a pipe the child reads from
-            1 => array("pipe", "w"), // STDOUT is a pipe the child writes to
-            2 => array("file", "/tmp/worker-error.txt", "a"), // STDERR is a file
-            // that is appended to
-        );
-        $cwd = getcwd();
-        $env = array();
-        $process = proc_open('php artisan queue:work --stop-when-empty --sleep=1', $descriptorspec, $pipes, $cwd, $env);
-        if (is_resource($process)) {
-            fclose($pipes[0]);
-            \stream_set_blocking($pipes[1], 0);
-            return [
-                "process" => $process,
-                "pipes" => $pipes,
-                "working" => false,
-            ];
-        }
-    }
 }
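For context: the new requests:fetcher pops JSON-encoded jobs off the Redis list \App\MetaGer::FETCHQUEUE_KEY, fetches each URL through a shared curl_multi handle, and stores the response body in the cache under the job's resulthash for cacheDuration minutes. A minimal producer-side sketch; the enqueueing code is not among the hunks shown here, so the call site and values are illustrative, while the field names match what getCurlHandle() reads:

<?php
use Cache;
use Illuminate\Support\Facades\Redis;

// Hypothetical producer: enqueue one fetch job for the requests:fetcher process.
$job = [
    "url" => "https://example.org/search?q=metager", // illustrative URL
    "resulthash" => "abc123",                        // cache key the fetcher will fill
    "cacheDuration" => 60,                           // minutes, matches Cache::put() above
    // optional: "username"/"password" for HTTP auth, "headers" as key => value pairs
];
Redis::rpush(\App\MetaGer::FETCHQUEUE_KEY, json_encode($job));

// Later, the requesting process polls the cache until the body has arrived:
while (!Cache::has("abc123")) {
    usleep(50 * 1000);
}
$body = Cache::get("abc123");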
app/Http/Controllers/MetaGerSearch.php
@@ -14,6 +14,7 @@ class MetaGerSearch extends Controller
 {
     public function search(Request $request, MetaGer $metager)
     {
+        $time = microtime(true);
         $spamEntries = [];
         if (file_exists(config_path('spam.txt'))) {
             $spamEntries = file(config_path('spam.txt'));
...
app/Jobs/Searcher.php (deleted in this commit)
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Redis;
class Searcher implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
public $tries = 1;
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 300;
protected $name, $ch, $pid, $counter, $lastTime, $connectionInfo, $user, $password, $headers;
protected $proxyhost, $proxyuser, $proxypassword;
# Each Searcher will shut down after a specified time (in seconds) or number of requests
protected $MAX_REQUESTS = 100;
# This value should always be below the retry_after value in config/queue.php
protected $MAX_TIME = 240;
protected $startTime = null;
protected $importantEngines = array("Fastbot", "overture", "overtureAds");
protected $recheck;
/**
* Create a new job instance.
* This is our new Worker/Searcher class.
* It takes its name from the sumas.xml as a constructor argument.
* Each Searcher is dedicated to one remote server of our supported search engines.
* Within the handle() method it listens on a queue in the Redis database and
* answers requests for this specific search engine.
* The curl handle is kept initialized and open so that we can make full use of
* keep-alive requests.
* @return void
*/
public function __construct($name, $user = null, $password = null, $headers = null)
{
$this->name = $name;
$this->pid = getmypid();
$this->recheck = false;
$this->startTime = microtime(true);
$this->user = $user;
$this->password = $password;
$this->headers = $headers;
$this->proxyhost = env("PROXY_HOST", "");
$this->proxyport = env("PROXY_PORT", "");
$this->proxyuser = env("PROXY_USER", "");
$this->proxypassword = env("PROXY_PASSWORD", "");
// Submit this worker to the Redis System
Redis::expire($this->name, 5);
}
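Because Searcher implements ShouldQueue, instances were started through Laravel's queue; the old worker:spawner command above ran `php artisan queue:work` processes to consume them. A minimal dispatch sketch (hypothetical call site; the engine name is illustrative, taken from the importantEngines list above):

<?php
use App\Jobs\Searcher;

// Hypothetical dispatch: one Searcher per engine, optionally with
// HTTP auth credentials and extra headers for that engine.
dispatch(new Searcher("overture", null, null, null));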
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
// This Searcher was freshly started, so we need to initialize the curl handle $ch
$this->ch = $this->initCurlHandle();
try {
$this->counter = 0; // Counts the number of answered jobs
$time = microtime(true);
while (true) {
// Update the expire
Redis::expire($this->name, 5);
Redis::expire($this->name . ".stats", 5);
// One Searcher can handle a ton of requests to the same server
// Each search to the server of this Searcher will be submitted to a queue
// stored in redis which has the same name as this searchengine appended by a ".queue"
// We will perform a blocking pop on this queue so the queue can remain empty for a while
// without killing this searcher directly.
$mission = Redis::blpop($this->name . ".queue", 4);
$this->counter++;
$this->updateStats(microtime(true) - $time);
$this->switchToRunning();
// The mission can be empty when blpop hit the timeout
if (!empty($mission)) {
$mission = $mission[1];
$poptime = microtime(true) - $time;
// The mission is a string that can be split into three pieces of information:
// 1. The hash value under which the result should be stored
// 2. The URL to retrieve
// 3. The maximum time to take
// These three pieces are separated by ";" in the mission string
$mission = explode(";", $mission);
$hashValue = $mission[0]; // The hash value for redis to store the results under
$url = base64_decode($mission[1]); // The url to fetch
$timeout = $mission[2]; // Timeout from the MetaGer process in ms
$medianFetchTime = $this->getFetchTime(); // The average fetch time of the search engine in ms (getFetchTime() computes a mean)
Redis::hset('search.' . $hashValue . ".results." . $this->name, "status", "connected");
$result = $this->retrieveUrl($url);
$this->storeResult($result, $poptime, $hashValue);
// Reset the time of the last job so we can calculate
// the time we have spent waiting for a new job.
// We submit that calculation to the Redis system in the updateStats() method.
$time = microtime(true);
}
// In sync mode every Searcher may only retrieve one result because it would block
// the execution of the remaining code otherwise:
if (getenv("QUEUE_CONNECTION") === "sync"
|| $this->counter > $this->MAX_REQUESTS
|| (microtime(true) - $this->startTime) > $this->MAX_TIME) {
break;
}
}
} finally {
// When we reach this point, time has come for this Searcher to retire
$this->shutdown();
}
}
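The mission string parsed in handle() implies the producer built it as hash, base64-encoded URL, and timeout joined by ";". A minimal sketch of that producer side, reconstructed from the explode() and base64_decode() calls above (names and values are illustrative, not taken from this diff):

<?php
use Illuminate\Support\Facades\Redis;

// Hypothetical producer: push one mission onto the engine's queue.
$engineName = "overture";                          // engine name from sumas.xml
$hashValue  = "abc123";                            // where the result will be stored
$url        = "https://example.org/search?q=metager";
$timeout    = 1000;                                // MetaGer timeout in ms

$mission = $hashValue . ";" . base64_encode($url) . ";" . $timeout;
Redis::rpush($engineName . ".queue", $mission);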
private function switchToRunning()
{
/**
* When a Searcher is initially started, the Redis value for $this->name is set to "locked",
* which prevents new Searchers of this type from being started. (The value is checked by the MetaGer process which starts the Searchers.)
* This is done so the MetaGer processes won't start hundreds of Searchers in parallel under high load;
* it forces Searchers to be started one after the other.
* Once a new Searcher has served a minimum of three requests we have enough data to decide whether we need even more Searchers,
* so we then set the Redis value for $this->name to "running".
* There is one case where we don't want new Searchers to be started even if we would need them to serve every request:
* when a search engine needs more time to produce results than the timeout of the MetaGer process, we won't bother spawning
* more and more Searchers, because they would just block free worker processes from serving the important engines which deliver results in time.
**/
if ($this->counter === 3 || getenv("QUEUE_CONNECTION") === "sync") {
# If the MetaGer process waits longer for the results than this Fetcher will probably need to fetch
# Or if this engine is in the array of important engines which we will always try to serve
Redis::set($this->name, "running");
$this->recheck = false;
}
}
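The counterpart of this locked/running handshake lived in the MetaGer process, which is not part of this diff; a rough, assumed sketch of the gate the comment above describes:

<?php
use Illuminate\Support\Facades\Redis;

// Hypothetical and simplified: start a new Searcher for an engine only if no
// other Searcher of that type is still in its "locked" startup phase.
$engineName = "overture";
if (Redis::get($engineName) !== "locked") {
    Redis::set($engineName, "locked");             // blocks parallel startups
    dispatch(new \App\Jobs\Searcher($engineName)); // switchToRunning() later sets "running"
}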
private function updateStats($poptime)
{
if ($this->connectionInfo !== null) {
$connectionInfo = base64_encode(json_encode($this->connectionInfo));
Redis::hset($this->name . ".stats", $this->pid, $connectionInfo . ";" . $poptime);
}
}
private function getFetchTime()
{
$vals = Redis::hgetall($this->name . ".stats");
if (sizeof($vals) === 0) {
return 0;
} else {
$totalTime = 0;
foreach ($vals as $pid => $value) {
$time = floatval(json_decode(base64_decode(explode(";", $value)[0]), true)["total_time"]);
$time *= 1000; // Transform from seconds to milliseconds
$totalTime += $time;
}
$totalTime /= sizeof($vals);
return $totalTime;
}
}
private function retrieveUrl($url)
{
// Set this URL to the Curl handle
curl_setopt($this->ch, CURLOPT_URL, $url);
$result = curl_exec($this->ch);
$this->connectionInfo = curl_getinfo($this->ch);
return $result;
}
private function storeResult($result, $poptime, $hashValue)
{
$redis = Redis::connection(env('REDIS_RESULT_CONNECTION'));
$pipeline = $redis->pipeline();
$pipeline->hset('search.' . $hashValue . ".results." . $this->name, "response", $result);
$pipeline->hset('search.' . $hashValue . ".results." . $this->name, "delivered", "0");
$pipeline->hincrby('search.' . $hashValue . ".results.status", "engineAnswered", 1);
// After 60 seconds the results should be read by the MetaGer Process and stored in the Cache instead
$pipeline->expire('search.' . $hashValue . ".results." . $this->name, env('REDIS_RESULT_CACHE_DURATION'));
$pipeline->rpush('search.' . $hashValue . ".ready", $this->name);
$pipeline->expire('search.' . $hashValue . ".ready", env('REDIS_RESULT_CACHE_DURATION'));
$pipeline->sadd('search.' . $hashValue . ".engines", $this->name);
$pipeline->expire('search.' . $hashValue . ".engines", env('REDIS_RESULT_CACHE_DURATION'));
$pipeline->execute();
$this->lastTime = microtime(true);
}
private function shutdown()
{
Redis::hdel($this->name . ".stats", $this->pid);
if (sizeof(Redis::hgetall($this->name . ".stats")) === 0) {
Redis::del($this->name);
}
// We should close our curl handle before we do so
curl_close($this->ch);
}
private function initCurlHandle()
{
$ch = curl_init();
curl_setopt_array($ch, array(
CURLOPT_RETURNTRANSFER => 1,
CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
CURLOPT_FOLLOWLOCATION => true,
CURLOPT_CONNECTTIMEOUT => 10,
CURLOPT_MAXCONNECTS => 500,
CURLOPT_LOW_SPEED_LIMIT => 500,
CURLOPT_LOW_SPEED_TIME => 5,
CURLOPT_TIMEOUT => 10,
));
if (!empty($this->proxyhost) && !empty($this->proxyport) && !empty($this->proxyuser) && !empty($this->proxypassword)) {
curl_setopt($ch, CURLOPT_PROXY, $this->proxyhost);
curl_setopt($ch, CURLOPT_PROXYUSERPWD, $this->proxyuser . ":" . $this->proxypassword);
curl_setopt($ch, CURLOPT_PROXYPORT, $this->proxyport);
curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5);
}
if ($this->user !== null && $this->password !== null) {
curl_setopt($ch, CURLOPT_USERPWD, $this->user . ":" . $this->password);
}
if ($this->headers !== null) {
$headers = [];
foreach ($this->headers as $key => $value) {
$headers[] = $key . ":" . $value;
}
# Headers are in the Form:
# <key>:<value>;<key>:<value>
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
}
return $ch;
}
}
app/MetaGer.php
@@ -6,7 +6,6 @@ use App;
 use Cache;
 use Carbon;
 use Illuminate\Http\Request;
-use Illuminate\Support\Facades\Redis;
 use Jenssegers\Agent\Agent;
 use LaravelLocalization;
 use Log;
@@ -14,6 +13,8 @@ use Predis\Connection\ConnectionException;
 class MetaGer
 {
+    const FETCHQUEUE_KEY = "fetcher.queue";
+
     # Settings for the search
     public $alteredQuery = "";
     public $alterationOverrideQuery = "";
@@ -780,13 +781,12 @@ class MetaGer
     public function waitForMainResults()
     {
-        $redis = Redis::connection(env('REDIS_RESULT_CONNECTION'));
         $engines = $this->engines;
         $enginesToWaitFor = [];
         $mainEngines = $this->sumaFile->foki->{$this->fokus}->main;
         foreach ($mainEngines as $mainEngine) {
             foreach ($engines as $engine) {
-                if (!$engine->cached && $engine->name === $mainEngine) {
+                if ($engine->name === $mainEngine) {
                     $enginesToWaitFor[] = $engine;
                 }
             }
@@ -803,41 +803,38 @@ class MetaGer
             }
         }
         while (sizeof($enginesToWaitFor) > 0 || ($forceTimeout !== null && (microtime(true) - $timeStart) < $forceTimeout)) {
-            $newEngine = $redis->blpop($this->redisResultWaitingKey, 1);
-            if ($newEngine === null || sizeof($newEngine) !== 2) {
-                continue;
-            } else {
-                $newEngine = $newEngine[1];
-                foreach ($enginesToWaitFor as $index => $engine) {
-                    if ($engine->name === $newEngine) {
-                        unset($enginesToWaitFor[$index]);
-                        break;
-                    }
-                }
-                $answered[] = $newEngine;
-            }
+            Log::info(sizeof($enginesToWaitFor) . " " . sizeof($answered) . " " . $enginesToWaitFor[0]->hash);
+            foreach ($enginesToWaitFor as $index => $engine) {
+                if (Cache::has($engine->hash)) {
+                    $answered[] = $engine;
+                    unset($enginesToWaitFor[$index]);
+                    break;
+                }
+            }
             if ((microtime(true) - $timeStart) >= 2) {
                 break;
+            } else {
+                usleep(50 * 1000);
             }
         }
         # Now we can add an entry to Redis which defines the starting time and how many engines should answer this request