Commit 11f21266 authored by Dominik Hebeler's avatar Dominik Hebeler

Merge branch '1024-optimize-request-fetcher' into 'development'

Resolve "Optimize Request fetcher"

Closes #1024

See merge request !1704
parents c03f2077 8d841a33
......@@ -25,10 +25,9 @@ class RequestFetcher extends Command
protected $shouldRun = true;
protected $multicurl = null;
protected $oldMultiCurl = null;
protected $maxFetchedDocuments = 100000;
protected $fetchedDocuments = 0;
protected $proxyhost, $proxyuser, $proxypassword;
protected $proxyhost;
protected $proxyuser;
protected $proxypassword;
/**
* Create a new command instance.
......@@ -43,7 +42,6 @@ class RequestFetcher extends Command
$this->proxyport = env("PROXY_PORT", "");
$this->proxyuser = env("PROXY_USER", "");
$this->proxypassword = env("PROXY_PASSWORD", "");
}
/**
......@@ -53,7 +51,6 @@ class RequestFetcher extends Command
*/
public function handle()
{
$pidFile = "/tmp/fetcher";
pcntl_signal(SIGINT, [$this, "sig_handler"]);
pcntl_signal(SIGTERM, [$this, "sig_handler"]);
......@@ -80,61 +77,16 @@ class RequestFetcher extends Command
}
try {
$blocking = false;
while ($this->shouldRun) {
$status = curl_multi_exec($this->multicurl, $active);
$currentJobs = [];
if (!$blocking) {
$elements = Redis::pipeline(function ($redis) {
$redis->lrange(\App\MetaGer::FETCHQUEUE_KEY, 0, -1);
$redis->del(\App\MetaGer::FETCHQUEUE_KEY);
});
$currentJobs = $elements[0];
} else {
$currentJob = Redis::blpop(\App\MetaGer::FETCHQUEUE_KEY, 1);
if (!empty($currentJob)) {
$currentJobs[] = $currentJob[1];
}
}
if (sizeof($currentJobs) > 0) {
foreach ($currentJobs as $currentJob) {
$currentJob = json_decode($currentJob, true);
$ch = $this->getCurlHandle($currentJob);
if (curl_multi_add_handle($this->multicurl, $ch) !== 0) {
$this->shouldRun = false;
Log::error("Couldn't add Handle to multicurl");
break;
}
$this->fetchedDocuments++;
if ($this->fetchedDocuments > $this->maxFetchedDocuments) {
Log::info("Reinitializing Multicurl after " . $this->fetchedDocuments . " requests.");
$this->oldMultiCurl = $this->multicurl;
$this->multicurl = curl_multi_init();
$this->fetchedDocuments = 0;
}
$blocking = false;
$active = true;
}
}
$answerRead = $this->readMultiCurl($this->multicurl);
if ($this->oldMultiCurl != null) {
$this->readMultiCurl($this->oldMultiCurl);
$messagesLeft = -1;
if (curl_multi_info_read($this->oldMultiCurl, $messagesLeft) === false) {
if ($messagesLeft = 0) {
Log::debug("Removing finished multicurl handle");
curl_multi_close($this->oldMultiCurl);
$this->oldMultiCurl = null;
}
}
}
if (!$active && !$answerRead) {
$blocking = true;
} else {
usleep(50 * 1000);
$operationsRunning = true;
curl_multi_exec($this->multicurl, $operationsRunning);
$status = $this->readMultiCurl($this->multicurl);
$answersRead = $status[0];
$messagesLeft = $status[1];
$newJobs = $this->checkNewJobs($operationsRunning, $messagesLeft);
if ($newJobs === 0 && $answersRead === 0) {
usleep(10 * 1000);
}
}
} finally {
......@@ -143,21 +95,59 @@ class RequestFetcher extends Command
}
}
/**
* Checks the Redis queue if any new fetch jobs where submitted
* and adds them to multicurl if there are.
* Will be blocking call to redis if there are no running jobs in multicurl
*/
private function checkNewJobs($operationsRunning, $messagesLeft)
{
$newJobs = [];
if ($operationsRunning === 0 && $messagesLeft === -1) {
$newJob = Redis::blpop(\App\MetaGer::FETCHQUEUE_KEY, 1);
if (!empty($newJob)) {
$newJobs[] = $newJob[1];
}
} else {
$elements = Redis::pipeline(function ($redis) {
$redis->lrange(\App\MetaGer::FETCHQUEUE_KEY, 0, -1);
$redis->del(\App\MetaGer::FETCHQUEUE_KEY);
});
$newJobs = $elements[0];
}
$addedJobs = 0;
foreach ($newJobs as $newJob) {
$newJob = json_decode($newJob, true);
$ch = $this->getCurlHandle($newJob);
if (curl_multi_add_handle($this->multicurl, $ch) !== 0) {
$this->shouldRun = false;
Log::error("Couldn't add Handle to multicurl");
break;
} else {
$addedJobs++;
}
}
return $addedJobs;
}
private function readMultiCurl($mc)
{
$answerRead = false;
while (($info = curl_multi_info_read($mc)) !== false) {
$messagesLeft = -1;
$answersRead = 0;
while (($info = curl_multi_info_read($mc, $messagesLeft)) !== false) {
try {
$answerRead = true;
$answersRead++;
$infos = curl_getinfo($info["handle"], CURLINFO_PRIVATE);
$infos = explode(";", $infos);
$resulthash = $infos[0];
$cacheDurationMinutes = intval($infos[1]);
$name = $infos[2];
$responseCode = curl_getinfo($info["handle"], CURLINFO_HTTP_CODE);
$body = "";
$body = "no-result";
$totalTime = curl_getinfo($info["handle"], CURLINFO_TOTAL_TIME);
$totalTime = curl_getinfo($info["handle"], CURLINFO_TOTAL_TIME);
\App\PrometheusExporter::Duration($totalTime, $name);
$error = curl_error($info["handle"]);
......@@ -166,13 +156,14 @@ class RequestFetcher extends Command
}
if ($responseCode !== 200) {
Log::debug($resulthash);
Log::debug("Got responsecode " . $responseCode . " fetching \"" . curl_getinfo($info["handle"], CURLINFO_EFFECTIVE_URL) . "\n");
} else {
$body = \curl_multi_getcontent($info["handle"]);
}
Redis::pipeline(function ($pipe) use ($resulthash, $body, $cacheDurationMinutes) {
$pipe->set($resulthash, $body);
$pipe->lpush($resulthash, $body);
$pipe->expire($resulthash, 60);
});
......@@ -187,7 +178,7 @@ class RequestFetcher extends Command
\curl_multi_remove_handle($mc, $info["handle"]);
}
}
return $answerRead;
return [$answersRead, $messagesLeft];
}
private function getCurlHandle($job)
......@@ -234,7 +225,6 @@ class RequestFetcher extends Command
public function sig_handler($sig)
{
$this->shouldRun = false;
echo ("Terminating Process\n");
echo("Terminating Process\n");
}
}
This diff is collapsed.
......@@ -109,7 +109,8 @@ abstract class Searchengine
# Standardimplementierung der getNext Funktion, damit diese immer verwendet werden kann
public function getNext(MetaGer $metager, $result)
{}
{
}
# Prüft, ob die Suche bereits gecached ist, ansonsted wird sie als Job dispatched
public function startSearch(\App\MetaGer $metager, &$timings)
......@@ -197,10 +198,13 @@ abstract class Searchengine
if ($this->loaded) {
return true;
}
if (!$this->cached) {
$body = Redis::get($this->hash);
if (!$this->cached && empty($body)) {
$body = Redis::rpoplpush($this->hash, $this->hash);
if ($body === false) {
return $body;
}
}
if ($body === "no-result") {
$body = "";
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment