Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/default-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@
"operator_type": "public",
"enable_remote_config": true,
"uid_instance_id_prefix": "local-operator"
}
}
171 changes: 98 additions & 73 deletions src/main/java/com/uid2/operator/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ public Main(Vertx vertx, JsonObject config) throws Exception {
this.clientSideTokenGenerate = config.getBoolean(Const.Config.EnableClientSideTokenGenerate, false);
this.validateServiceLinks = config.getBoolean(Const.Config.ValidateServiceLinks, false);
this.encryptedCloudFilesEnabled = config.getBoolean(Const.Config.EncryptedFiles, false);
this.shutdownHandler = new OperatorShutdownHandler(Duration.ofHours(12), Duration.ofHours(config.getInteger(Const.Config.SaltsExpiredShutdownHours, 12)), Clock.systemUTC(), new ShutdownService());
this.shutdownHandler = new OperatorShutdownHandler(Duration.ofHours(12),
Duration.ofHours(config.getInteger(Const.Config.SaltsExpiredShutdownHours, 12)), Duration.ofDays(7),
Clock.systemUTC(), new ShutdownService());
this.uidInstanceIdProvider = new UidInstanceIdProvider(config);

String coreAttestUrl = this.config.getString(Const.Config.CoreAttestUrlProp);
Expand Down Expand Up @@ -314,20 +316,19 @@ private ICloudStorage wrapCloudStorageForOptOut(ICloudStorage cloudStorage) {
if (config.getBoolean(Const.Config.OptOutS3PathCompatProp)) {
LOGGER.warn("Using S3 Path Compatibility Conversion: log -> delta, snapshot -> partition");
return new PathConversionWrapper(
cloudStorage,
in -> {
String out = in.replace("log", "delta")
.replace("snapshot", "partition");
LOGGER.debug("S3 path forward convert: " + in + " -> " + out);
return out;
},
in -> {
String out = in.replace("delta", "log")
.replace("partition", "snapshot");
LOGGER.debug("S3 path backward convert: " + in + " -> " + out);
return out;
}
);
cloudStorage,
in -> {
String out = in.replace("log", "delta")
.replace("snapshot", "partition");
LOGGER.debug("S3 path forward convert: " + in + " -> " + out);
return out;
},
in -> {
String out = in.replace("delta", "log")
.replace("partition", "snapshot");
LOGGER.debug("S3 path backward convert: " + in + " -> " + out);
return out;
});
} else {
return cloudStorage;
}
Expand All @@ -352,8 +353,10 @@ private void run() throws Exception {
fs.add(createStoreVerticles());

CompositeFuture.all(fs).onComplete(ar -> {
if (ar.failed()) compositePromise.fail(new Exception(ar.cause()));
else compositePromise.complete();
if (ar.failed())
compositePromise.fail(new Exception(ar.cause()));
else
compositePromise.complete();
});

compositePromise.future()
Expand Down Expand Up @@ -403,51 +406,59 @@ private Future<Void> createStoreVerticles() throws Exception {

if (clientSideTokenGenerate) {
fs.add(createAndDeployRotatingStoreVerticle("site", siteProvider, "site_refresh_ms"));
fs.add(createAndDeployRotatingStoreVerticle("client_side_keypairs", clientSideKeypairProvider, "client_side_keypairs_refresh_ms"));
fs.add(createAndDeployRotatingStoreVerticle("client_side_keypairs", clientSideKeypairProvider,
"client_side_keypairs_refresh_ms"));
}

if (validateServiceLinks) {
fs.add(createAndDeployRotatingStoreVerticle("service", serviceProvider, "service_refresh_ms"));
fs.add(createAndDeployRotatingStoreVerticle("service_link", serviceLinkProvider, "service_link_refresh_ms"));
fs.add(createAndDeployRotatingStoreVerticle("service_link", serviceLinkProvider,
"service_link_refresh_ms"));
}

if (encryptedCloudFilesEnabled) {
fs.add(createAndDeployRotatingStoreVerticle("cloud_encryption_keys", cloudEncryptionKeyProvider, "cloud_encryption_keys_refresh_ms"));
fs.add(createAndDeployRotatingStoreVerticle("cloud_encryption_keys", cloudEncryptionKeyProvider,
"cloud_encryption_keys_refresh_ms"));
}

if (useRemoteConfig) {
fs.add(createAndDeployRotatingStoreVerticle("runtime_config", (RuntimeConfigStore) configStore, Const.Config.ConfigScanPeriodMsProp));
fs.add(createAndDeployRotatingStoreVerticle("runtime_config", (RuntimeConfigStore) configStore,
Const.Config.ConfigScanPeriodMsProp));
}
fs.add(createAndDeployRotatingStoreVerticle("auth", clientKeyProvider, "auth_refresh_ms"));
fs.add(createAndDeployRotatingStoreVerticle("keyset", keysetProvider, "keyset_refresh_ms"));
fs.add(createAndDeployRotatingStoreVerticle("keysetkey", keysetKeyStore, "keysetkey_refresh_ms"));
fs.add(createAndDeployRotatingStoreVerticle("salt", saltProvider, "salt_refresh_ms"));
fs.add(createAndDeployCloudSyncStoreVerticle("optout", fsOptOut, optOutCloudSync));
CompositeFuture.all(fs).onComplete(ar -> {
if (ar.failed()) promise.fail(new Exception(ar.cause()));
else promise.complete();
if (ar.failed())
promise.fail(new Exception(ar.cause()));
else
promise.complete();
});


return promise.future();
}

private Future<String> createAndDeployRotatingStoreVerticle(String name, IMetadataVersionedStore store, String storeRefreshConfigMs) {
private Future<String> createAndDeployRotatingStoreVerticle(String name, IMetadataVersionedStore store,
String storeRefreshConfigMs) {
final int intervalMs = config.getInteger(storeRefreshConfigMs, 10000);

RotatingStoreVerticle rotatingStoreVerticle = new RotatingStoreVerticle(name, intervalMs, store);
return vertx.deployVerticle(rotatingStoreVerticle);
}

private Future<String> createAndDeployCloudSyncStoreVerticle(String name, ICloudStorage fsCloud,
ICloudSync cloudSync) {
ICloudSync cloudSync) {
CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle(name, fsCloud, fsLocal, cloudSync, config);
return vertx.deployVerticle(cloudSyncVerticle)
.onComplete(v -> setupTimerEvent(cloudSyncVerticle.eventRefresh()));
.onComplete(v -> setupTimerEvent(cloudSyncVerticle.eventRefresh()));
}

private Future<String> createAndDeployStatsCollector() {
StatsCollectorVerticle statsCollectorVerticle = new StatsCollectorVerticle(60000, config.getInteger(Const.Config.MaxInvalidPaths, 50), config.getInteger(Const.Config.MaxVersionBucketsPerSite, 50));
StatsCollectorVerticle statsCollectorVerticle = new StatsCollectorVerticle(60000,
config.getInteger(Const.Config.MaxInvalidPaths, 50),
config.getInteger(Const.Config.MaxVersionBucketsPerSite, 50));
Future<String> result = vertx.deployVerticle(statsCollectorVerticle);
_statsCollectorQueue = statsCollectorVerticle;
return result;
Expand All @@ -466,31 +477,33 @@ private static Vertx createVertx() {
ObjectName objectName = new ObjectName("uid2.operator:type=jmx,name=AdminApi");
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(AdminApi.instance, objectName);
} catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
} catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException
| MalformedObjectNameException e) {
LOGGER.error("mBean initialisation failed {}", e.getMessage(), e);
System.exit(-1);
}

final int portOffset = Utils.getPortOffset();
VertxPrometheusOptions prometheusOptions = new VertxPrometheusOptions()
.setStartEmbeddedServer(true)
.setEmbeddedServerOptions(new HttpServerOptions().setPort(Const.Port.PrometheusPortForOperator + portOffset))
.setEnabled(true);
.setStartEmbeddedServer(true)
.setEmbeddedServerOptions(
new HttpServerOptions().setPort(Const.Port.PrometheusPortForOperator + portOffset))
.setEnabled(true);

MicrometerMetricsOptions metricOptions = new MicrometerMetricsOptions()
.setPrometheusOptions(prometheusOptions)
.setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH))
.setJvmMetricsEnabled(true)
.setEnabled(true);
.setPrometheusOptions(prometheusOptions)
.setLabels(EnumSet.of(Label.HTTP_METHOD, Label.HTTP_CODE, Label.HTTP_PATH))
.setJvmMetricsEnabled(true)
.setEnabled(true);
setupMetrics(metricOptions);

final int threadBlockedCheckInterval = Utils.isProductionEnvironment()
? 60 * 1000
: 3600 * 1000;
? 60 * 1000
: 3600 * 1000;

VertxOptions vertxOptions = new VertxOptions()
.setMetricsOptions(metricOptions)
.setBlockedThreadCheckInterval(threadBlockedCheckInterval);
.setMetricsOptions(metricOptions)
.setBlockedThreadCheckInterval(threadBlockedCheckInterval);

return Vertx.vertx(vertxOptions);
}
Expand All @@ -505,32 +518,35 @@ private static void setupMetrics(MicrometerMetricsOptions metricOptions) {

// see also https://micrometer.io/docs/registry/prometheus
prometheusRegistry.config()
// providing common renaming for prometheus metric, e.g. "hello.world" to "hello_world"
.meterFilter(new PrometheusRenameFilter())
.meterFilter(MeterFilter.replaceTagValues(Label.HTTP_PATH.toString(),
actualPath -> HTTPPathMetricFilter.filterPath(actualPath, Endpoints.pathSet())))
// Don't record metrics for 404s.
.meterFilter(MeterFilter.deny(id ->
id.getName().startsWith(MetricsDomain.HTTP_SERVER.getPrefix()) &&
Objects.equals(id.getTag(Label.HTTP_CODE.toString()), "404")))
.meterFilter(new MeterFilter() {
private final String httpServerResponseTime = MetricsDomain.HTTP_SERVER.getPrefix() + MetricsNaming.v4Names().getHttpResponseTime();

@Override
public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {
if (id.getName().equals(httpServerResponseTime)) {
return DistributionStatisticConfig.builder()
.percentiles(0.90, 0.95, 0.99)
.build()
.merge(config);
// providing common renaming for prometheus metric, e.g. "hello.world" to
// "hello_world"
.meterFilter(new PrometheusRenameFilter())
.meterFilter(MeterFilter.replaceTagValues(Label.HTTP_PATH.toString(),
actualPath -> HTTPPathMetricFilter.filterPath(actualPath, Endpoints.pathSet())))
// Don't record metrics for 404s.
.meterFilter(
MeterFilter.deny(id -> id.getName().startsWith(MetricsDomain.HTTP_SERVER.getPrefix()) &&
Objects.equals(id.getTag(Label.HTTP_CODE.toString()), "404")))
.meterFilter(new MeterFilter() {
private final String httpServerResponseTime = MetricsDomain.HTTP_SERVER.getPrefix()
+ MetricsNaming.v4Names().getHttpResponseTime();

@Override
public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {
if (id.getName().equals(httpServerResponseTime)) {
return DistributionStatisticConfig.builder()
.percentiles(0.90, 0.95, 0.99)
.build()
.merge(config);
}
return config;
}
return config;
}
})
// adding common labels
.commonTags("application", "uid2-operator");
})
// adding common labels
.commonTags("application", "uid2-operator");

// wire my monitoring system to global static state, see also https://micrometer.io/docs/concepts
// wire my monitoring system to global static state, see also
// https://micrometer.io/docs/concepts
Metrics.addRegistry(prometheusRegistry);
}

Expand All @@ -555,14 +571,19 @@ private void createVertxEventLoopsMetric() {
.register(Metrics.globalRegistry);
}

private Map.Entry<UidCoreClient, UidOptOutClient> createUidClients(Vertx vertx, String attestationUrl, String clientApiToken, Handler<Pair<AttestationResponseCode, String>> responseWatcher) throws Exception {
AttestationResponseHandler attestationResponseHandler = getAttestationTokenRetriever(vertx, attestationUrl, clientApiToken, responseWatcher);
UidCoreClient coreClient = new UidCoreClient(clientApiToken, CloudUtils.defaultProxy, attestationResponseHandler, this.encryptedCloudFilesEnabled, this.uidInstanceIdProvider);
UidOptOutClient optOutClient = new UidOptOutClient(clientApiToken, CloudUtils.defaultProxy, attestationResponseHandler, this.uidInstanceIdProvider);
private Map.Entry<UidCoreClient, UidOptOutClient> createUidClients(Vertx vertx, String attestationUrl,
String clientApiToken, Handler<Pair<AttestationResponseCode, String>> responseWatcher) throws Exception {
AttestationResponseHandler attestationResponseHandler = getAttestationTokenRetriever(vertx, attestationUrl,
clientApiToken, responseWatcher);
UidCoreClient coreClient = new UidCoreClient(clientApiToken, CloudUtils.defaultProxy,
attestationResponseHandler, this.encryptedCloudFilesEnabled, this.uidInstanceIdProvider);
UidOptOutClient optOutClient = new UidOptOutClient(clientApiToken, CloudUtils.defaultProxy,
attestationResponseHandler, this.uidInstanceIdProvider);
return new AbstractMap.SimpleEntry<>(coreClient, optOutClient);
}

private AttestationResponseHandler getAttestationTokenRetriever(Vertx vertx, String attestationUrl, String clientApiToken, Handler<Pair<AttestationResponseCode, String>> responseWatcher) throws Exception {
private AttestationResponseHandler getAttestationTokenRetriever(Vertx vertx, String attestationUrl,
String clientApiToken, Handler<Pair<AttestationResponseCode, String>> responseWatcher) throws Exception {
String enclavePlatform = this.config.getString(Const.Config.EnclavePlatformProp);
String operatorType = this.config.getString(Const.Config.OperatorTypeProp, "");

Expand All @@ -587,14 +608,17 @@ private AttestationResponseHandler getAttestationTokenRetriever(Vertx vertx, Str
break;
case "azure-cc":
LOGGER.info("creating uid core client with azure cc attestation protocol");
String maaServerBaseUrl = this.config.getString(Const.Config.MaaServerBaseUrlProp, "https://sharedeus.eus.attest.azure.net");
String maaServerBaseUrl = this.config.getString(Const.Config.MaaServerBaseUrlProp,
"https://sharedeus.eus.attest.azure.net");
attestationProvider = AttestationFactory.getAzureCCAttestation(maaServerBaseUrl);
break;
default:
throw new IllegalArgumentException(String.format("enclave_platform is providing the wrong value: %s", enclavePlatform));
throw new IllegalArgumentException(
String.format("enclave_platform is providing the wrong value: %s", enclavePlatform));
}

return new AttestationResponseHandler(vertx, attestationUrl, clientApiToken, operatorType, this.appVersion, attestationProvider, responseWatcher, CloudUtils.defaultProxy, this.uidInstanceIdProvider);
return new AttestationResponseHandler(vertx, attestationUrl, clientApiToken, operatorType, this.appVersion,
attestationProvider, responseWatcher, CloudUtils.defaultProxy, this.uidInstanceIdProvider);
}

private IOperatorKeyRetriever createOperatorKeyRetriever() throws Exception {
Expand All @@ -617,7 +641,8 @@ private IOperatorKeyRetriever createOperatorKeyRetriever() throws Exception {
return OperatorKeyRetrieverFactory.getGcpOperatorKeyRetriever(secretVersionName);
}
default: {
throw new IllegalArgumentException(String.format("enclave_platform is providing the wrong value: %s", enclavePlatform));
throw new IllegalArgumentException(
String.format("enclave_platform is providing the wrong value: %s", enclavePlatform));
}
}
}
Expand Down
Loading
Loading