Skip to content

Commit b4a364a

Browse files
committed
Add configurable HTTP health-check server
1 parent bd6b377 commit b4a364a

File tree

6 files changed

+336
-5
lines changed

6 files changed

+336
-5
lines changed

README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
2828
- [Failed jobs and retries](#failed-jobs-and-retries)
2929
- [Error reporting on jobs](#error-reporting-on-jobs)
3030
- [Puma plugin](#puma-plugin)
31+
- [Health-check HTTP server](#health-check-http-server)
3132
- [Jobs and transactional integrity](#jobs-and-transactional-integrity)
3233
- [Recurring tasks](#recurring-tasks)
3334
- [Inspiration](#inspiration)
@@ -603,6 +604,30 @@ that you set in production only. This is what Rails 8's default Puma config look
603604

604605
**Note**: phased restarts are not supported currently because the plugin requires [app preloading](https://github.com/puma/puma?tab=readme-ov-file#cluster-mode) to work.
605606

607+
## Health-check HTTP server
608+
609+
Solid Queue provides a tiny HTTP health-check server that runs as a supervised process.
610+
611+
- Endpoints:
612+
- `/` and `/health`:
613+
- Returns `200 OK` with body `OK` when the supervisor and all supervised processes (workers, dispatchers, scheduler, and the health server itself) have fresh heartbeats.
614+
- Returns `503 Service Unavailable` with body `Unhealthy` if any supervised process (or the supervisor) has a stale heartbeat.
615+
- Any other path: returns `404 Not Found`
616+
- Configure via `config/queue.yml` under `health_server:`. Both `host` and `port` are required.
617+
618+
Enable and configure via process configuration:
619+
620+
```yml
621+
production:
622+
health_server:
623+
host: 0.0.0.0
624+
port: 9393
625+
```
626+
627+
Note:
628+
- This runs under the supervisor just like workers/dispatchers.
629+
- When the Puma plugin is active (`plugin :solid_queue` in `puma.rb`), the configured health server is skipped to avoid running multiple HTTP servers in the same process tree. A warning is logged. If you need the health server, run Solid Queue outside Puma (for example, via `bin/jobs`) or disable the plugin on that instance.
630+
606631
## Jobs and transactional integrity
607632
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and vice versa, and ensuring that your job won't be enqueued until the transaction within which you're enqueuing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured in a different database as the main app.
608633

lib/puma/plugin/solid_queue.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def start(launcher)
1313

1414
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
1515
launcher.events.on_booted do
16+
SolidQueue.puma_plugin = true
1617
@solid_queue_pid = fork do
1718
Thread.new { monitor_puma }
1819
SolidQueue::Supervisor.start
@@ -23,6 +24,7 @@ def start(launcher)
2324
launcher.events.on_restart { stop_solid_queue }
2425
else
2526
launcher.events.after_booted do
27+
SolidQueue.puma_plugin = true
2628
@solid_queue_pid = fork do
2729
Thread.new { monitor_puma }
2830
SolidQueue::Supervisor.start

lib/solid_queue.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ module SolidQueue
4141
mattr_accessor :clear_finished_jobs_after, default: 1.day
4242
mattr_accessor :default_concurrency_control_period, default: 3.minutes
4343

44+
mattr_accessor :puma_plugin, default: false
45+
4446
delegate :on_start, :on_stop, :on_exit, to: Supervisor
4547

46-
[ Dispatcher, Scheduler, Worker ].each do |process|
48+
[ Dispatcher, Scheduler, Worker, HealthServer ].each do |process|
4749
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
4850
process.on_start(&block)
4951
end

lib/solid_queue/configuration.rb

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ class Configuration
77
validate :ensure_configured_processes
88
validate :ensure_valid_recurring_tasks
99
validate :ensure_correctly_sized_thread_pool
10+
validate :ensure_valid_health_server
1011

1112
class Process < Struct.new(:kind, :attributes)
1213
def instantiate
13-
"SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
14+
"SolidQueue::#{kind.to_s.camelize}".safe_constantize.new(**attributes)
1415
end
1516
end
1617

@@ -38,7 +39,7 @@ def initialize(**options)
3839
def configured_processes
3940
if only_work? then workers
4041
else
41-
dispatchers + workers + schedulers
42+
dispatchers + workers + schedulers + health_server
4243
end
4344
end
4445

@@ -129,6 +130,31 @@ def schedulers
129130
end
130131
end
131132

133+
def health_server
134+
if SolidQueue.puma_plugin
135+
SolidQueue.logger&.warn("SolidQueue health server is configured but Puma plugin is active; skipping starting health server to avoid duplicate servers")
136+
return []
137+
end
138+
139+
options = health_server_options
140+
return [] unless options
141+
142+
[ Process.new(:health_server, options) ]
143+
end
144+
145+
def ensure_valid_health_server
146+
server_options = health_server_options
147+
return unless server_options
148+
149+
unless server_options[:host].present?
150+
errors.add(:base, "Health server: host is required")
151+
end
152+
153+
unless server_options.key?(:port) && server_options[:port].present?
154+
errors.add(:base, "Health server: port is required")
155+
end
156+
end
157+
132158
def workers_options
133159
@workers_options ||= processes_config.fetch(:workers, [])
134160
.map { |options| options.dup.symbolize_keys }
@@ -139,6 +165,14 @@ def dispatchers_options
139165
.map { |options| options.dup.symbolize_keys }
140166
end
141167

168+
def health_server_options
169+
@health_server_options ||= begin
170+
options = processes_config[:health_server]
171+
options = options.dup.symbolize_keys if options
172+
options.present? ? options : nil
173+
end
174+
end
175+
142176
def recurring_tasks
143177
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
144178
RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule)
@@ -147,8 +181,8 @@ def recurring_tasks
147181

148182
def processes_config
149183
@processes_config ||= config_from \
150-
options.slice(:workers, :dispatchers).presence || options[:config_file],
151-
keys: [ :workers, :dispatchers ],
184+
options.slice(:workers, :dispatchers, :health_server).presence || options[:config_file],
185+
keys: [ :workers, :dispatchers, :health_server ],
152186
fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] }
153187
end
154188

lib/solid_queue/health_server.rb

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
# frozen_string_literal: true
2+
3+
require "socket"
4+
require "logger"
5+
6+
module SolidQueue
7+
class HealthServer < Processes::Base
8+
include Processes::Runnable
9+
10+
attr_reader :host, :port, :logger
11+
12+
def initialize(host:, port:, logger: nil, **options)
13+
@host = host
14+
@port = port
15+
@logger = logger || default_logger
16+
@server = nil
17+
18+
super(**options)
19+
end
20+
21+
def metadata
22+
super.merge(host: host, port: port)
23+
end
24+
25+
def running?
26+
@thread&.alive?
27+
end
28+
29+
private
30+
def run
31+
begin
32+
@server = TCPServer.new(host, port)
33+
log_info("listening on #{host}:#{port}")
34+
35+
loop do
36+
break if shutting_down?
37+
38+
readables, = IO.select([ @server, self_pipe[:reader] ].compact, nil, nil, 1)
39+
next unless readables
40+
41+
if readables.include?(self_pipe[:reader])
42+
drain_self_pipe
43+
end
44+
45+
if readables.include?(@server)
46+
handle_connection
47+
end
48+
end
49+
rescue Exception => exception
50+
handle_thread_error(exception)
51+
ensure
52+
SolidQueue.instrument(:shutdown_process, process: self) do
53+
run_callbacks(:shutdown) { shutdown }
54+
end
55+
end
56+
end
57+
58+
def handle_connection
59+
socket = @server.accept_nonblock(exception: false)
60+
return unless socket.is_a?(::TCPSocket)
61+
62+
begin
63+
request_line = socket.gets
64+
path = request_line&.split(" ")&.at(1) || "/"
65+
66+
if path == "/" || path == "/health"
67+
if system_healthy?
68+
body = "OK"
69+
status_line = "HTTP/1.1 200 OK"
70+
else
71+
body = "Unhealthy"
72+
status_line = "HTTP/1.1 503 Service Unavailable"
73+
end
74+
else
75+
body = "Not Found"
76+
status_line = "HTTP/1.1 404 Not Found"
77+
end
78+
79+
headers = [
80+
"Content-Type: text/plain",
81+
"Content-Length: #{body.bytesize}",
82+
"Connection: close"
83+
].join("\r\n")
84+
85+
socket.write("#{status_line}\r\n#{headers}\r\n\r\n#{body}")
86+
ensure
87+
begin
88+
socket.close
89+
rescue StandardError
90+
end
91+
end
92+
end
93+
94+
def shutdown
95+
begin
96+
@server&.close
97+
rescue StandardError
98+
ensure
99+
@server = nil
100+
end
101+
end
102+
103+
def set_procline
104+
procline "http #{host}:#{port}"
105+
end
106+
107+
def default_logger
108+
logger = Logger.new($stdout)
109+
logger.level = Logger::INFO
110+
logger.progname = "SolidQueueHTTP"
111+
logger
112+
end
113+
114+
def log_info(message)
115+
logger&.info(message)
116+
end
117+
118+
def drain_self_pipe
119+
loop { self_pipe[:reader].read_nonblock(11) }
120+
rescue Errno::EAGAIN, Errno::EINTR, IO::EWOULDBLOCKWaitReadable
121+
end
122+
123+
def system_healthy?
124+
wrap_in_app_executor do
125+
# If not supervised (e.g., unit tests), consider healthy
126+
supervisor_record = process&.supervisor
127+
return true unless supervisor_record
128+
129+
# Supervisor must be alive
130+
supervisor_alive = SolidQueue::Process.where(id: supervisor_record.id).merge(SolidQueue::Process.prunable).none?
131+
132+
# All supervisees must be alive (including this health server)
133+
supervisees_alive = supervisor_record.supervisees.merge(SolidQueue::Process.prunable).none?
134+
135+
supervisor_alive && supervisees_alive
136+
end
137+
rescue StandardError => error
138+
log_info("health check error: #{error.class}: #{error.message}")
139+
false
140+
end
141+
end
142+
end

0 commit comments

Comments
 (0)