12 changes: 12 additions & 0 deletions README.md
@@ -70,6 +70,18 @@ Command to load CSV data:
java -jar [path/to]/ingest-1.1.jar --input sample_data/csv/csv_trace1.csv --output csv_gpx/ --type csv
```

#### JSON data

The ingest tool imports JSON data using the format specified in the `sample_data/json/` sample files `json_trace1.json` and `json_trace2.json`.
The file `json_trace2.json` contains the same trace as `json_trace1.json`, but adds a "PICKUP" and a "DROPOFF" event.

A single JSON file can contain multiple traces, as long as each vehicle is uniquely identified within the file(s). The order of the records in the JSON file does not affect map matching, because records are grouped by vehicle ID and sorted by time.

Command to load JSON data:

```
java -jar [path/to]/ingest-1.1.jar --input sample_data/json/json_trace1.json --output json_gpx/ --type json
```

![Traffic Map](docs/images/pickup_event_trace.png)
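
For illustration only, the sketch below shows how a single record in this JSON format might be deserialized using the Gson dependency added to `ingest/build.gradle` in this change. The field names (`vehicleId`, `timestamp`, `lat`, `lon`, `eventType`) are assumptions for the example, not the authoritative schema; the sample files in `sample_data/json/` define the actual format, and the ingest pipeline itself reads JSON through the new `JsonInputFormat`.

```
// Minimal sketch only -- the field names are assumed for illustration and may
// not match the actual schema in sample_data/json/json_trace1.json.
import com.google.gson.Gson;

public class JsonRecordSketch {

    // Assumed shape of one GPS record; adjust to the real sample files.
    static class GpsRecord {
        String vehicleId;  // must be unique per vehicle within the input file(s)
        long timestamp;    // epoch milliseconds; records are sorted by this per vehicle
        double lat;
        double lon;
        String eventType;  // optional, e.g. "PICKUP" or "DROPOFF"
    }

    public static void main(String[] args) {
        String line = "{\"vehicleId\":\"veh-1\",\"timestamp\":1546300800000,"
                + "\"lat\":38.9072,\"lon\":-77.0369,\"eventType\":\"PICKUP\"}";

        // Gson (com.google.code.gson:gson:2.8.5) maps the JSON fields onto GpsRecord.
        GpsRecord record = new Gson().fromJson(line, GpsRecord.class);
        System.out.println(record.vehicleId + " @ " + record.timestamp);
    }
}
```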

6 changes: 3 additions & 3 deletions gradlew
@@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"

warn ( ) {
warn () {
echo "$*"
}

die ( ) {
die () {
echo
echo "$*"
echo
@@ -155,7 +155,7 @@ if $cygwin ; then
fi

# Escape application args
save ( ) {
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
1 change: 1 addition & 0 deletions ingest/build.gradle
@@ -25,6 +25,7 @@ dependencies {
compile "com.google.protobuf:protobuf-java:${protobufversion}"
compile "com.jsoniter:jsoniter:0.9.15"
compile 'commons-cli:commons-cli:1.4'
compile 'com.google.code.gson:gson:2.8.5'
compile 'com.esri.geometry:esri-geometry-api:1.2.1'
compile 'joda-time:joda-time:2.9.9'

174 changes: 88 additions & 86 deletions ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java
@@ -2,18 +2,16 @@

import io.sharedstreets.matcher.ingest.input.CsvEventExtractor;
import io.sharedstreets.matcher.ingest.input.DcfhvEventExtractor;
import io.sharedstreets.matcher.ingest.input.JsonEventExtractor;
import io.sharedstreets.matcher.ingest.input.json.JsonInputFormat;
import io.sharedstreets.matcher.ingest.input.gpx.GpxInputFormat;
import io.sharedstreets.matcher.ingest.model.Ingest;
import io.sharedstreets.matcher.ingest.model.InputEvent;
import io.sharedstreets.matcher.ingest.util.TileId;
import org.apache.commons.cli.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.core.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -24,7 +22,25 @@

public class Ingester {

static Logger logger = LoggerFactory.getLogger(Ingester.class);
public enum FileType {
CSV("CSV"),
JSON("JSON"),
GPX("GPX"),
DCFHV("DCFHV");

private final String stringValue;

FileType(String csv) {
stringValue = csv;
}

@Override
public String toString() {
return stringValue;
}
}

private static Logger logger = LoggerFactory.getLogger(Ingester.class);

public static void main(String[] args) throws Exception {
// create the command line parser
@@ -33,142 +49,130 @@ public static void main(String[] args) throws Exception {
// create the Options
Options options = new Options();

options.addOption( OptionBuilder.withLongOpt( "input" )
.withDescription( "path to input files" )
options.addOption(OptionBuilder.withLongOpt("input")
.withDescription("path to input files")
.hasArg()
.withArgName("INPUT-DIR")
.create() );
.create());

options.addOption( OptionBuilder.withLongOpt( "type" )
.withDescription( "input type, supports: [CSV, JSON, GPX]" )
options.addOption(OptionBuilder.withLongOpt("type")
.withDescription("input type, supports: [CSV, JSON, GPX]")
.hasArg()
.withArgName("INPUT-DIR")
.create() );
.create());

options.addOption( OptionBuilder.withLongOpt( "output" )
.withDescription( "path to output (will be overwritten)" )
options.addOption(OptionBuilder.withLongOpt("output")
.withDescription("path to output (will be overwritten)")
.hasArg()
.withArgName("OUTPUT-DIR")
.create() );


.create());

options.addOption("speeds", "track GPS speed when available");
options.addOption("verbose", "verbose error output" );
options.addOption("verbose", "verbose error output");

String inputPath = "";

String outputPath = "";

String inputType = "";


boolean verbose = false;


boolean gpsSpeeds = false;

try {
// parse the command line arguments
CommandLine line = parser.parse( options, args );
CommandLine line = parser.parse(options, args);



if( line.hasOption( "input" ) ) {
if (line.hasOption("input")) {
// print the value of block-size
inputPath = line.getOptionValue( "input" );
inputPath = line.getOptionValue("input");
}

if( line.hasOption( "output" ) ) {
if (line.hasOption("output")) {
// print the value of block-size
outputPath = line.getOptionValue( "output" );
outputPath = line.getOptionValue("output");
}

if( line.hasOption( "speeds" ) ) {
if (line.hasOption("speeds")) {
// print the value of block-size
gpsSpeeds = true;
}

if( line.hasOption( "verbose" ) ) {
if (line.hasOption("verbose")) {
verbose = true;
}

if( line.hasOption( "type" ) ) {
if (line.hasOption("type")) {
// print the value of block-size
inputType = line.getOptionValue( "type" ).trim().toUpperCase();
}
else {
String fileParts[] = inputPath.split("\\.");
if(fileParts[fileParts.length-1].toLowerCase().equals("csv"))
inputType = "CSV";
else if(fileParts[fileParts.length-1].toLowerCase().equals("json"))
inputType = "JSON";
else if(fileParts[fileParts.length-1].toLowerCase().equals("gpx"))
inputType = "GPX";
else if(fileParts[fileParts.length-1].toLowerCase().equals("dcfhv"))
inputType = "DCFHV";
inputType = line.getOptionValue("type").trim().toUpperCase();
} else {
String[] fileParts = inputPath.split("\\.");
switch (fileParts[fileParts.length - 1].toLowerCase()) {
case "csv":
inputType = FileType.CSV.toString();
break;
case "json":
inputType = FileType.JSON.toString();
break;
case "gpx":
inputType = FileType.GPX.toString();
break;
case "dcfhv":
inputType = FileType.DCFHV.toString();
break;
}
}

}
catch( Exception exp ) {
} catch (Exception exp) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp( "integster", options );
formatter.printHelp("integster", options);

System.out.println( "Unexpected exception:" + exp.getMessage() );
System.out.println("Unexpected exception:" + exp.getMessage());
return;
}

final String finalInputType = inputType;
final boolean finalVerbose = verbose;

// let's go...

logger.info("Starting up streams...");

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

if(inputPath == null || inputPath.trim().isEmpty()) {
if (inputPath == null || inputPath.trim().isEmpty()) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp( "integster", options );
formatter.printHelp("integster", options);
return;
}

// process events

// step 1: read strings (blocks of location events from file)
DataSet<InputEvent> inputEvents = null;

if (finalInputType.equals("GPX")) {
inputEvents = env.createInput(new GpxInputFormat(inputPath, gpsSpeeds, finalVerbose));
}
else {
DataSet<String> inputStream = env.readTextFile(inputPath);

// open text based file formats and map strings to extractor methods
inputEvents = inputStream.flatMap(new FlatMapFunction<String, InputEvent>() {

@Override
public void flatMap(String value, Collector<InputEvent> out) throws Exception {

if (finalInputType.equals("CSV")) {
List<InputEvent> inputEvents = CsvEventExtractor.extractEvents(value, finalVerbose);

for (InputEvent inputEvent : inputEvents)
out.collect(inputEvent);

} else if (finalInputType.equals("JSON")) {
List<InputEvent> inputEvents = JsonEventExtractor.extractEvents(value, finalVerbose);

for (InputEvent inputEvent : inputEvents)
DataSet<InputEvent> inputEvents;

switch (FileType.valueOf(finalInputType)) {
case GPX:
inputEvents = env.createInput(new GpxInputFormat(inputPath, gpsSpeeds, finalVerbose));
break;
case JSON:
inputEvents = env.createInput(new JsonInputFormat(new org.apache.flink.core.fs.Path(inputPath)));
break;
default:
DataSet<String> inputStream = env.readTextFile(inputPath);

// open text based file formats and map strings to extractor methods
inputEvents = inputStream.flatMap((FlatMapFunction<String, InputEvent>) (value, out) -> {
if (finalInputType.equals(FileType.CSV.toString())) {
List<InputEvent> csvInputEvents = CsvEventExtractor.extractEvents(value, finalVerbose);

for (InputEvent inputEvent : csvInputEvents) {
out.collect(inputEvent);
} else if (finalInputType.equals("DCFHV")) {
List<InputEvent> inputEvents = DcfhvEventExtractor.extractEvents(value, finalVerbose);
}
} else if (finalInputType.equals(FileType.DCFHV.toString())) {
List<InputEvent> dcfhvInputEvents = DcfhvEventExtractor.extractEvents(value, finalVerbose);

for (InputEvent inputEvent : inputEvents)
for (InputEvent inputEvent : dcfhvInputEvents) {
out.collect(inputEvent);
}
}
}
});
});
}

// // create list of map tiles for input traces
@@ -202,9 +206,8 @@ public void flatMap(String value, Collector<InputEvent> out) throws Exception {
//
Path dataPath = Paths.get(outputPath, "event_data").toAbsolutePath();

if(dataPath.toFile().exists()) {
System.out.print("File already exists: " + outputPath.toString());
return;
if (dataPath.toFile().exists()) {
System.out.print("File already exists: " + outputPath + " \n...Overwriting\n");
}

// write protobuf of traces
@@ -214,9 +217,8 @@ public void writeRecord(InputEvent record) throws IOException {
Ingest.InputEventProto proto = record.toProto();
proto.writeDelimitedTo(this.stream);
}
}, dataPath.toString()).setParallelism(1);
}, dataPath.toString(), FileSystem.WriteMode.OVERWRITE).setParallelism(1);

env.execute("process");

}
}

This file was deleted.
