<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\DispatchesJobs;
use Illuminate\Support\Facades\Redis;
use Log;

/**
 * Queue job that serves the search requests for exactly one remote search engine.
 *
 * A Searcher takes its name from the sumas.xml as constructor argument. While it
 * runs it pops "missions" from the Redis list "<name>.queue", fetches the
 * requested URL through a single long-lived curl handle (so that keep-alive
 * connections can be reused) and writes the response into the Redis hash
 * "search.<hashValue>" under its own name.
 */
class Searcher implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /**
     * $name     Name of the search engine; also used as the prefix of all Redis keys.
     * $ch       The curl handle, created in handle() and closed in shutdown().
     * $pid      Process id used as the field name inside the "<name>.stats" hash.
     * $counter  Number of missions popped during the current handle() run.
     * $lastTime Timestamp (microtime) of the most recently stored result.
     */
    protected $name, $ch, $pid, $counter, $lastTime;

    /** Upper bound of missions one Searcher processes before it retires. */
    protected $MAX_REQUESTS = 500;

    /**
     * Create a new job instance.
     *
     * This is our new Worker/Searcher class.
     * It will take its name from the sumas.xml as constructor argument.
     * Each Searcher is dedicated to one remote server from our supported search engines.
     * It will listen to a queue in the Redis database within the handle() method and
     * answer requests to this specific search engine.
     * The curl handle will be left initialized and opened so that we can make full use of
     * keep-alive requests.
     *
     * @param string $name Name of the search engine this Searcher is dedicated to.
     */
    public function __construct($name)
    {
        $this->name = $name;
        $this->pid = getmypid();

        // Submit this worker to the Redis system
        Redis::expire($this->name, 5);
    }

    /**
     * Execute the job.
     *
     * Loops until the queue stayed empty for the blpop timeout, the request
     * limit is exceeded or the queue driver is "sync", then calls shutdown().
     *
     * @return void
     */
    public function handle()
    {
        // The constructor runs wherever the job is created; when the job is executed
        // by a queue worker that is generally another process. Refresh the PID here so
        // the field written to "<name>.stats" identifies the process doing the work.
        $this->pid = getmypid();

        // This Searcher is freshly called so we need to initialize the curl handle $ch
        $this->ch = $this->initCurlHandle();
        $this->counter = 0;                 // Counts the number of answered jobs
        $time = microtime(true);

        while (true) {
            // Update the expire
            Redis::expire($this->name, 5);
            Redis::expire($this->name . ".stats", 5);

            // One Searcher can handle a ton of requests to the same server.
            // Each search to the server of this Searcher will be submitted to a queue
            // stored in Redis which has the same name as this search engine appended by ".queue".
            // We perform a blocking pop on this queue so the queue can remain empty for a while
            // without killing this Searcher directly.
            $mission = Redis::blpop($this->name . ".queue", 4);

            // The mission can be empty when blpop hit the timeout
            if (empty($mission)) {
                break;
            }

            $this->counter++;
            // Time we have spent waiting for this job
            $poptime = microtime(true) - $time;

            // blpop answers with [listName, value]; the value is the mission string
            $parsed = $this->parseMission($mission[1]);

            if ($parsed === null) {
                // Without the separator we know neither where to store a result nor
                // which URL to fetch, so the mission is dropped.
                Log::warning('Searcher "' . $this->name . '" dropped a mission without ";" separator');
            } else {
                list($hashValue, $url) = $parsed;

                Redis::hset('search.' . $hashValue, $this->name, "connected");

                $result = $this->retrieveUrl($url);

                $this->storeResult($result, $poptime, $hashValue);
            }

            if ($this->counter === 3) {
                Redis::set($this->name, "running");
            }

            // Reset the time of the last job so we can calculate
            // the time we have spent waiting for a new job.
            // We submit that calculation to the Redis system in the method
            // storeResult()
            $time = microtime(true);

            // In sync mode every Searcher may only retrieve one result because it would block
            // the execution of the remaining code otherwise:
            if (getenv("QUEUE_DRIVER") === "sync" || $this->counter > $this->MAX_REQUESTS) {
                break;
            }
        }

        // When we reach this point, time has come for this Searcher to retire
        $this->shutdown();
    }

    /**
     * Splits a mission string into its two parts.
     *
     * The mission is a string which carries two pieces of information divided
     * by the first ";":
     *  1. The hash value where the result should be stored
     *  2. The URL to retrieve
     *
     * @param string $mission Raw mission string popped from the queue.
     * @return array|null [hashValue, url], or null when the mission contains no ";".
     */
    private function parseMission($mission)
    {
        $separator = strpos($mission, ";");

        if ($separator === false) {
            return null;
        }

        return [
            substr($mission, 0, $separator),
            // Cast because substr() yields false instead of "" on older PHP versions
            // when the separator is the last character.
            (string) substr($mission, $separator + 1),
        ];
    }

    /**
     * Fetches the given URL with the shared curl handle.
     *
     * @param string $url The URL to retrieve.
     * @return string|bool The response body, or false when curl_exec() fails.
     */
    private function retrieveUrl($url)
    {
        // Set this URL to the curl handle
        curl_setopt($this->ch, CURLOPT_URL, $url);

        return curl_exec($this->ch);
    }

    /**
     * Writes the fetched result and the connection statistics to Redis.
     *
     * The result goes into the hash "search.<hashValue>" under this Searcher's name.
     * The statistics go into the hash "<name>.stats" under this Searcher's PID as
     * "<base64 of the JSON encoded curl_getinfo()>;<poptime>".
     *
     * @param string|bool $result    Whatever retrieveUrl() returned.
     * @param float       $poptime   Seconds spent waiting for the mission.
     * @param string      $hashValue Identifier of the search the result belongs to.
     * @return void
     */
    private function storeResult($result, $poptime, $hashValue)
    {
        Redis::hset('search.' . $hashValue, $this->name, $result);

        // json_encode() takes a flags bitmask as second argument, not an "assoc" boolean.
        $connectionInfo = base64_encode(json_encode(curl_getinfo($this->ch)));
        Redis::hset($this->name . ".stats", $this->pid, $connectionInfo . ";" . $poptime);

        $this->lastTime = microtime(true);
    }

    /**
     * Removes this Searcher from the Redis bookkeeping and closes the curl handle.
     *
     * The engine key "<name>" is deleted once no entry is left in "<name>.stats".
     *
     * @return void
     */
    private function shutdown()
    {
        Redis::hdel($this->name . ".stats", $this->pid);

        // HLEN counts the remaining fields without transferring the whole hash
        if (Redis::hlen($this->name . ".stats") === 0) {
            Redis::del($this->name);
        }

        // We should close our curl handle before we leave
        curl_close($this->ch);
    }

    /**
     * Creates the curl handle that is reused for every request of this Searcher.
     *
     * @return resource The configured curl handle.
     */
    private function initCurlHandle()
    {
        $ch = curl_init();

        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_MAXCONNECTS => 500,
            CURLOPT_LOW_SPEED_LIMIT => 500,
            CURLOPT_LOW_SPEED_TIME => 5,
            CURLOPT_TIMEOUT => 10,
        ]);

        return $ch;
    }
}