<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use Log;

/**
 * Console command that pops fetch jobs from a Redis list, runs them
 * concurrently through a single curl multi handle and writes every finished
 * response body back to Redis under the job's result hash.
 *
 * NOTE(review): the class name and $description talk about spawning worker
 * processes, but the code in this class performs the HTTP fetches itself.
 */
class WorkerSpawner extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'requests:fetcher';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'This command makes sure that enough worker processes are spawned';

    /**
     * Loop flag; flipped to false by sig_handler() to end handle().
     *
     * @var bool
     */
    protected $shouldRun = true;

    /**
     * The curl multi handle every transfer is attached to.
     */
    protected $multicurl = null;

    /**
     * Proxy settings read from the environment in the constructor.
     * A proxy is only used when all four values are non-empty.
     */
    protected $proxyhost, $proxyport, $proxyuser, $proxypassword;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();

        $this->multicurl = curl_multi_init();
        $this->proxyhost = env("PROXY_HOST", "");
        $this->proxyport = env("PROXY_PORT", "");
        $this->proxyuser = env("PROXY_USER", "");
        $this->proxypassword = env("PROXY_PASSWORD", "");
    }

    /**
     * Execute the console command.
     *
     * Runs until SIGINT, SIGTERM or SIGHUP is received. Each iteration
     * drives the running transfers, dequeues at most one new job and stores
     * the results of all transfers that have finished.
     *
     * @return mixed
     */
    public function handle()
    {
        pcntl_async_signals(true);
        foreach ([SIGINT, SIGTERM, SIGHUP] as $signal) {
            pcntl_signal($signal, [$this, "sig_handler"]);
        }

        try {
            // While transfers are in flight the queue is polled without
            // blocking so that curl keeps being driven. Once nothing is
            // running and nothing was read, the loop switches to a blocking
            // pop to wait for new work.
            $blocking = false;
            while ($this->shouldRun) {
                curl_multi_exec($this->multicurl, $active);

                $payload = $this->popPayload($blocking);
                if (!empty($payload)) {
                    // Something was queued, so keep polling without blocking.
                    $blocking = false;

                    $job = $this->decodeJob($payload);
                    if ($job !== null) {
                        curl_multi_add_handle($this->multicurl, $this->getCurlHandle($job));
                        $active = true;
                    }
                }

                $answerRead = $this->storeFinishedTransfers();

                if (!$active && !$answerRead) {
                    $blocking = true;
                }
            }
        } finally {
            curl_multi_close($this->multicurl);
        }
    }

    /**
     * Take the next raw job payload from the fetch queue.
     *
     * @param bool $blocking When true, wait up to 10 seconds for a job (BLPOP);
     *                       otherwise return immediately (LPOP).
     * @return mixed The raw payload, or an empty value when no job was available.
     */
    private function popPayload($blocking)
    {
        if (!$blocking) {
            return Redis::lpop(\App\MetaGer::FETCHQUEUE_KEY);
        }

        // BLPOP answers with [key, value]; only the value is of interest.
        $entry = Redis::blpop(\App\MetaGer::FETCHQUEUE_KEY, 10);

        return empty($entry) ? null : $entry[1];
    }

    /**
     * Decode a queued payload and make sure it is usable as a fetch job.
     *
     * A job must decode to an array that carries at least "url" and
     * "resulthash". Anything else is logged and dropped, because starting a
     * transfer for it would end in a result being written to an empty key.
     *
     * @param string $payload JSON encoded job as taken from the queue.
     * @return array|null The decoded job, or null when the payload is unusable.
     */
    private function decodeJob($payload)
    {
        $job = json_decode($payload, true);

        if (!is_array($job) || !isset($job["url"], $job["resulthash"])) {
            // The payload itself is not logged: jobs may carry credentials.
            Log::error("Discarding malformed fetch job (" . json_last_error_msg() . ")");
            return null;
        }

        return $job;
    }

    /**
     * Store the outcome of every transfer curl reports as finished and
     * detach its handle from the multi handle.
     *
     * The body is only kept for HTTP 200 responses; any other outcome is
     * stored as an empty string. Results expire after 60 seconds.
     *
     * @return bool True when at least one finished transfer was processed.
     */
    private function storeFinishedTransfers()
    {
        $answerRead = false;

        while (($info = curl_multi_info_read($this->multicurl)) !== false) {
            $answerRead = true;
            $handle = $info["handle"];

            // The private data is "<resulthash>;<cacheDuration>" (see
            // getCurlHandle). Only the hash is needed here, since the expiry
            // below is fixed and does not use the cache duration.
            $resulthash = explode(";", curl_getinfo($handle, CURLINFO_PRIVATE))[0];
            $responseCode = curl_getinfo($handle, CURLINFO_HTTP_CODE);
            $body = "";

            $error = curl_error($handle);
            if (!empty($error)) {
                Log::error($error);
            }

            if ($responseCode !== 200) {
                Log::debug("Got responsecode " . $responseCode . " fetching \"" . curl_getinfo($handle, CURLINFO_EFFECTIVE_URL) . "\"\n");
            } else {
                $body = \curl_multi_getcontent($handle);
            }

            Redis::pipeline(function ($pipe) use ($resulthash, $body) {
                $pipe->set($resulthash, $body);
                $pipe->expire($resulthash, 60);
            });

            \curl_multi_remove_handle($this->multicurl, $handle);
        }

        return $answerRead;
    }

    /**
     * Build a configured curl easy handle for one fetch job.
     *
     * Keys read from $job: "url", "resulthash", "cacheDuration" and the
     * optional "username", "password" and "headers".
     *
     * @param array $job Decoded fetch job.
     * @return resource|\CurlHandle The handle, ready to be added to the multi handle.
     */
    private function getCurlHandle($job)
    {
        $ch = curl_init();

        curl_setopt_array($ch, [
            CURLOPT_URL => $job["url"],
            // Carried along so the result can be matched to its job once the
            // transfer has finished.
            CURLOPT_PRIVATE => $job["resulthash"] . ";" . ($job["cacheDuration"] ?? ""),
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_USERAGENT => "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:40.0) Gecko/20100101 Firefox/40.1",
            CURLOPT_FOLLOWLOCATION => true,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_MAXCONNECTS => 500,
            CURLOPT_LOW_SPEED_LIMIT => 500,
            CURLOPT_LOW_SPEED_TIME => 5,
            CURLOPT_TIMEOUT => 10,
        ]);

        $proxyConfigured = !empty($this->proxyhost)
            && !empty($this->proxyport)
            && !empty($this->proxyuser)
            && !empty($this->proxypassword);

        if ($proxyConfigured) {
            curl_setopt($ch, CURLOPT_PROXY, $this->proxyhost);
            curl_setopt($ch, CURLOPT_PROXYUSERPWD, $this->proxyuser . ":" . $this->proxypassword);
            curl_setopt($ch, CURLOPT_PROXYPORT, $this->proxyport);
            curl_setopt($ch, CURLOPT_PROXYTYPE, CURLPROXY_SOCKS5);
        }

        if (!empty($job["username"]) && !empty($job["password"])) {
            curl_setopt($ch, CURLOPT_USERPWD, $job["username"] . ":" . $job["password"]);
        }

        if (!empty($job["headers"])) {
            // The job carries headers as a name => value map; curl expects a
            // list of "<name>:<value>" lines.
            $headers = [];
            foreach ($job["headers"] as $key => $value) {
                $headers[] = $key . ":" . $value;
            }
            curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
        }

        return $ch;
    }

    /**
     * Signal handler registered in handle(): asks the main loop to stop
     * after the current iteration.
     *
     * @param int $sig Number of the received signal (not used).
     * @return void
     */
    public function sig_handler($sig)
    {
        $this->shouldRun = false;
        echo ("Terminating Process\n");
    }

}