diff --git a/config/datadog_example.ini b/config/datadog_example.ini new file mode 100644 index 0000000..57329e1 --- /dev/null +++ b/config/datadog_example.ini @@ -0,0 +1,15 @@ +[main] +upstream = amqp://localhost:5672 +log_level = INFO +idle_connection_timeout = 5 + +[listen] +address = localhost +port = 5673 + +[datadog] +enabled = true +service_name = amqproxy +env = production +agent_host = localhost +agent_port = 8126 \ No newline at end of file diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 24e5ce2..56b592f 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -1,6 +1,9 @@ require "./version" require "./server" require "./http_server" +require "./tracer" +require "./nil_tracer" +require "./datadog_tracer" require "option_parser" require "uri" require "ini" @@ -17,6 +20,11 @@ class AMQProxy::CLI @term_timeout = -1 @term_client_close_timeout = 0 @server : AMQProxy::Server? = nil + @datadog_enabled = false + @datadog_service_name = "amqproxy" + @datadog_env = "production" + @datadog_agent_host = "localhost" + @datadog_agent_port = 8126 def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity INI.parse(File.read(path)).each do |name, section| @@ -32,6 +40,17 @@ class AMQProxy::CLI else raise "Unsupported config #{name}/#{key}" end end + when "datadog" + section.each do |key, value| + case key + when "enabled" then @datadog_enabled = value.downcase == "true" + when "service_name" then @datadog_service_name = value + when "env" then @datadog_env = value + when "agent_host" then @datadog_agent_host = value + when "agent_port" then @datadog_agent_port = value.to_i + else raise "Unsupported config #{name}/#{key}" + end + end when "listen" section.each do |key, value| case key @@ -48,7 +67,7 @@ class AMQProxy::CLI abort ex.message end - def apply_env_variables + def apply_env_variables # ameba:disable Metrics/CyclomaticComplexity @listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address @listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port @@ -57,6 +76,11 @@ class AMQProxy::CLI @term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout @term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout @upstream = ENV["AMQP_URL"]? || @upstream + @datadog_enabled = ENV["DD_TRACE_ENABLED"]?.try { |v| v.downcase == "true" } || @datadog_enabled + @datadog_service_name = ENV["DD_SERVICE"]? || @datadog_service_name + @datadog_env = ENV["DD_ENV"]? || @datadog_env + @datadog_agent_host = ENV["DD_AGENT_HOST"]? || @datadog_agent_host + @datadog_agent_port = ENV["DD_TRACE_AGENT_PORT"]?.try &.to_i || @datadog_agent_port end def run(argv) @@ -88,6 +112,11 @@ class AMQProxy::CLI @term_client_close_timeout = v.to_i end parser.on("-d", "--debug", "Verbose logging") { @log_level = ::Log::Severity::Debug } + parser.on("--datadog-enabled", "Enable Datadog APM tracing") { @datadog_enabled = true } + parser.on("--datadog-service=SERVICE", "Datadog service name (default: amqproxy)") { |v| @datadog_service_name = v } + parser.on("--datadog-env=ENV", "Datadog environment (default: production)") { |v| @datadog_env = v } + parser.on("--datadog-agent-host=HOST", "Datadog agent host (default: localhost)") { |v| @datadog_agent_host = v } + parser.on("--datadog-agent-port=PORT", "Datadog agent port (default: 8126)") { |v| @datadog_agent_port = v.to_i } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } @@ -117,7 +146,12 @@ class AMQProxy::CLI Signal::INT.trap &->self.initiate_shutdown(Signal) Signal::TERM.trap &->self.initiate_shutdown(Signal) - server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) + tracer : Tracer = if @datadog_enabled + DatadogTracer.new(@datadog_service_name, @datadog_env, AMQProxy::VERSION, @datadog_agent_host, @datadog_agent_port) + else + NilTracer.new + end + server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout, tracer) HTTPServer.new(server, @listen_address, @http_port.to_i) server.listen(@listen_address, @listen_port.to_i) diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index ffdc8a6..4ecae08 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -3,6 +3,8 @@ require "amq-protocol" require "./version" require "./upstream" require "./records" +require "./tracer" +require "./nil_tracer" module AMQProxy class Client @@ -14,8 +16,9 @@ module AMQProxy @channel_max : UInt16 @heartbeat : UInt16 @last_heartbeat = Time.monotonic + @tracer : Tracer - def initialize(@socket : TCPSocket) + def initialize(@socket : TCPSocket, @tracer = NilTracer) set_socket_options(@socket) tune_ok, @credentials = negotiate(@socket) @frame_max = tune_ok.frame_max @@ -42,11 +45,17 @@ module AMQProxy private def finish_publish(channel) buffer = @publish_buffers[channel] - if upstream_channel = @channel_map[channel] - upstream_channel.write(buffer.publish) - upstream_channel.write(buffer.header) - buffer.bodies.each do |body| - upstream_channel.write(body) + @tracer.trace("amqp.publish", buffer.publish.exchange, { + amqp_exchange: buffer.publish.exchange, + amqp_routing_key: buffer.publish.routing_key, + amqp_body_size: buffer.header.body_size.to_s, + }) do + if upstream_channel = @channel_map[channel] + upstream_channel.write(buffer.publish) + upstream_channel.write(buffer.header) + buffer.bodies.each do |body| + upstream_channel.write(body) + end end end ensure diff --git a/src/amqproxy/datadog_tracer.cr b/src/amqproxy/datadog_tracer.cr new file mode 100644 index 0000000..aeb6b34 --- /dev/null +++ b/src/amqproxy/datadog_tracer.cr @@ -0,0 +1,95 @@ +require "http/client" +require "json" +require "log" +require "./tracer" + +module AMQProxy + class DatadogTracer < Tracer + Log = ::Log.for(self) + + @service_name : String + @env : String + @version : String + @agent_host : String + @agent_port : Int32 + @http_client : HTTP::Client + + def initialize(service_name = "amqproxy", env = "production", version = VERSION, agent_host = "localhost", agent_port = 8126) + @service_name = service_name + @env = env + @version = version + @agent_host = agent_host + @agent_port = agent_port + @http_client = HTTP::Client.new(agent_host, agent_port) + @http_client.connect_timeout = 1.second + @http_client.read_timeout = 1.second + end + + def trace(operation_name : String, resource : String? = nil, tags = NamedTuple.new, &) + trace_id = Random.rand(UInt64::MAX) + span_id = Random.rand(UInt64::MAX) + start_time = Time.monotonic + start_time_ns = Time.utc.to_unix_ns.to_i64 + + begin + result = yield + duration = (Time.monotonic - start_time).total_nanoseconds.to_i64 + send_span(trace_id, span_id, operation_name, resource, start_time_ns, duration, tags, error: false) + result + rescue ex + duration = (Time.monotonic - start_time).total_nanoseconds.to_i64 + error_tags = tags.merge(error_type: ex.class.name, error_message: ex.message) + send_span(trace_id, span_id, operation_name, resource, start_time_ns, duration, error_tags, error: true) + raise ex + end + end + + private def send_span(trace_id : UInt64, span_id : UInt64, operation_name : String, resource : String?, start_time : Int64, duration : Int64, tags, error : Bool) + meta_tags = tags.merge({ + env: @env, + version: @version, + language: "crystal", + }) + + span = { + trace_id: trace_id, + span_id: span_id, + name: operation_name, + resource: resource || operation_name, + service: @service_name, + type: "custom", + start: start_time, + duration: duration, + error: error ? 1 : 0, + meta: meta_tags, + } + + payload = JSON.build do |json| + json.array do + json.array do + span.to_json(json) + end + end + end + + spawn do + begin + response = @http_client.put("/v0.4/traces", + headers: HTTP::Headers{"Content-Type" => "application/json"}, + body: payload + ) + + unless response.success? + Log.debug { "Failed to send trace to Datadog: #{response.status_code} #{response.body}" } + end + rescue ex + Log.debug { "Error sending trace to Datadog: #{ex.message}" } + end + end + end + + def close + @http_client.close + end + end +end diff --git a/src/amqproxy/nil_tracer.cr b/src/amqproxy/nil_tracer.cr new file mode 100644 index 0000000..3715cc6 --- /dev/null +++ b/src/amqproxy/nil_tracer.cr @@ -0,0 +1,12 @@ +require "./tracer" + +module AMQProxy + class NilTracer < Tracer + def trace(operation_name : String, resource : String? = nil, tags = NamedTuple.new, &) + yield + end + + def close + end + end +end diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 4af02f4..1030daf 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -5,23 +5,26 @@ require "uri" require "./channel_pool" require "./client" require "./upstream" +require "./tracer" +require "./nil_tracer" module AMQProxy class Server Log = ::Log.for(self) @clients_lock = Mutex.new @clients = Array(Client).new + @tracer : Tracer - def self.new(url : URI) + def self.new(url : URI, tracer = NilTracer.new) tls = url.scheme == "amqps" host = url.host || "127.0.0.1" port = url.port || 5762 port = 5671 if tls && url.port.nil? idle_connection_timeout = url.query_params.fetch("idle_connection_timeout", 5).to_i - new(host, port, tls, idle_connection_timeout) + new(host, port, tls, idle_connection_timeout, tracer) end - def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5) + def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5, @tracer = NilTracer.new) tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls @channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials| hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout) @@ -45,12 +48,10 @@ module AMQProxy Log.info { "Proxy listening on #{server.local_address}" } loop do socket = server.accept? || break - begin - addr = socket.remote_address - spawn handle_connection(socket, addr), name: "Client#read_loop #{addr}" - rescue IO::Error - next - end + addr = socket.remote_address + spawn handle_connection(socket, addr), name: "Client#read_loop #{addr}" + rescue IO::Error + next end Log.info { "Proxy stopping accepting connections" } end @@ -74,7 +75,9 @@ module AMQProxy end private def handle_connection(socket, remote_address) - c = Client.new(socket) + c = @tracer.trace("client.connection", remote_address.to_s) do + Client.new(socket, @tracer) + end active_client(c) do channel_pool = @channel_pools[c.credentials] c.read_loop(channel_pool) diff --git a/src/amqproxy/tracer.cr b/src/amqproxy/tracer.cr new file mode 100644 index 0000000..b5085f6 --- /dev/null +++ b/src/amqproxy/tracer.cr @@ -0,0 +1,6 @@ +module AMQProxy + abstract class Tracer + abstract def trace(operation_name : String, resource : String? = nil, tags = NamedTuple.new, &) + abstract def close + end +end