@@ -105,58 +105,82 @@ public function watchdog(string $jid, string $worker, ?int $pid = null): void
105105
106106 call_user_func_array ($ callable , $ this ->channels );
107107
108- /** @var \stdClass $message */
109- foreach ($ pubsub as $ message ) {
110- if ($ message ->kind !== 'message ' || empty ($ message ->payload )) {
111- continue ;
112- }
113-
114- $ payload = json_decode ($ message ->payload , true );
115- if (empty ($ payload )) {
116- continue ;
117- }
118-
119- if (empty ($ payload ['event ' ]) || !is_array ($ payload )) {
120- continue ;
121- }
122-
123- if (!in_array ($ payload ['event ' ], self ::WATCHDOG_EVENTS , true ) || empty ($ payload ['jid ' ])) {
124- continue ;
125- }
126-
127- if ($ payload ['jid ' ] !== $ jid ) {
128- continue ;
129- }
130-
131- $ who = 'watchdog: ' . $ worker ;
132-
133- switch ($ payload ['event ' ]) {
134- case self ::LOCK_LOST :
135- if (!empty ($ payload ['worker ' ]) && $ payload ['worker ' ] === $ worker ) {
136- $ this ->logger ->info (
137- "{type}: sending SIGKILL to child {$ pid }; job {jid} handed out to another worker " ,
138- ['type ' => $ who , 'jid ' => $ jid ]
139- );
140-
141- $ this ->system ->posixKill ($ pid , SIGKILL );
142- $ pubsub ->stop ();
143- }
144- break ;
145- case self ::CANCELED :
146- if (!empty ($ payload ['worker ' ]) && $ payload ['worker ' ] === $ worker ) {
147- $ this ->logger ->info (
148- "{type}: sending SIGKILL to child {$ pid }; job {jid} canceled " ,
149- ['type ' => $ who , 'jid ' => $ jid ]
150- );
151- $ this ->system ->posixKill ($ pid , SIGKILL );
108+ try {
109+ /** @var \stdClass $message */
110+ foreach ($ pubsub as $ message ) {
111+ if ($ message ->kind !== 'message ' || empty ($ message ->payload )) {
112+ continue ;
113+ }
114+
115+ $ payload = json_decode ($ message ->payload , true );
116+ if (empty ($ payload )) {
117+ continue ;
118+ }
119+
120+ if (empty ($ payload ['event ' ]) || !is_array ($ payload )) {
121+ continue ;
122+ }
123+
124+ if (!in_array ($ payload ['event ' ], self ::WATCHDOG_EVENTS , true ) || empty ($ payload ['jid ' ])) {
125+ continue ;
126+ }
127+
128+ if ($ payload ['jid ' ] !== $ jid ) {
129+ continue ;
130+ }
131+
132+ switch ($ payload ['event ' ]) {
133+ case self ::LOCK_LOST :
134+ if (!empty ($ payload ['worker ' ]) && $ payload ['worker ' ] === $ worker ) {
135+ $ this ->logger ->info (
136+ "{type}: sending SIGKILL to child {$ pid }; job {jid} handed out to another worker " ,
137+ [
138+ 'type ' => 'watchdog: ' . $ worker ,
139+ 'jid ' => $ jid ,
140+ ]
141+ );
142+
143+ $ this ->system ->posixKill ($ pid , SIGKILL );
144+ $ pubsub ->stop ();
145+ }
146+ break ;
147+ case self ::CANCELED :
148+ if (!empty ($ payload ['worker ' ]) && $ payload ['worker ' ] === $ worker ) {
149+ $ this ->logger ->info (
150+ "{type}: sending SIGKILL to child {$ pid }; job {jid} canceled " ,
151+ [
152+ 'type ' => 'watchdog: ' . $ worker ,
153+ 'jid ' => $ jid ,
154+ ]
155+ );
156+ $ this ->system ->posixKill ($ pid , SIGKILL );
157+ $ pubsub ->stop ();
158+ }
159+ break ;
160+ case self ::COMPLETED :
161+ case self ::FAILED :
152162 $ pubsub ->stop ();
153- }
154- break ;
155- case self ::COMPLETED :
156- case self ::FAILED :
157- $ pubsub ->stop ();
158- break ;
163+ break ;
164+ }
159165 }
166+ } catch (\Throwable $ exception ) {
167+ $ this ->logger ->critical (
168+ "Critical error " ,
169+ [
170+ 'type ' => 'watchdog: ' . $ worker ,
171+ 'jid ' => $ jid ,
172+ 'exception ' => $ exception
173+ ]
174+ );
175+ $ this ->logger ->info (
176+ "{type}: sending SIGKILL to child {$ pid }; job {jid} canceled due to exception " ,
177+ [
178+ 'type ' => 'watchdog: ' . $ worker ,
179+ 'jid ' => $ jid ,
180+ ]
181+ );
182+ $ this ->system ->posixKill ($ pid , SIGKILL );
183+ $ pubsub ->stop ();
160184 }
161185
162186 // Always unset the pubsub consumer instance when you are done! The
0 commit comments