Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions agents/grpc/src/asset_stream.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "asset_stream.h"
#include "grpc_client.h"

#include <cinttypes>

Expand All @@ -8,6 +9,8 @@

using grpc::Status;
using grpcagent::NSolidService;
using std::chrono::system_clock;
using std::chrono::seconds;

namespace node {
namespace nsolid {
Expand All @@ -27,6 +30,10 @@ AssetStream::AssetStream(
context_.AddMetadata("nsolid-saas-token", saas);
}

// Use configurable deadline for asset streams
context_.set_deadline(system_clock::now() +
seconds(get_grpc_deadline_seconds()));

// Call the appropriate RPC method based on the rpc_type parameter
if (rpc_type == EXPORT_CONTINUOUS_PROFILE) {
stub->async()->ExportContinuousProfile(&context_, &event_response_, this);
Expand Down
169 changes: 85 additions & 84 deletions agents/grpc/src/grpc_agent.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "grpc_agent.h"
#include "../../src/nsolid/nsolid_util.h"

#include "asserts-cpp/asserts.h"
#include "nsolid/nsolid_api.h"
Expand Down Expand Up @@ -62,6 +63,11 @@ using ThreadMetricsMap = std::map<uint64_t, ThreadMetrics::MetricsStor>;

constexpr uint64_t span_timer_interval = 1000;
constexpr size_t span_msg_q_min_size = 1000;
// At class or file scope
constexpr size_t retry_max_attempts = 5;
constexpr auto retry_initial_backoff = std::chrono::milliseconds(500);
constexpr auto retry_max_backoff = std::chrono::seconds(5);
constexpr float retry_backoff_multiplier = 2.0f;

const char* const kNSOLID_GRPC_INSECURE = "NSOLID_GRPC_INSECURE";
const char* const kNSOLID_GRPC_CERTS = "NSOLID_GRPC_CERTS";
Expand Down Expand Up @@ -992,7 +998,81 @@ void GrpcAgent::check_exit_on_profile() {
}
}

int GrpcAgent::config(const json& config) {
void GrpcAgent::configure_grpc_exporters(const std::string& endpoint,
bool insecure) {
Debug("GrpcAgent configured. Endpoint: %s. Insecure: %d\n",
endpoint.c_str(), static_cast<unsigned>(insecure));

OtlpGrpcClientOptions opts;
opts.compression = "gzip";
opts.endpoint = endpoint;
opts.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
// Make sure the client is initialized. We set it to the same
// default value as max_concurrent_requests as the exporters.
opts.max_concurrent_requests = 64;
opts.use_ssl_credentials = !insecure;
if (!insecure) {
if (!custom_certs_.empty()) {
opts.ssl_credentials_cacert_as_string = custom_certs_;
} else {
opts.ssl_credentials_cacert_as_string = cacert_;
}
}

opts.retry_policy_max_attempts = retry_max_attempts;
opts.retry_policy_initial_backoff = retry_initial_backoff;
opts.retry_policy_max_backoff = retry_max_backoff;
opts.retry_policy_backoff_multiplier = retry_backoff_multiplier;

nsolid_service_stub_ = GrpcClient::MakeNSolidServiceStub(opts);
// CommandStream needs to be created before the OTLP client to avoid
// a race condition with abseil mutexes.
reset_command_stream();

std::shared_ptr<OtlpGrpcClient> client =
OtlpGrpcClientFactory::Create(opts);

auto set_common_options = [&](auto& options) {
options.compression = "gzip";
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
options.ssl_credentials_cacert_as_string = custom_certs_;
} else {
options.ssl_credentials_cacert_as_string = cacert_;
}
}

options.retry_policy_max_attempts = retry_max_attempts;
options.retry_policy_initial_backoff = retry_initial_backoff;
options.retry_policy_max_backoff = retry_max_backoff;
options.retry_policy_backoff_multiplier = retry_backoff_multiplier;
};

{
OtlpGrpcExporterOptions options;
set_common_options(options);
trace_exporter_ = std::make_unique<OtlpGrpcExporter>(options, client);
}
{
OtlpGrpcMetricExporterOptions options;
set_common_options(options);
metrics_exporter_ =
std::make_unique<OtlpGrpcMetricExporter>(options, client);
}
{
OtlpGrpcLogRecordExporterOptions options;
set_common_options(options);
log_exporter_ =
std::make_unique<OtlpGrpcLogRecordExporter>(options, client);
}
}

int GrpcAgent::config(const nlohmann::json& config) {
int ret = 0;
json old_config = config_;
config_ = config;
Expand Down Expand Up @@ -1020,93 +1100,13 @@ int GrpcAgent::config(const json& config) {
// Only parse the insecure flag in non SaaS mode.
if (insecure_str.has_value() && (!saas_ || saas_->testing)) {
// insecure = std::stoull(insecure_str.value());
insecure = std::stoi(insecure_str.value());
insecure = utils::parse_env_var_int(kNSOLID_GRPC_INSECURE, 0, 0, 1);
}

const std::string& endpoint = !saas_ ?
it->get<std::string>() :
saas_->endpoint;
Debug("GrpcAgent configured. Endpoint: %s. Insecure: %d\n",
endpoint.c_str(), static_cast<unsigned>(insecure));

OtlpGrpcClientOptions opts;
opts.compression = "gzip";
opts.endpoint = endpoint;
opts.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
// Make sure the client is initialized. We set it to the same
// default value as ax_concurrent_requests as the exporters.
opts.max_concurrent_requests = 64;
opts.use_ssl_credentials = !insecure;
if (!insecure) {
if (!custom_certs_.empty()) {
opts.ssl_credentials_cacert_as_string = custom_certs_;
} else {
opts.ssl_credentials_cacert_as_string = cacert_;
}
}

nsolid_service_stub_ = GrpcClient::MakeNSolidServiceStub(opts);
// CommandStream needs to be created before the OTLP client to avoid
// a race condition with abseil mutexes.
reset_command_stream();

std::shared_ptr<OtlpGrpcClient> client =
OtlpGrpcClientFactory::Create(opts);

{
OtlpGrpcExporterOptions options;
options.compression = "gzip";
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
options.ssl_credentials_cacert_as_string = custom_certs_;
} else {
options.ssl_credentials_cacert_as_string = cacert_;
}
}

trace_exporter_ = std::make_unique<OtlpGrpcExporter>(options, client);
}
{
OtlpGrpcMetricExporterOptions options;
options.compression = "gzip";
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
options.ssl_credentials_cacert_as_string = custom_certs_;
} else {
options.ssl_credentials_cacert_as_string = cacert_;
}
}

metrics_exporter_ =
std::make_unique<OtlpGrpcMetricExporter>(options, client);
}
{
OtlpGrpcLogRecordExporterOptions options;
options.compression = "gzip";
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas()}};
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
options.ssl_credentials_cacert_as_string = custom_certs_;
} else {
options.ssl_credentials_cacert_as_string = cacert_;
}
}

log_exporter_ =
std::make_unique<OtlpGrpcLogRecordExporter>(options, client);
}
configure_grpc_exporters(endpoint, insecure);
}
}

Expand Down Expand Up @@ -1804,7 +1804,8 @@ void GrpcAgent::send_exit() {
exit_body->set_profile(cpu_profile_state.last_main_profile);
}

auto context = GrpcClient::MakeClientContext(agent_id_, saas());
// 1 second for fast exit
auto context = GrpcClient::MakeClientContext(agent_id_, saas(), 1);
uv_cond_t cond;
uv_mutex_t lock;
bool signaled = false;
Expand Down
2 changes: 2 additions & 0 deletions agents/grpc/src/grpc_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,

void command_stream_closed(const ::grpc::Status& s);

void configure_grpc_exporters(const std::string& endpoint, bool insecure);

int config(const nlohmann::json& config);

void do_start();
Expand Down
68 changes: 68 additions & 0 deletions agents/grpc/src/grpc_client.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "grpc_client.h"
#include "debug_utils-inl.h"
#include "nsolid/nsolid_util.h"
#include "opentelemetry/exporters/otlp/otlp_grpc_client_options.h"

using grpc::Channel;
Expand All @@ -11,11 +12,24 @@ using grpc::SslCredentials;
using grpc::SslCredentialsOptions;
using grpcagent::NSolidService;
using opentelemetry::v1::exporter::otlp::OtlpGrpcClientOptions;
using std::chrono::system_clock;
using std::chrono::seconds;

namespace node {
namespace nsolid {

using utils::parse_env_var_int;

namespace grpc {

const char* const kNSOLID_GRPC_DEADLINE = "NSOLID_GRPC_DEADLINE";

int get_grpc_deadline_seconds() {
static const int cached_deadline =
parse_env_var_int(kNSOLID_GRPC_DEADLINE, 10, 1, INT_MAX);
return cached_deadline;
}

/**
* Create gRPC channel.
*/
Expand All @@ -32,6 +46,48 @@ std::shared_ptr<Channel>
grpc_arguments.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 15 * 1000 /* 15 sec*/);
grpc_arguments.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
grpc_arguments.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);


static const auto kServiceConfigJson = std::string_view{R"(
{
"methodConfig": [
{
"name": [{}],
"retryPolicy": {
"maxAttempts": %0000000000u,
"initialBackoff": "%0000000000.1fs",
"maxBackoff": "%0000000000.1fs",
"backoffMultiplier": %0000000000.1f,
"retryableStatusCodes": [
"CANCELLED",
"DEADLINE_EXCEEDED",
"ABORTED",
"OUT_OF_RANGE",
"DATA_LOSS",
"UNAVAILABLE"
]
}
}
]
})"};

// Allocate string with buffer large enough to hold the formatted json config
auto service_config = std::string(kServiceConfigJson.size(), '\0');
// Prior to C++17, need to explicitly cast away constness from `data()` buffer
float initial_backoff = options.retry_policy_initial_backoff.count();
float max_backoff = options.retry_policy_max_backoff.count();
float backoff_multiplier = options.retry_policy_backoff_multiplier;
std::snprintf(
const_cast<decltype(service_config)::value_type *>(service_config.data()),
service_config.size(),
kServiceConfigJson.data(),
options.retry_policy_max_attempts,
std::min(std::max(initial_backoff, 0.f), 999999999.f),
std::min(std::max(max_backoff, 0.f), 999999999.f),
std::min(std::max(backoff_multiplier, 0.f), 999999999.f));

grpc_arguments.SetServiceConfigJSON(service_config);

if (!options.use_ssl_credentials) {
channel = CreateCustomChannel(options.endpoint,
InsecureChannelCredentials(),
Expand All @@ -55,7 +111,19 @@ std::shared_ptr<Channel>
std::unique_ptr<ClientContext>
GrpcClient::MakeClientContext(const std::string& agent_id,
const std::string& saas) {
// Forward to the version with custom deadline using environment variable
return MakeClientContext(agent_id, saas, get_grpc_deadline_seconds());
}

/**
* Create gRPC client context to call RPC with custom deadline.
*/
std::unique_ptr<ClientContext>
GrpcClient::MakeClientContext(const std::string& agent_id,
const std::string& saas,
int deadline_seconds) {
std::unique_ptr<ClientContext> context = std::make_unique<ClientContext>();
context->set_deadline(system_clock::now() + seconds(deadline_seconds));
context->AddMetadata("nsolid-agent-id", agent_id);
if (!saas.empty()) {
context->AddMetadata("nsolid-saas-token", saas);
Expand Down
11 changes: 11 additions & 0 deletions agents/grpc/src/grpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ class GrpcClient {
static std::unique_ptr<::grpc::ClientContext>
MakeClientContext(const std::string& agent_id, const std::string& saas);

/**
* Create gRPC client context to call RPC with custom deadline.
*/
static std::unique_ptr<::grpc::ClientContext>
MakeClientContext(const std::string& agent_id,
const std::string& saas,
int deadline_seconds);

/**
* Create N|Solid service stub to communicate with the N|Solid Console.
*/
Expand Down Expand Up @@ -148,6 +156,9 @@ class GrpcClient {
}
};

// Helper function to get configurable gRPC deadline
int get_grpc_deadline_seconds();

} // namespace grpc
} // namespace nsolid
} // namespace node
Expand Down
2 changes: 2 additions & 0 deletions deps/opentelemetry-cpp/otlp-http-exporter.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
'BUILDING_LIBCURL',
'ENABLE_ASYNC_EXPORT',
'OPENTELEMETRY_STL_VERSION=2020',
'ENABLE_OTLP_RETRY_PREVIEW',
],
'dependencies': [
'../protobuf/protobuf.gyp:protobuf',
Expand All @@ -75,6 +76,7 @@
'defines': [
'ENABLE_ASYNC_EXPORT',
'OPENTELEMETRY_STL_VERSION=2020',
'ENABLE_OTLP_RETRY_PREVIEW',
],
'include_dirs': [
'api/include',
Expand Down
Loading
Loading