From 7c6cdb5ff3d4544faddc7c1c53ccbe9e90b5eea2 Mon Sep 17 00:00:00 2001 From: Nickolai Novik Date: Wed, 27 Aug 2025 13:54:08 -0400 Subject: [PATCH] Make pool_maxsize configurable. --- .../datarobot_drum/drum/common.py | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/custom_model_runner/datarobot_drum/drum/common.py b/custom_model_runner/datarobot_drum/drum/common.py index 5dc168c01..e924cdcd8 100644 --- a/custom_model_runner/datarobot_drum/drum/common.py +++ b/custom_model_runner/datarobot_drum/drum/common.py @@ -9,6 +9,8 @@ import os import sys import trafaret as t +import requests.adapters + from contextvars import ContextVar from urllib.parse import urlparse, urlunparse @@ -153,10 +155,10 @@ def filter(self, record: logging.LogRecord) -> bool: return not record.name.startswith("opentelemetry") -def _setup_otel_logging(resource, multiprocessing=False): +def _setup_otel_logging(resource, multiprocessing=False, session=None): logger_provider = LoggerProvider(resource=resource) set_logger_provider(logger_provider) - exporter = OTLPLogExporter() + exporter = OTLPLogExporter(session=session) if multiprocessing: logger_provider.add_log_record_processor(SimpleLogRecordProcessor(exporter)) else: @@ -168,16 +170,16 @@ def _setup_otel_logging(resource, multiprocessing=False): return logger_provider -def _setup_otel_metrics(resource): - metric_exporter = OTLPMetricExporter() +def _setup_otel_metrics(resource, session=None): + metric_exporter = OTLPMetricExporter(session=session) metric_reader = PeriodicExportingMetricReader(metric_exporter) metric_provider = MeterProvider(metric_readers=[metric_reader], resource=resource) metrics.set_meter_provider(metric_provider) return metric_provider -def _setup_otel_tracing(resource, multiprocessing=False): - otlp_exporter = OTLPSpanExporter() +def _setup_otel_tracing(resource, multiprocessing=False, session=None): + otlp_exporter = OTLPSpanExporter(session=session) trace_provider = TracerProvider(resource=resource) if multiprocessing: trace_provider.add_span_processor(SimpleSpanProcessor(otlp_exporter)) @@ -225,8 +227,18 @@ def setup_otel(runtime_parameters, options): # (most frequent case) multiprocessing = options.max_workers > 1 + pool_maxsize = 30 # reqeusts default is 10 + if runtime_parameters.has("DR_OTEL_SESSION_POOL_MAXSIZE"): + pool_maxsize = int(runtime_parameters.get("DR_OTEL_SESSION_POOL_MAXSIZE")) + + session = requests.Session() + adapter = requests.adapters.HTTPAdapter(pool_maxsize=pool_maxsize) + session.mount("http://", adapter) + session.mount("https://", adapter) resource = Resource.create() - trace_provider = _setup_otel_tracing(resource=resource, multiprocessing=multiprocessing) + trace_provider = _setup_otel_tracing( + resource=resource, multiprocessing=multiprocessing, session=session + ) logger_provider = None metric_provider = None @@ -235,8 +247,10 @@ def setup_otel(runtime_parameters, options): if runtime_parameters.has("DR_OTEL_METRICS_LOGS_ENABLED") and runtime_parameters.get( "DR_OTEL_METRICS_LOGS_ENABLED" ): - logger_provider = _setup_otel_logging(resource=resource, multiprocessing=multiprocessing) - metric_provider = _setup_otel_metrics(resource=resource) + logger_provider = _setup_otel_logging( + resource=resource, multiprocessing=multiprocessing, session=session + ) + metric_provider = _setup_otel_metrics(resource=resource, session=session) log.info(f"OTEL is configured with endpoint: {endpoint}") return trace_provider, metric_provider, logger_provider