Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.text.ParseException;
import java.util.concurrent.ForkJoinPool;

public class AccessibilityEstimator extends R5Process {
public class AccessibilityEstimator extends R5DataFrameProcess {

private DecayFunction decayFunction;

Expand Down Expand Up @@ -48,7 +48,7 @@ public AccessibilityEstimator(ForkJoinPool threadPool, RoutingProperties routing

@Override
protected RDataFrame runProcess(int index) throws ParseException {
RegionalTask request = buildRequest(index);
RegionalTask request = buildRegionalTask(index);

TravelTimeComputer computer = new R5TravelTimeComputer(request, transportNetwork);
OneOriginResult travelTimeResults = computer.computeTravelTimes();
Expand Down Expand Up @@ -107,8 +107,8 @@ protected RDataFrame buildDataFrameStructure(String fromId, int nRows) {
}

@Override
protected RegionalTask buildRequest(int index) throws ParseException {
RegionalTask request = super.buildRequest(index);
protected RegionalTask buildRegionalTask(int index) throws ParseException {
RegionalTask request = super.buildRegionalTask(index);

request.destinationPointSetKeys = this.opportunities;
request.destinationPointSets = this.destinationPoints;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.text.ParseException;
import java.util.concurrent.ForkJoinPool;

public class DetailedItineraryPlanner extends R5Process {
public class DetailedItineraryPlanner extends R5DataFrameProcess {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(DetailedItineraryPlanner.class);

Expand Down Expand Up @@ -56,7 +56,7 @@ protected RDataFrame buildDataFrameStructure(String fromId, int nRows) {

@Override
public RDataFrame runProcess(int index) throws ParseException {
RegionalTask request = buildRequest(index);
RegionalTask request = buildRegionalTask(index);

ProfileResponse response = runQuery(index, request);
if (response == null) return null;
Expand Down Expand Up @@ -100,8 +100,8 @@ private ProfileResponse runQuery(int index, RegionalTask request) {
}

@Override
protected RegionalTask buildRequest(int index) throws ParseException {
RegionalTask request = super.buildRequest(index);
protected RegionalTask buildRegionalTask(int index) throws ParseException {
RegionalTask request = super.buildRegionalTask(index);

request.toLat = toLats[index];
request.toLon = toLons[index];
Expand Down
4 changes: 2 additions & 2 deletions java-r5rcore/src/org/ipea/r5r/Process/FaretoDebug.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
/**
* This outputs the Pareto itinerary planner results directly to JSON.
*/
public class FaretoDebug extends R5Process {
public class FaretoDebug extends R5DataFrameProcess {
public FaretoDebug(ForkJoinPool threadPool,
RoutingProperties routingProperties) {
super(threadPool, routingProperties);
Expand All @@ -28,7 +28,7 @@ protected boolean isOneToOne() {

@Override
protected RDataFrame runProcess(int index) throws ParseException {
RegionalTask request = buildRequest(index);
RegionalTask request = buildRegionalTask(index);
request.fromLat = fromLats[0];
request.fromLon = fromLons[0];
request.toLat = toLats[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

public class FastDetailedItineraryPlanner extends R5Process {
public class FastDetailedItineraryPlanner extends R5DataFrameProcess {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FastDetailedItineraryPlanner.class);

Expand Down Expand Up @@ -43,7 +43,7 @@ public void dropItineraryGeometry() {

@Override
protected RDataFrame runProcess(int index) throws ParseException {
RegionalTask request = buildRequest(index);
RegionalTask request = buildRegionalTask(index);

TripPlanner computer = new TripPlanner(transportNetwork, request);
computer.setOD(fromIds[index], toIds[index]);
Expand Down Expand Up @@ -158,8 +158,8 @@ protected RDataFrame buildDataFrameStructure(String fromId, int nRows) {
}

@Override
protected RegionalTask buildRequest(int index) throws ParseException {
RegionalTask request = super.buildRequest(index);
protected RegionalTask buildRegionalTask(int index) throws ParseException {
RegionalTask request = super.buildRegionalTask(index);

request.toLat = toLats[index];
request.toLon = toLons[index];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import java.util.Map;
import java.util.concurrent.ForkJoinPool;

public class ParetoFrontierCalculator extends R5Process {
public class ParetoFrontierCalculator extends R5DataFrameProcess {

@Override
protected boolean isOneToOne() {
Expand All @@ -26,7 +26,7 @@ public ParetoFrontierCalculator(ForkJoinPool threadPool, RoutingProperties routi

@Override
protected RDataFrame runProcess(int index) throws ParseException {
RegionalTask request = buildRequest(index);
RegionalTask request = buildRegionalTask(index);
TravelTimeComputer computer = new R5TravelTimeComputer(request, transportNetwork);

Map<Float, OneOriginResult> travelTimeResults = new HashMap<>();
Expand Down Expand Up @@ -95,8 +95,8 @@ private void populateDataFrame(Map<Float, OneOriginResult> travelTimeResults, RD
}

@Override
protected RegionalTask buildRequest(int index) throws ParseException {
RegionalTask request = super.buildRequest(index);
protected RegionalTask buildRegionalTask(int index) throws ParseException {
RegionalTask request = super.buildRegionalTask(index);

request.destinationPointSetKeys = this.opportunities;
request.destinationPointSets = destinationPoints;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

public class ParetoItineraryPlanner extends R5Process {
public class ParetoItineraryPlanner extends R5DataFrameProcess {

public static boolean travelAllowanceActive = true;

Expand All @@ -32,7 +32,7 @@ protected void buildDestinationPointSet() {

@Override
protected RDataFrame runProcess(int index) throws ParseException {
RegionalTask request = buildRequest(index);
RegionalTask request = buildRegionalTask(index);

R5ParetoServer computer = new R5ParetoServer(request, transportNetwork);
R5ParetoServer.ParetoReturn travelTimeResults = computer.handle();
Expand Down Expand Up @@ -142,8 +142,8 @@ protected RDataFrame buildDataFrameStructure(String fromId, int nRows) {
}

@Override
protected RegionalTask buildRequest(int index) throws ParseException {
RegionalTask request = super.buildRequest(index);
protected RegionalTask buildRegionalTask(int index) throws ParseException {
RegionalTask request = super.buildRegionalTask(index);

request.toLat = toLats[index];
request.toLon = toLons[index];
Expand Down
78 changes: 78 additions & 0 deletions java-r5rcore/src/org/ipea/r5r/Process/R5DataFrameProcess.jav
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.ipea.r5r.Process;

import static org.ipea.r5r.Process.R5Process.LOG;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import org.ipea.r5r.RDataFrame;
import org.ipea.r5r.RoutingProperties;
import org.ipea.r5r.Utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.conveyal.r5.transit.TransportNetwork;

public abstract class R5DataFrameProcess extends R5Process<RDataFrame> {
private static final Logger LOG = LoggerFactory.getLogger(R5DataFrameProcess.class);

public R5DataFrameProcess(ForkJoinPool threadPool, TransportNetwork transportNetwork, RoutingProperties routingProperties) {
super(threadPool, transportNetwork, routingProperties);
}

protected RDataFrame mergeResults(List<RDataFrame> processResults) {
LOG.info("Consolidating results...");

int nRows;
nRows = processResults.stream()
.mapToInt(RDataFrame::nRow)
.sum();

RDataFrame mergedDataFrame = buildDataFrameStructure("", nRows);
if (Utils.benchmark) {
mergedDataFrame.addLongColumn("execution_time", 0L);
}

mergedDataFrame.getDataFrame().keySet().stream().parallel().forEach(
key -> {
ArrayList<Object> destinationArray = mergedDataFrame.getDataFrame().get(key);
processResults.forEach(
dataFrame -> {
ArrayList<Object> originArray = dataFrame.getDataFrame().get(key);
destinationArray.addAll(originArray);

originArray.clear();
});
}
);
mergedDataFrame.updateRowCount();

LOG.info(" DONE!");

return mergedDataFrame;
}

protected abstract RDataFrame buildDataFrameStructure(String fromId, int nRows);

@Override
public RDataFrame run() throws ExecutionException, InterruptedException {
buildDestinationPointSet();
int[] requestIndices = IntStream.range(0, nOrigins).toArray();
AtomicInteger totalProcessed = new AtomicInteger(1);

List<RDataFrame> processResults = r5rThreadPool.submit(() ->
Arrays.stream(requestIndices).parallel().
mapToObj(index -> tryRunProcess(totalProcessed, index)).
filter(Objects::nonNull).
collect(Collectors.toList())).get();

LOG.info(".. DONE!");

RDataFrame results = mergeResults(processResults);

return results;
}

}
106 changes: 106 additions & 0 deletions java-r5rcore/src/org/ipea/r5r/Process/R5DataFrameProcess.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.ipea.r5r.Process;

import org.ipea.r5r.RDataFrame;
import org.ipea.r5r.RoutingProperties;
import org.ipea.r5r.Utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Math.max;

public abstract class R5DataFrameProcess extends R5Process<RDataFrame, RDataFrame> {

public R5DataFrameProcess(ForkJoinPool threadPool, RoutingProperties routingProperties) {
super(threadPool, routingProperties);
}

private static final Logger LOG = LoggerFactory.getLogger(R5DataFrameProcess.class);

protected abstract boolean isOneToOne();

@Override
protected RDataFrame mergeResults (List<RDataFrame> processResults) {
LOG.info("Consolidating results...");

int nRows;
nRows = processResults.stream()
.mapToInt(RDataFrame::nRow)
.sum();

RDataFrame mergedDataFrame = buildDataFrameStructure("", nRows);
if (Utils.benchmark) {
mergedDataFrame.addLongColumn("execution_time", 0L);
}

mergedDataFrame.getDataFrame().keySet().stream().parallel().forEach(
key -> {
ArrayList<Object> destinationArray = mergedDataFrame.getDataFrame().get(key);
processResults.forEach(
dataFrame -> {
ArrayList<Object> originArray = dataFrame.getDataFrame().get(key);
destinationArray.addAll(originArray);

originArray.clear();
});
}
);
mergedDataFrame.updateRowCount();

LOG.info(" DONE!");

return mergedDataFrame;
}

protected abstract RDataFrame buildDataFrameStructure(String fromId, int nRows);

// we override this here so we can handle benchmarking and saving output to CSV
@Override
protected RDataFrame tryRunProcess(AtomicInteger totalProcessed, int index) {
RDataFrame results = null;
try {
long start = System.currentTimeMillis();
results = runProcess(index);
long duration = max(System.currentTimeMillis() - start, 0L);

if (results != null & Utils.benchmark) {
results.addLongColumn("execution_time", duration);
}

if (Utils.saveOutputToCsv & results != null) {
String filename = getCsvFilename(index);
results.saveToCsv(filename);
results.clear();
}

int nProcessed = totalProcessed.getAndIncrement();
if (nProcessed % 1000 == 1 || (nProcessed == nOrigins)) {
LOG.info("{} out of {} origins processed.", nProcessed, nOrigins);
}
} catch (Exception e) {
e.printStackTrace();
// re-throw as unchecked so we get an error on the R side
throw new RuntimeException();
}

return Utils.saveOutputToCsv ? null : results;
}

private String getCsvFilename(int index) {
String filename;
if (this.isOneToOne()) {
// one-to-one functions, such as detailed itineraries
// save one file per origin-destination pair
filename = Utils.outputCsvFolder + "/from_" + fromIds[index] + "_to_" + toIds[index] + ".csv";
} else {
// one-to-many functions, such as travel time matrix
// save one file per origin
filename = Utils.outputCsvFolder + "/from_" + fromIds[index] + ".csv";
}
return filename;
}
}
Loading