Skip to content

Commit 60e7889

Browse files
committed
agents: implement gRPC retry policies
Enable them in opentelemetry-cpp by defining `ENABLE_OTLP_RETRY_PREVIEW` and also them to our GrpcClient. The values we're using with a default deadline of 10 seconds are: - max_attemps: 5 - initial_backoff: 500ms - max_backoff: 5s - backoff_multiplier: 2 Add NSOLID_GRPC_DEADLINE env var for configurable gRPC timeouts. Specially useful for testing. Refactor and dry code when creating grpc exporters inside the new GrpcAgent::configure_grpc_exporters() helper function. Create parse_env_var_int() utility in nsolid_util.h for safe env parsing.
1 parent 6ed7616 commit 60e7889

File tree

10 files changed

+229
-104
lines changed

10 files changed

+229
-104
lines changed

agents/grpc/src/asset_stream.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "asset_stream.h"
2+
#include "grpc_client.h"
23

34
#include <cinttypes>
45

@@ -8,6 +9,8 @@
89

910
using grpc::Status;
1011
using grpcagent::NSolidService;
12+
using std::chrono::system_clock;
13+
using std::chrono::seconds;
1114

1215
namespace node {
1316
namespace nsolid {
@@ -27,6 +30,10 @@ AssetStream::AssetStream(
2730
context_.AddMetadata("nsolid-saas-token", saas);
2831
}
2932

33+
// Use configurable deadline for asset streams
34+
context_.set_deadline(system_clock::now() +
35+
seconds(get_grpc_deadline_seconds()));
36+
3037
// Call the appropriate RPC method based on the rpc_type parameter
3138
if (rpc_type == EXPORT_CONTINUOUS_PROFILE) {
3239
stub->async()->ExportContinuousProfile(&context_, &event_response_, this);

agents/grpc/src/grpc_agent.cc

Lines changed: 79 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "grpc_agent.h"
2+
#include "../../src/nsolid/nsolid_util.h"
23

34
#include "asserts-cpp/asserts.h"
45
#include "nsolid/nsolid_api.h"
@@ -992,7 +993,81 @@ void GrpcAgent::check_exit_on_profile() {
992993
}
993994
}
994995

995-
int GrpcAgent::config(const json& config) {
996+
void GrpcAgent::configure_grpc_exporters(const std::string& endpoint,
997+
bool insecure) {
998+
Debug("GrpcAgent configured. Endpoint: %s. Insecure: %d\n",
999+
endpoint.c_str(), static_cast<unsigned>(insecure));
1000+
1001+
OtlpGrpcClientOptions opts;
1002+
opts.compression = "gzip";
1003+
opts.endpoint = endpoint;
1004+
opts.metadata = {{"nsolid-agent-id", agent_id_},
1005+
{"nsolid-saas", saas()}};
1006+
// Make sure the client is initialized. We set it to the same
1007+
// default value as max_concurrent_requests as the exporters.
1008+
opts.max_concurrent_requests = 64;
1009+
opts.use_ssl_credentials = !insecure;
1010+
if (!insecure) {
1011+
if (!custom_certs_.empty()) {
1012+
opts.ssl_credentials_cacert_as_string = custom_certs_;
1013+
} else {
1014+
opts.ssl_credentials_cacert_as_string = cacert_;
1015+
}
1016+
}
1017+
1018+
opts.retry_policy_max_attempts = 5;
1019+
opts.retry_policy_initial_backoff = std::chrono::milliseconds(500);
1020+
opts.retry_policy_max_backoff = std::chrono::seconds(5);
1021+
opts.retry_policy_backoff_multiplier = 2.0f;
1022+
1023+
nsolid_service_stub_ = GrpcClient::MakeNSolidServiceStub(opts);
1024+
// CommandStream needs to be created before the OTLP client to avoid
1025+
// a race condition with abseil mutexes.
1026+
reset_command_stream();
1027+
1028+
std::shared_ptr<OtlpGrpcClient> client =
1029+
OtlpGrpcClientFactory::Create(opts);
1030+
1031+
auto set_common_options = [&](auto& options) {
1032+
options.compression = "gzip";
1033+
options.endpoint = endpoint;
1034+
options.metadata = {{"nsolid-agent-id", agent_id_},
1035+
{"nsolid-saas", saas()}};
1036+
if (!insecure) {
1037+
options.use_ssl_credentials = true;
1038+
if (!custom_certs_.empty()) {
1039+
options.ssl_credentials_cacert_as_string = custom_certs_;
1040+
} else {
1041+
options.ssl_credentials_cacert_as_string = cacert_;
1042+
}
1043+
}
1044+
1045+
options.retry_policy_max_attempts = 5;
1046+
options.retry_policy_initial_backoff = std::chrono::milliseconds(500);
1047+
options.retry_policy_max_backoff = std::chrono::seconds(5);
1048+
options.retry_policy_backoff_multiplier = 2.0f;
1049+
};
1050+
1051+
{
1052+
OtlpGrpcExporterOptions options;
1053+
set_common_options(options);
1054+
trace_exporter_ = std::make_unique<OtlpGrpcExporter>(options, client);
1055+
}
1056+
{
1057+
OtlpGrpcMetricExporterOptions options;
1058+
set_common_options(options);
1059+
metrics_exporter_ =
1060+
std::make_unique<OtlpGrpcMetricExporter>(options, client);
1061+
}
1062+
{
1063+
OtlpGrpcLogRecordExporterOptions options;
1064+
set_common_options(options);
1065+
log_exporter_ =
1066+
std::make_unique<OtlpGrpcLogRecordExporter>(options, client);
1067+
}
1068+
}
1069+
1070+
int GrpcAgent::config(const nlohmann::json& config) {
9961071
int ret = 0;
9971072
json old_config = config_;
9981073
config_ = config;
@@ -1020,93 +1095,13 @@ int GrpcAgent::config(const json& config) {
10201095
// Only parse the insecure flag in non SaaS mode.
10211096
if (insecure_str.has_value() && (!saas_ || saas_->testing)) {
10221097
// insecure = std::stoull(insecure_str.value());
1023-
insecure = std::stoi(insecure_str.value());
1098+
insecure = utils::parse_env_var_int(kNSOLID_GRPC_INSECURE, 0, 0, 1);
10241099
}
10251100

10261101
const std::string& endpoint = !saas_ ?
10271102
it->get<std::string>() :
10281103
saas_->endpoint;
1029-
Debug("GrpcAgent configured. Endpoint: %s. Insecure: %d\n",
1030-
endpoint.c_str(), static_cast<unsigned>(insecure));
1031-
1032-
OtlpGrpcClientOptions opts;
1033-
opts.compression = "gzip";
1034-
opts.endpoint = endpoint;
1035-
opts.metadata = {{"nsolid-agent-id", agent_id_},
1036-
{"nsolid-saas", saas()}};
1037-
// Make sure the client is initialized. We set it to the same
1038-
// default value as ax_concurrent_requests as the exporters.
1039-
opts.max_concurrent_requests = 64;
1040-
opts.use_ssl_credentials = !insecure;
1041-
if (!insecure) {
1042-
if (!custom_certs_.empty()) {
1043-
opts.ssl_credentials_cacert_as_string = custom_certs_;
1044-
} else {
1045-
opts.ssl_credentials_cacert_as_string = cacert_;
1046-
}
1047-
}
1048-
1049-
nsolid_service_stub_ = GrpcClient::MakeNSolidServiceStub(opts);
1050-
// CommandStream needs to be created before the OTLP client to avoid
1051-
// a race condition with abseil mutexes.
1052-
reset_command_stream();
1053-
1054-
std::shared_ptr<OtlpGrpcClient> client =
1055-
OtlpGrpcClientFactory::Create(opts);
1056-
1057-
{
1058-
OtlpGrpcExporterOptions options;
1059-
options.compression = "gzip";
1060-
options.endpoint = endpoint;
1061-
options.metadata = {{"nsolid-agent-id", agent_id_},
1062-
{"nsolid-saas", saas()}};
1063-
if (!insecure) {
1064-
options.use_ssl_credentials = true;
1065-
if (!custom_certs_.empty()) {
1066-
options.ssl_credentials_cacert_as_string = custom_certs_;
1067-
} else {
1068-
options.ssl_credentials_cacert_as_string = cacert_;
1069-
}
1070-
}
1071-
1072-
trace_exporter_ = std::make_unique<OtlpGrpcExporter>(options, client);
1073-
}
1074-
{
1075-
OtlpGrpcMetricExporterOptions options;
1076-
options.compression = "gzip";
1077-
options.endpoint = endpoint;
1078-
options.metadata = {{"nsolid-agent-id", agent_id_},
1079-
{"nsolid-saas", saas()}};
1080-
if (!insecure) {
1081-
options.use_ssl_credentials = true;
1082-
if (!custom_certs_.empty()) {
1083-
options.ssl_credentials_cacert_as_string = custom_certs_;
1084-
} else {
1085-
options.ssl_credentials_cacert_as_string = cacert_;
1086-
}
1087-
}
1088-
1089-
metrics_exporter_ =
1090-
std::make_unique<OtlpGrpcMetricExporter>(options, client);
1091-
}
1092-
{
1093-
OtlpGrpcLogRecordExporterOptions options;
1094-
options.compression = "gzip";
1095-
options.endpoint = endpoint;
1096-
options.metadata = {{"nsolid-agent-id", agent_id_},
1097-
{"nsolid-saas", saas()}};
1098-
if (!insecure) {
1099-
options.use_ssl_credentials = true;
1100-
if (!custom_certs_.empty()) {
1101-
options.ssl_credentials_cacert_as_string = custom_certs_;
1102-
} else {
1103-
options.ssl_credentials_cacert_as_string = cacert_;
1104-
}
1105-
}
1106-
1107-
log_exporter_ =
1108-
std::make_unique<OtlpGrpcLogRecordExporter>(options, client);
1109-
}
1104+
configure_grpc_exporters(endpoint, insecure);
11101105
}
11111106
}
11121107

@@ -1804,7 +1799,7 @@ void GrpcAgent::send_exit() {
18041799
exit_body->set_profile(cpu_profile_state.last_main_profile);
18051800
}
18061801

1807-
auto context = GrpcClient::MakeClientContext(agent_id_, saas());
1802+
auto context = GrpcClient::MakeClientContext(agent_id_, saas(), 1); // 1 second for fast exit
18081803
uv_cond_t cond;
18091804
uv_mutex_t lock;
18101805
bool signaled = false;

agents/grpc/src/grpc_agent.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,
202202

203203
void command_stream_closed(const ::grpc::Status& s);
204204

205+
void configure_grpc_exporters(const std::string& endpoint, bool insecure);
206+
205207
int config(const nlohmann::json& config);
206208

207209
void do_start();

agents/grpc/src/grpc_client.cc

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
#include "debug_utils-inl.h"
33
#include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h"
44

5+
#include <cerrno>
6+
#include <cstdlib>
7+
#include <limits>
8+
59
using grpc::Channel;
610
using grpc::ChannelArguments;
711
using grpc::ClientContext;
@@ -11,11 +15,23 @@ using grpc::SslCredentials;
1115
using grpc::SslCredentialsOptions;
1216
using grpcagent::NSolidService;
1317
using opentelemetry::v1::exporter::otlp::OtlpGrpcClientOptions;
18+
using std::chrono::system_clock;
19+
using std::chrono::seconds;
1420

1521
namespace node {
1622
namespace nsolid {
1723
namespace grpc {
1824

25+
const char* const kNSOLID_GRPC_DEADLINE = "NSOLID_GRPC_DEADLINE";
26+
27+
int get_grpc_deadline_seconds() {
28+
static int cached_deadline = -1;
29+
if (cached_deadline == -1) {
30+
cached_deadline = parse_env_var_int(kNSOLID_GRPC_DEADLINE, 10, 1, INT_MAX);
31+
}
32+
return cached_deadline;
33+
}
34+
1935
/**
2036
* Create gRPC channel.
2137
*/
@@ -32,6 +48,48 @@ std::shared_ptr<Channel>
3248
grpc_arguments.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 15 * 1000 /* 15 sec*/);
3349
grpc_arguments.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
3450
grpc_arguments.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
51+
52+
53+
static const auto kServiceConfigJson = std::string_view{R"(
54+
{
55+
"methodConfig": [
56+
{
57+
"name": [{}],
58+
"retryPolicy": {
59+
"maxAttempts": %0000000000u,
60+
"initialBackoff": "%0000000000.1fs",
61+
"maxBackoff": "%0000000000.1fs",
62+
"backoffMultiplier": %0000000000.1f,
63+
"retryableStatusCodes": [
64+
"CANCELLED",
65+
"DEADLINE_EXCEEDED",
66+
"ABORTED",
67+
"OUT_OF_RANGE",
68+
"DATA_LOSS",
69+
"UNAVAILABLE"
70+
]
71+
}
72+
}
73+
]
74+
})"};
75+
76+
// Allocate string with buffer large enough to hold the formatted json config
77+
auto service_config = std::string(kServiceConfigJson.size(), '\0');
78+
// Prior to C++17, need to explicitly cast away constness from `data()` buffer
79+
float initial_backoff = options.retry_policy_initial_backoff.count();
80+
float max_backoff = options.retry_policy_max_backoff.count();
81+
float backoff_multiplier = options.retry_policy_backoff_multiplier;
82+
std::snprintf(
83+
const_cast<decltype(service_config)::value_type *>(service_config.data()),
84+
service_config.size(),
85+
kServiceConfigJson.data(),
86+
options.retry_policy_max_attempts,
87+
std::min(std::max(initial_backoff, 0.f), 999999999.f),
88+
std::min(std::max(max_backoff, 0.f), 999999999.f),
89+
std::min(std::max(backoff_multiplier, 0.f), 999999999.f));
90+
91+
grpc_arguments.SetServiceConfigJSON(service_config);
92+
3593
if (!options.use_ssl_credentials) {
3694
channel = CreateCustomChannel(options.endpoint,
3795
InsecureChannelCredentials(),
@@ -55,7 +113,19 @@ std::shared_ptr<Channel>
55113
std::unique_ptr<ClientContext>
56114
GrpcClient::MakeClientContext(const std::string& agent_id,
57115
const std::string& saas) {
116+
// Forward to the version with custom deadline using environment variable
117+
return MakeClientContext(agent_id, saas, get_grpc_deadline_seconds());
118+
}
119+
120+
/**
121+
* Create gRPC client context to call RPC with custom deadline.
122+
*/
123+
std::unique_ptr<ClientContext>
124+
GrpcClient::MakeClientContext(const std::string& agent_id,
125+
const std::string& saas,
126+
int deadline_seconds) {
58127
std::unique_ptr<ClientContext> context = std::make_unique<ClientContext>();
128+
context->set_deadline(system_clock::now() + seconds(deadline_seconds));
59129
context->AddMetadata("nsolid-agent-id", agent_id);
60130
if (!saas.empty()) {
61131
context->AddMetadata("nsolid-saas-token", saas);

agents/grpc/src/grpc_client.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ class GrpcClient {
6868
static std::unique_ptr<::grpc::ClientContext>
6969
MakeClientContext(const std::string& agent_id, const std::string& saas);
7070

71+
/**
72+
* Create gRPC client context to call RPC with custom deadline.
73+
*/
74+
static std::unique_ptr<::grpc::ClientContext>
75+
MakeClientContext(const std::string& agent_id,
76+
const std::string& saas,
77+
int deadline_seconds);
78+
7179
/**
7280
* Create N|Solid service stub to communicate with the N|Solid Console.
7381
*/
@@ -148,6 +156,9 @@ class GrpcClient {
148156
}
149157
};
150158

159+
// Helper function to get configurable gRPC deadline
160+
int get_grpc_deadline_seconds();
161+
151162
} // namespace grpc
152163
} // namespace nsolid
153164
} // namespace node

deps/opentelemetry-cpp/otlp-http-exporter.gyp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
'BUILDING_LIBCURL',
6464
'ENABLE_ASYNC_EXPORT',
6565
'OPENTELEMETRY_STL_VERSION=2020',
66+
'ENABLE_OTLP_RETRY_PREVIEW',
6667
],
6768
'dependencies': [
6869
'../protobuf/protobuf.gyp:protobuf',
@@ -75,6 +76,7 @@
7576
'defines': [
7677
'ENABLE_ASYNC_EXPORT',
7778
'OPENTELEMETRY_STL_VERSION=2020',
79+
'ENABLE_OTLP_RETRY_PREVIEW',
7880
],
7981
'include_dirs': [
8082
'api/include',

0 commit comments

Comments
 (0)