Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static com.here.xyz.jobs.steps.resources.ResourcesRegistry.fromStaticLoads;
import static com.here.xyz.jobs.steps.resources.ResourcesRegistry.toStaticLoads;
import static com.here.xyz.util.Random.randomAlpha;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
Expand Down Expand Up @@ -118,7 +119,7 @@ public class Job implements XyzSerializable {
@JsonView(Static.class)
private List<StaticLoad> calculatedResourceLoads;

private static final Async ASYNC = new Async(100, Job.class);
private static final Async ASYNC = new Async(200, Job.class);
private static final Logger logger = LogManager.getLogger();
private static final long DEFAULT_JOB_TTL = TimeUnit.DAYS.toMillis(2 * 7); //4 weeks

Expand Down Expand Up @@ -260,7 +261,7 @@ private static Future<Boolean> validateStep(Step step) {
if (step.getStatus().getState() != targetState)
step.getStatus().setState(targetState);
return isReady;
});
}, 10, SECONDS);
}

public Future<Void> start() {
Expand Down
76 changes: 47 additions & 29 deletions xyz-util/src/main/java/com/here/xyz/util/Async.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2017-2024 HERE Europe B.V.
* Copyright (C) 2017-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,45 +19,63 @@

package com.here.xyz.util;

import com.here.xyz.util.service.Core;
import io.vertx.core.Future;
import io.vertx.core.WorkerExecutor;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Async {
public final AtomicReference<WorkerExecutor> asyncWorkers = new AtomicReference<>();
private static final Logger logger = LogManager.getLogger();
private final ExecutorService exec;
private final ScheduledExecutorService timoutCheckerExec;

private String name;
private final int workerPoolSize;

//TODO: Monitor long running threads
//TODO: Monitor thread pool utilization and throw warning if usage is over 90%

public Async(int workerPoolSize, Class<?> callerClass) {
name = callerClass.getName();
this.workerPoolSize = workerPoolSize;
exec = Executors.newFixedThreadPool(workerPoolSize, new ThreadFactoryBuilder().setNameFormat("async-" + name + "-%d").build());
timoutCheckerExec = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("async-timer-" + name + "-%d")
.build());
}

private WorkerExecutor exec() {
if (asyncWorkers.get() == null) {
WorkerExecutor workers = Core.vertx.createSharedWorkerExecutor(name, workerPoolSize);
if (!asyncWorkers.compareAndSet(null, workers))
//Some other thread initialized the workers already
workers.close();
}
return asyncWorkers.get();
public <R> io.vertx.core.Future<R> run(Callable<R> task) {
return run(task, -1, null);
}

public <R> Future<R> run(ThrowingSupplier<R> task) {
return exec().executeBlocking(
promise -> {
try {
promise.complete(task.supply());
}
catch (Exception e) {
promise.fail(e);
public <R> io.vertx.core.Future<R> run(Callable<R> task, long timeout, TimeUnit unit) {
CompletableFuture<R> promise = new CompletableFuture<>();
Future future = exec.submit(() -> {
try {
promise.complete(task.call());
}
catch (Exception e) {
promise.completeExceptionally(e);
}
return null;
});

if (timeout >= 0) {
timoutCheckerExec.schedule(() -> {
if (!future.isDone()) {
if (!promise.isDone()) {
promise.completeExceptionally(new TimeoutException("Timeout of task in " + name + " after " + timeout + " "
+ unit.toString().toLowerCase()));
}
}, false);
}
future.cancel(true);
}
}, timeout, unit);
}

@FunctionalInterface
public interface ThrowingSupplier<R> {
R supply() throws Exception;
return io.vertx.core.Future.fromCompletionStage(promise);
}
}
40 changes: 34 additions & 6 deletions xyz-util/src/test/java/com/here/xyz/test/AsyncTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,23 @@
package com.here.xyz.test;

import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTimeout;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import com.here.xyz.util.Async;
import com.here.xyz.util.service.Core;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;

public class AsyncTest {
private static final Async ASYNC = new Async(5, AsyncTest.class);

static {
Core.vertx = Vertx.vertx();
}

@Test
public void runParallel() {
long waitTime = 100;
Expand All @@ -60,4 +58,34 @@ public void runParallel() {
Thread.sleep(10);
});
}

@Test
public void runWithTimeout() {
long waitTime = 100;
long startTime = System.currentTimeMillis();
Future f = ASYNC.run(() -> {
//Sleep for some time that is longer than the timeout for sure
Thread.sleep(waitTime * 2);

//Assure the task was actually interrupted and never reaches this point
throw new Exception("The thread should never reach this point. It was not interrupted by a timeout.");
}, waitTime, MILLISECONDS);

//Assure the call to ASYNC#run() was not blocking
long timeAfterStartingTask = System.currentTimeMillis();
assertTrue(timeAfterStartingTask - startTime < waitTime / 2, "The main thread was not immediately continuing after "
+ "starting the task. It seems the call to Async#run() was blocking.");

//Assure the timeout was fired at the right time
long timoutGraceTime = waitTime + waitTime / 2;
assertTimeout(Duration.of(timoutGraceTime, MILLIS), () -> {
while (!f.isComplete())
Thread.sleep(10);

//Assure the future was set to fail in the correct way
assertTrue(f.failed(), "The resulting future was not set to failed.");
assertEquals(TimeoutException.class, f.cause().getClass(), "The resulting cause of the future should be a "
+ TimeoutException.class + " but is: " + f.cause());
}, "The timeout was not firing after " + timoutGraceTime + "ms");
}
}