Skip to content

Commit fc50f8f

Browse files
committed
agents: implement gRPC retry policies
Enable them in opentelemetry-cpp by defining `ENABLE_OTLP_RETRY_PREVIEW` and also them to our GrpcClient. The values we're using with a default deadline of 10 seconds are: - max_attemps: 5 - initial_backoff: 500ms - max_backoff: 5s - backoff_multiplier: 2 Add NSOLID_GRPC_DEADLINE env var for configurable gRPC timeouts. Specially useful for testing. Refactor and dry code when creating grpc exporters inside the new GrpcAgent::configure_grpc_exporters() helper function. Create parse_env_var_int() utility in nsolid_util.h for safe env parsing.
1 parent 6ed7616 commit fc50f8f

File tree

10 files changed

+241
-104
lines changed

10 files changed

+241
-104
lines changed

agents/grpc/src/asset_stream.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "asset_stream.h"
2+
#include "grpc_client.h"
23

34
#include <cinttypes>
45

@@ -8,6 +9,8 @@
89

910
using grpc::Status;
1011
using grpcagent::NSolidService;
12+
using std::chrono::system_clock;
13+
using std::chrono::seconds;
1114

1215
namespace node {
1316
namespace nsolid {
@@ -27,6 +30,10 @@ AssetStream::AssetStream(
2730
context_.AddMetadata("nsolid-saas-token", saas);
2831
}
2932

33+
// Use configurable deadline for asset streams
34+
context_.set_deadline(system_clock::now() +
35+
seconds(get_grpc_deadline_seconds()));
36+
3037
// Call the appropriate RPC method based on the rpc_type parameter
3138
if (rpc_type == EXPORT_CONTINUOUS_PROFILE) {
3239
stub->async()->ExportContinuousProfile(&context_, &event_response_, this);

agents/grpc/src/grpc_agent.cc

Lines changed: 85 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "grpc_agent.h"
2+
#include "../../src/nsolid/nsolid_util.h"
23

34
#include "asserts-cpp/asserts.h"
45
#include "nsolid/nsolid_api.h"
@@ -62,6 +63,11 @@ using ThreadMetricsMap = std::map<uint64_t, ThreadMetrics::MetricsStor>;
6263

6364
constexpr uint64_t span_timer_interval = 1000;
6465
constexpr size_t span_msg_q_min_size = 1000;
66+
// At class or file scope
67+
constexpr size_t retry_max_attempts = 5;
68+
constexpr auto retry_initial_backoff = std::chrono::milliseconds(500);
69+
constexpr auto retry_max_backoff = std::chrono::seconds(5);
70+
constexpr float retry_backoff_multiplier = 2.0f;
6571

6672
const char* const kNSOLID_GRPC_INSECURE = "NSOLID_GRPC_INSECURE";
6773
const char* const kNSOLID_GRPC_CERTS = "NSOLID_GRPC_CERTS";
@@ -992,7 +998,81 @@ void GrpcAgent::check_exit_on_profile() {
992998
}
993999
}
9941000

995-
int GrpcAgent::config(const json& config) {
1001+
void GrpcAgent::configure_grpc_exporters(const std::string& endpoint,
1002+
bool insecure) {
1003+
Debug("GrpcAgent configured. Endpoint: %s. Insecure: %d\n",
1004+
endpoint.c_str(), static_cast<unsigned>(insecure));
1005+
1006+
OtlpGrpcClientOptions opts;
1007+
opts.compression = "gzip";
1008+
opts.endpoint = endpoint;
1009+
opts.metadata = {{"nsolid-agent-id", agent_id_},
1010+
{"nsolid-saas", saas()}};
1011+
// Make sure the client is initialized. We set it to the same
1012+
// default value as max_concurrent_requests as the exporters.
1013+
opts.max_concurrent_requests = 64;
1014+
opts.use_ssl_credentials = !insecure;
1015+
if (!insecure) {
1016+
if (!custom_certs_.empty()) {
1017+
opts.ssl_credentials_cacert_as_string = custom_certs_;
1018+
} else {
1019+
opts.ssl_credentials_cacert_as_string = cacert_;
1020+
}
1021+
}
1022+
1023+
opts.retry_policy_max_attempts = retry_max_attempts;
1024+
opts.retry_policy_initial_backoff = retry_initial_backoff;
1025+
opts.retry_policy_max_backoff = retry_max_backoff;
1026+
opts.retry_policy_backoff_multiplier = retry_backoff_multiplier;
1027+
1028+
nsolid_service_stub_ = GrpcClient::MakeNSolidServiceStub(opts);
1029+
// CommandStream needs to be created before the OTLP client to avoid
1030+
// a race condition with abseil mutexes.
1031+
reset_command_stream();
1032+
1033+
std::shared_ptr<OtlpGrpcClient> client =
1034+
OtlpGrpcClientFactory::Create(opts);
1035+
1036+
auto set_common_options = [&](auto& options) {
1037+
options.compression = "gzip";
1038+
options.endpoint = endpoint;
1039+
options.metadata = {{"nsolid-agent-id", agent_id_},
1040+
{"nsolid-saas", saas()}};
1041+
if (!insecure) {
1042+
options.use_ssl_credentials = true;
1043+
if (!custom_certs_.empty()) {
1044+
options.ssl_credentials_cacert_as_string = custom_certs_;
1045+
} else {
1046+
options.ssl_credentials_cacert_as_string = cacert_;
1047+
}
1048+
}
1049+
1050+
options.retry_policy_max_attempts = retry_max_attempts;
1051+
options.retry_policy_initial_backoff = retry_initial_backoff;
1052+
options.retry_policy_max_backoff = retry_max_backoff;
1053+
options.retry_policy_backoff_multiplier = retry_backoff_multiplier;
1054+
};
1055+
1056+
{
1057+
OtlpGrpcExporterOptions options;
1058+
set_common_options(options);
1059+
trace_exporter_ = std::make_unique<OtlpGrpcExporter>(options, client);
1060+
}
1061+
{
1062+
OtlpGrpcMetricExporterOptions options;
1063+
set_common_options(options);
1064+
metrics_exporter_ =
1065+
std::make_unique<OtlpGrpcMetricExporter>(options, client);
1066+
}
1067+
{
1068+
OtlpGrpcLogRecordExporterOptions options;
1069+
set_common_options(options);
1070+
log_exporter_ =
1071+
std::make_unique<OtlpGrpcLogRecordExporter>(options, client);
1072+
}
1073+
}
1074+
1075+
int GrpcAgent::config(const nlohmann::json& config) {
9961076
int ret = 0;
9971077
json old_config = config_;
9981078
config_ = config;
@@ -1020,93 +1100,13 @@ int GrpcAgent::config(const json& config) {
10201100
// Only parse the insecure flag in non SaaS mode.
10211101
if (insecure_str.has_value() && (!saas_ || saas_->testing)) {
10221102
// insecure = std::stoull(insecure_str.value());
1023-
insecure = std::stoi(insecure_str.value());
1103+
insecure = utils::parse_env_var_int(kNSOLID_GRPC_INSECURE, 0, 0, 1);
10241104
}
10251105

10261106
const std::string& endpoint = !saas_ ?
10271107
it->get<std::string>() :
10281108
saas_->endpoint;
1029-
Debug("GrpcAgent configured. Endpoint: %s. Insecure: %d\n",
1030-
endpoint.c_str(), static_cast<unsigned>(insecure));
1031-
1032-
OtlpGrpcClientOptions opts;
1033-
opts.compression = "gzip";
1034-
opts.endpoint = endpoint;
1035-
opts.metadata = {{"nsolid-agent-id", agent_id_},
1036-
{"nsolid-saas", saas()}};
1037-
// Make sure the client is initialized. We set it to the same
1038-
// default value as ax_concurrent_requests as the exporters.
1039-
opts.max_concurrent_requests = 64;
1040-
opts.use_ssl_credentials = !insecure;
1041-
if (!insecure) {
1042-
if (!custom_certs_.empty()) {
1043-
opts.ssl_credentials_cacert_as_string = custom_certs_;
1044-
} else {
1045-
opts.ssl_credentials_cacert_as_string = cacert_;
1046-
}
1047-
}
1048-
1049-
nsolid_service_stub_ = GrpcClient::MakeNSolidServiceStub(opts);
1050-
// CommandStream needs to be created before the OTLP client to avoid
1051-
// a race condition with abseil mutexes.
1052-
reset_command_stream();
1053-
1054-
std::shared_ptr<OtlpGrpcClient> client =
1055-
OtlpGrpcClientFactory::Create(opts);
1056-
1057-
{
1058-
OtlpGrpcExporterOptions options;
1059-
options.compression = "gzip";
1060-
options.endpoint = endpoint;
1061-
options.metadata = {{"nsolid-agent-id", agent_id_},
1062-
{"nsolid-saas", saas()}};
1063-
if (!insecure) {
1064-
options.use_ssl_credentials = true;
1065-
if (!custom_certs_.empty()) {
1066-
options.ssl_credentials_cacert_as_string = custom_certs_;
1067-
} else {
1068-
options.ssl_credentials_cacert_as_string = cacert_;
1069-
}
1070-
}
1071-
1072-
trace_exporter_ = std::make_unique<OtlpGrpcExporter>(options, client);
1073-
}
1074-
{
1075-
OtlpGrpcMetricExporterOptions options;
1076-
options.compression = "gzip";
1077-
options.endpoint = endpoint;
1078-
options.metadata = {{"nsolid-agent-id", agent_id_},
1079-
{"nsolid-saas", saas()}};
1080-
if (!insecure) {
1081-
options.use_ssl_credentials = true;
1082-
if (!custom_certs_.empty()) {
1083-
options.ssl_credentials_cacert_as_string = custom_certs_;
1084-
} else {
1085-
options.ssl_credentials_cacert_as_string = cacert_;
1086-
}
1087-
}
1088-
1089-
metrics_exporter_ =
1090-
std::make_unique<OtlpGrpcMetricExporter>(options, client);
1091-
}
1092-
{
1093-
OtlpGrpcLogRecordExporterOptions options;
1094-
options.compression = "gzip";
1095-
options.endpoint = endpoint;
1096-
options.metadata = {{"nsolid-agent-id", agent_id_},
1097-
{"nsolid-saas", saas()}};
1098-
if (!insecure) {
1099-
options.use_ssl_credentials = true;
1100-
if (!custom_certs_.empty()) {
1101-
options.ssl_credentials_cacert_as_string = custom_certs_;
1102-
} else {
1103-
options.ssl_credentials_cacert_as_string = cacert_;
1104-
}
1105-
}
1106-
1107-
log_exporter_ =
1108-
std::make_unique<OtlpGrpcLogRecordExporter>(options, client);
1109-
}
1109+
configure_grpc_exporters(endpoint, insecure);
11101110
}
11111111
}
11121112

@@ -1804,7 +1804,8 @@ void GrpcAgent::send_exit() {
18041804
exit_body->set_profile(cpu_profile_state.last_main_profile);
18051805
}
18061806

1807-
auto context = GrpcClient::MakeClientContext(agent_id_, saas());
1807+
// 1 second for fast exit
1808+
auto context = GrpcClient::MakeClientContext(agent_id_, saas(), 1);
18081809
uv_cond_t cond;
18091810
uv_mutex_t lock;
18101811
bool signaled = false;

agents/grpc/src/grpc_agent.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,
202202

203203
void command_stream_closed(const ::grpc::Status& s);
204204

205+
void configure_grpc_exporters(const std::string& endpoint, bool insecure);
206+
205207
int config(const nlohmann::json& config);
206208

207209
void do_start();

agents/grpc/src/grpc_client.cc

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
#include "debug_utils-inl.h"
33
#include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h"
44

5+
#include <cerrno>
6+
#include <cstdlib>
7+
#include <limits>
8+
59
using grpc::Channel;
610
using grpc::ChannelArguments;
711
using grpc::ClientContext;
@@ -11,11 +15,23 @@ using grpc::SslCredentials;
1115
using grpc::SslCredentialsOptions;
1216
using grpcagent::NSolidService;
1317
using opentelemetry::v1::exporter::otlp::OtlpGrpcClientOptions;
18+
using std::chrono::system_clock;
19+
using std::chrono::seconds;
1420

1521
namespace node {
1622
namespace nsolid {
1723
namespace grpc {
1824

25+
const char* const kNSOLID_GRPC_DEADLINE = "NSOLID_GRPC_DEADLINE";
26+
27+
int get_grpc_deadline_seconds() {
28+
static int cached_deadline = -1;
29+
if (cached_deadline == -1) {
30+
cached_deadline = parse_env_var_int(kNSOLID_GRPC_DEADLINE, 10, 1, INT_MAX);
31+
}
32+
return cached_deadline;
33+
}
34+
1935
/**
2036
* Create gRPC channel.
2137
*/
@@ -32,6 +48,48 @@ std::shared_ptr<Channel>
3248
grpc_arguments.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 15 * 1000 /* 15 sec*/);
3349
grpc_arguments.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
3450
grpc_arguments.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
51+
52+
53+
static const auto kServiceConfigJson = std::string_view{R"(
54+
{
55+
"methodConfig": [
56+
{
57+
"name": [{}],
58+
"retryPolicy": {
59+
"maxAttempts": %0000000000u,
60+
"initialBackoff": "%0000000000.1fs",
61+
"maxBackoff": "%0000000000.1fs",
62+
"backoffMultiplier": %0000000000.1f,
63+
"retryableStatusCodes": [
64+
"CANCELLED",
65+
"DEADLINE_EXCEEDED",
66+
"ABORTED",
67+
"OUT_OF_RANGE",
68+
"DATA_LOSS",
69+
"UNAVAILABLE"
70+
]
71+
}
72+
}
73+
]
74+
})"};
75+
76+
// Allocate string with buffer large enough to hold the formatted json config
77+
auto service_config = std::string(kServiceConfigJson.size(), '\0');
78+
// Prior to C++17, need to explicitly cast away constness from `data()` buffer
79+
float initial_backoff = options.retry_policy_initial_backoff.count();
80+
float max_backoff = options.retry_policy_max_backoff.count();
81+
float backoff_multiplier = options.retry_policy_backoff_multiplier;
82+
std::snprintf(
83+
const_cast<decltype(service_config)::value_type *>(service_config.data()),
84+
service_config.size(),
85+
kServiceConfigJson.data(),
86+
options.retry_policy_max_attempts,
87+
std::min(std::max(initial_backoff, 0.f), 999999999.f),
88+
std::min(std::max(max_backoff, 0.f), 999999999.f),
89+
std::min(std::max(backoff_multiplier, 0.f), 999999999.f));
90+
91+
grpc_arguments.SetServiceConfigJSON(service_config);
92+
3593
if (!options.use_ssl_credentials) {
3694
channel = CreateCustomChannel(options.endpoint,
3795
InsecureChannelCredentials(),
@@ -55,7 +113,19 @@ std::shared_ptr<Channel>
55113
std::unique_ptr<ClientContext>
56114
GrpcClient::MakeClientContext(const std::string& agent_id,
57115
const std::string& saas) {
116+
// Forward to the version with custom deadline using environment variable
117+
return MakeClientContext(agent_id, saas, get_grpc_deadline_seconds());
118+
}
119+
120+
/**
121+
* Create gRPC client context to call RPC with custom deadline.
122+
*/
123+
std::unique_ptr<ClientContext>
124+
GrpcClient::MakeClientContext(const std::string& agent_id,
125+
const std::string& saas,
126+
int deadline_seconds) {
58127
std::unique_ptr<ClientContext> context = std::make_unique<ClientContext>();
128+
context->set_deadline(system_clock::now() + seconds(deadline_seconds));
59129
context->AddMetadata("nsolid-agent-id", agent_id);
60130
if (!saas.empty()) {
61131
context->AddMetadata("nsolid-saas-token", saas);

agents/grpc/src/grpc_client.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ class GrpcClient {
6868
static std::unique_ptr<::grpc::ClientContext>
6969
MakeClientContext(const std::string& agent_id, const std::string& saas);
7070

71+
/**
72+
* Create gRPC client context to call RPC with custom deadline.
73+
*/
74+
static std::unique_ptr<::grpc::ClientContext>
75+
MakeClientContext(const std::string& agent_id,
76+
const std::string& saas,
77+
int deadline_seconds);
78+
7179
/**
7280
* Create N|Solid service stub to communicate with the N|Solid Console.
7381
*/
@@ -148,6 +156,9 @@ class GrpcClient {
148156
}
149157
};
150158

159+
// Helper function to get configurable gRPC deadline
160+
int get_grpc_deadline_seconds();
161+
151162
} // namespace grpc
152163
} // namespace nsolid
153164
} // namespace node

deps/opentelemetry-cpp/otlp-http-exporter.gyp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
'BUILDING_LIBCURL',
6464
'ENABLE_ASYNC_EXPORT',
6565
'OPENTELEMETRY_STL_VERSION=2020',
66+
'ENABLE_OTLP_RETRY_PREVIEW',
6667
],
6768
'dependencies': [
6869
'../protobuf/protobuf.gyp:protobuf',
@@ -75,6 +76,7 @@
7576
'defines': [
7677
'ENABLE_ASYNC_EXPORT',
7778
'OPENTELEMETRY_STL_VERSION=2020',
79+
'ENABLE_OTLP_RETRY_PREVIEW',
7880
],
7981
'include_dirs': [
8082
'api/include',

0 commit comments

Comments
 (0)