WorkerSpawner.php 4.44 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;

class WorkerSpawner extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'worker:spawner';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'This command makes sure that enough worker processes are spawned';

    /**
     * Main-loop flag; set to false by the signal handler to request shutdown.
     *
     * @var bool
     */
    protected $shouldRun = true;

    /**
     * Worker processes started by this command that have not been closed yet.
     * Each entry has the shape returned by spawnWorker().
     *
     * @var array
     */
    protected $processes = [];

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * Polls the "queues:default" Redis list. Whenever it holds jobs, waits for
     * its length to stop changing, spawns worker processes and then waits until
     * every job seen at that moment has left the list. Finished workers are
     * closed on every tenth loop iteration, and all remaining workers are
     * closed when the loop ends.
     *
     * @return mixed
     */
    public function handle()
    {
        pcntl_async_signals(true);

        // Register a closure instead of [$this, "sig_handler"]: the closure
        // carries this class' scope, so invoking the private handler does not
        // depend on which code is executing when the signal is dispatched.
        $handler = function ($sig) {
            $this->sig_handler($sig);
        };
        pcntl_signal(SIGINT, $handler);
        pcntl_signal(SIGTERM, $handler);
        pcntl_signal(SIGHUP, $handler);

        try {
            $counter = 0;
            while ($this->shouldRun) {
                $counter = ($counter + 1) % 10;

                $length = Redis::llen("queues:default");
                if ($length > 0) {
                    $this->waitForQueueToSettle($length);
                    if (!$this->shouldRun) {
                        break;
                    }

                    $jobs = Redis::lrange("queues:default", 0, -1);
                    $ids = $this->getJobIds($jobs);

                    // Inclusive bound: count($jobs) + 6 workers are started.
                    $spawnLimit = count($jobs) + 5;
                    for ($i = 0; $i <= $spawnLimit; $i++) {
                        $worker = $this->spawnWorker();
                        // A failed spawn yields null; storing it would break
                        // the status/close calls made on every entry later.
                        if ($worker !== null) {
                            $this->processes[] = $worker;
                        }
                    }

                    $this->waitForJobsToLeaveQueue($ids);
                } else {
                    usleep(100 * 1000); // Sleep for 100ms
                }

                if ($counter === 0) {
                    $this->reapFinishedWorkers();
                }
            }
        } finally {
            foreach ($this->processes as $process) {
                $this->closeWorker($process);
            }
            $this->processes = [];
        }
    }

    /**
     * Block until two consecutive length readings of the queue, taken 50ms
     * apart, are equal, or until shutdown has been requested.
     *
     * @param int $length Queue length observed by the caller.
     * @return void
     */
    private function waitForQueueToSettle($length): void
    {
        while ($this->shouldRun) {
            usleep(50 * 1000);
            // Read the length once per round so the value compared is the
            // value stored.
            $current = Redis::llen("queues:default");
            if ($current === $length) {
                return;
            }
            $length = $current;
        }
    }

    /**
     * Block until none of the given job ids is present in the queue any more,
     * or until shutdown has been requested.
     *
     * @param array $ids Job ids that were queued when the workers were spawned.
     * @return void
     */
    private function waitForJobsToLeaveQueue(array $ids): void
    {
        while ($this->shouldRun && count($ids) > 0) {
            $queued = $this->getJobIds(Redis::lrange("queues:default", 0, -1));

            // Keep only the ids that are still waiting in the queue.
            $ids = array_filter($ids, static function ($id) use ($queued) {
                return in_array($id, $queued, true);
            });

            if (count($ids) > 0) {
                // Empty the workers' STDOUT pipes so no worker can stall on a
                // full pipe while this loop waits for it to take jobs.
                $this->drainWorkerOutput();
                // Pause between polls instead of querying Redis back-to-back.
                usleep(50 * 1000);
            }
        }
    }

    /**
     * Close every worker that is no longer running and keep the rest.
     * Output of still-running workers is read and discarded.
     *
     * @return void
     */
    private function reapFinishedWorkers(): void
    {
        $running = [];
        foreach ($this->processes as $process) {
            $status = proc_get_status($process["process"]);
            if ($status["running"]) {
                stream_get_contents($process["pipes"][1]);
                $running[] = $process;
            } else {
                $this->closeWorker($process);
            }
        }
        $this->processes = $running;
    }

    /**
     * Read and discard whatever the workers have written to STDOUT so far.
     * The pipes are non-blocking, so this returns immediately.
     *
     * @return void
     */
    private function drainWorkerOutput(): void
    {
        foreach ($this->processes as $process) {
            stream_get_contents($process["pipes"][1]);
        }
    }

    /**
     * Close a worker's STDOUT pipe and then the process handle itself.
     *
     * @param array $process Entry as returned by spawnWorker().
     * @return void
     */
    private function closeWorker(array $process): void
    {
        fclose($process["pipes"][1]);
        proc_close($process["process"]);
    }

    /**
     * Extract the "id" field from each JSON-encoded job payload.
     *
     * Payloads that are not a JSON object/array or that carry no "id" are
     * skipped.
     *
     * @param iterable $jobs Raw payload strings as returned by LRANGE.
     * @return array List of job ids.
     */
    private function getJobIds($jobs): array
    {
        $result = [];
        foreach ($jobs as $job) {
            $payload = json_decode($job, true);
            if (is_array($payload) && isset($payload["id"])) {
                $result[] = $payload["id"];
            }
        }
        return $result;
    }

    /**
     * Signal handler: request a shutdown of the main loop.
     *
     * @param int $sig Number of the received signal (unused).
     * @return void
     */
    private function sig_handler($sig)
    {
        $this->shouldRun = false;
        echo ("Terminating Process\n");
    }

    /**
     * Start one `php artisan queue:work` process in the current working
     * directory with an empty environment.
     *
     * @return array|null Array with the keys "process" (process resource),
     *                    "pipes" (pipes opened by proc_open; STDIN is already
     *                    closed, STDOUT is non-blocking) and "working", or
     *                    null when the process could not be started.
     */
    private function spawnWorker(): ?array
    {
        $descriptorspec = [
            0 => ["pipe", "r"], // STDIN is a pipe the child reads from
            1 => ["pipe", "w"], // STDOUT is a pipe the child writes to
            2 => ["file", "/tmp/worker-error.txt", "a"], // STDERR is appended to a file
        ];
        $cwd = getcwd();
        $env = [];

        $process = proc_open('php artisan queue:work --stop-when-empty --sleep=1', $descriptorspec, $pipes, $cwd, $env);
        if (!is_resource($process)) {
            $this->error('Failed to spawn worker process');
            return null;
        }

        // Nothing is ever sent to the child, so close its STDIN right away.
        fclose($pipes[0]);
        \stream_set_blocking($pipes[1], 0);

        return [
            "process" => $process,
            "pipes" => $pipes,
            "working" => false,
        ];
    }
}