Skip to content
66 changes: 47 additions & 19 deletions lib/puma/plugin/solid_queue.rb
Original file line number Diff line number Diff line change
@@ -1,40 +1,68 @@
require "puma/plugin"

module Puma
class DSL
def solid_queue_mode(mode = :fork)
@options[:solid_queue_mode] = mode.to_sym
end
end
end

Puma::Plugin.create do
attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor

def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $$

in_background do
monitor_solid_queue
if launcher.options[:solid_queue_mode] == :async
start_async(launcher)
else
start_forked(launcher)
end
end

if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
end
private
def start_forked(launcher)
in_background do
monitor_solid_queue
end

launcher.events.on_stopped { stop_solid_queue }
launcher.events.on_restart { stop_solid_queue }
else
launcher.events.after_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :fork)
end
end

launcher.events.on_stopped { stop_solid_queue }
launcher.events.on_restart { stop_solid_queue }
else
launcher.events.after_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :fork)
end
end

launcher.events.after_stopped { stop_solid_queue }
launcher.events.before_restart { stop_solid_queue }
end
end

launcher.events.after_stopped { stop_solid_queue }
launcher.events.before_restart { stop_solid_queue }
def start_async(launcher)
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
launcher.events.on_stopped { solid_queue_supervisor&.stop }
launcher.events.on_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
else
launcher.events.after_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
launcher.events.after_stopped { solid_queue_supervisor&.stop }
launcher.events.before_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
end
end
end

private
def stop_solid_queue
Process.waitpid(solid_queue_pid, Process::WNOHANG)
log "Stopping Solid Queue..."
Expand Down
98 changes: 98 additions & 0 deletions lib/solid_queue/async_supervisor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# frozen_string_literal: true

module SolidQueue
class AsyncSupervisor < Supervisor
private
attr_reader :threads

def start_processes
@threads = {}

configuration.configured_processes.each { |configured_process| start_process(configured_process) }
end

def start_process(configured_process)
process_instance = configured_process.instantiate.tap do |instance|
instance.supervised_by process
instance.mode = :async
end

thread = Thread.new do
begin
process_instance.start
rescue Exception => e
puts "Error in thread: #{e.message}"
puts e.backtrace
end
end
threads[thread] = [ process_instance, configured_process ]
end

def terminate_gracefully
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload|
processes.each(&:stop)

Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_threads_terminated? }) do
# No-op, we just wait
end

unless all_threads_terminated?
payload[:shutdown_timeout_exceeded] = true
terminate_immediately
end
end
end

def terminate_immediately
SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do
threads.keys.each(&:kill)
end
end

def supervised_processes
processes.map(&:to_s)
end

def reap_and_replace_terminated_forks
# No-op in async mode, we'll check for dead threads in the supervise loop
end

def all_threads_terminated?
threads.keys.all? { |thread| !thread.alive? }
end

def supervise
loop do
break if stopped?

set_procline
process_signal_queue

unless stopped?
check_and_replace_terminated_threads
interruptible_sleep(1.second)
end
end
ensure
shutdown
end

def check_and_replace_terminated_threads
terminated_threads = {}
threads.each do |thread, (process, configured_process)|
unless thread.alive?
terminated_threads[thread] = configured_process
end
end

terminated_threads.each do |thread, configured_process|
threads.delete(thread)
start_process(configured_process)
end
end

def processes
threads.values.map(&:first)
end
end
end
2 changes: 2 additions & 0 deletions lib/solid_queue/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class Cli < Thor
desc: "Path to config file (default: #{Configuration::DEFAULT_CONFIG_FILE_PATH}).",
banner: "SOLID_QUEUE_CONFIG"

class_option :mode, type: :string, default: "fork", enum: %w[ fork async ], desc: "Whether to fork processes for workers and dispatchers (fork) or to run these in the same process as the supervisor (async)"

class_option :recurring_schedule_file, type: :string,
desc: "Path to recurring schedule definition (default: #{Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH}).",
banner: "SOLID_QUEUE_RECURRING_SCHEDULE"
Expand Down
11 changes: 10 additions & 1 deletion lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ def instantiate
DEFAULT_CONFIG_FILE_PATH = "config/queue.yml"
DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml"

attr_reader :mode

def initialize(**options)
@options = options.with_defaults(default_options)
@mode = @options[:mode].to_s.inquiry
end

def configured_processes
Expand Down Expand Up @@ -84,6 +87,7 @@ def ensure_correctly_sized_thread_pool

def default_options
{
mode: :fork,
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH),
only_work: false,
Expand All @@ -110,7 +114,12 @@ def skip_recurring_tasks?

def workers
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes = if mode.fork?
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
else
1
end

processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
end
end
Expand Down
111 changes: 111 additions & 0 deletions lib/solid_queue/fork_supervisor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# frozen_string_literal: true

module SolidQueue
class ForkSupervisor < Supervisor
private
attr_reader :forks, :configured_processes

def start_processes
@forks = {}
@configured_processes = {}

configuration.configured_processes.each { |configured_process| start_process(configured_process) }
end

def start_process(configured_process)
process_instance = configured_process.instantiate.tap do |instance|
instance.supervised_by process
instance.mode = :fork
end

pid = fork do
process_instance.start
end

configured_processes[pid] = configured_process
forks[pid] = process_instance
end

def terminate_gracefully
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload|
term_forks

Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
reap_terminated_forks
end

unless all_forks_terminated?
payload[:shutdown_timeout_exceeded] = true
terminate_immediately
end
end
end

def terminate_immediately
SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do
quit_forks
end
end

def supervised_processes
forks.keys
end

def term_forks
signal_processes(forks.keys, :TERM)
end

def quit_forks
signal_processes(forks.keys, :QUIT)
end

def reap_and_replace_terminated_forks
loop do
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

replace_fork(pid, status)
end
end

def reap_terminated_forks
loop do
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0)
handle_claimed_jobs_by(terminated_fork, status)
end

configured_processes.delete(pid)
end
rescue SystemCallError
# All children already reaped
end

def replace_fork(pid, status)
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
if terminated_fork = forks.delete(pid)
payload[:fork] = terminated_fork
handle_claimed_jobs_by(terminated_fork, status)

start_process(configured_processes.delete(pid))
end
end
end

# When a supervised fork crashes or exits we need to mark all the
# executions it had claimed as failed so that they can be retried
# by some other worker.
def handle_claimed_jobs_by(terminated_fork, status)
if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
error = Processes::ProcessExitError.new(status)
registered_process.fail_all_claimed_executions_with(error)
end
end

def all_forks_terminated?
forks.empty?
end
end
end
6 changes: 1 addition & 5 deletions lib/solid_queue/processes/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ module Runnable
def start
boot

if running_async?
@thread = create_thread { run }
else
run
end
run
end

def stop
Expand Down
Loading
Loading