Skip to content

Commit 0514e56

Browse files
SEKIRO-Jjon-wei
authored and committed
add TsvInputFormat (#8915)
* add TsvInputFormat
* refactor code
* fix grammar
* use enum replace string literal
* code refactor
* code refactor
* mark abstract for base class meant not to be instantiated
* remove constructor for test
1 parent 7250010 commit 0514e56

File tree

19 files changed

+854
-233
lines changed

19 files changed

+854
-233
lines changed

core/src/main/java/org/apache/druid/data/input/InputFormat.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.druid.data.input.impl.NestedInputFormat;
2929
import org.apache.druid.data.input.impl.RegexInputFormat;
3030
import org.apache.druid.data.input.impl.SplittableInputSource;
31+
import org.apache.druid.data.input.impl.TsvInputFormat;
3132
import org.apache.druid.guice.annotations.UnstableApi;
3233

3334
import java.io.File;
@@ -45,7 +46,8 @@
4546
@JsonSubTypes(value = {
4647
@Type(name = "csv", value = CsvInputFormat.class),
4748
@Type(name = "json", value = JsonInputFormat.class),
48-
@Type(name = "regex", value = RegexInputFormat.class)
49+
@Type(name = "regex", value = RegexInputFormat.class),
50+
@Type(name = "tsv", value = TsvInputFormat.class)
4951
})
5052
public interface InputFormat
5153
{

core/src/main/java/org/apache/druid/data/input/impl/CSVParseSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public Parser<String, Object> makeParser()
100100
@Override
101101
public InputFormat toInputFormat()
102102
{
103-
return new CsvInputFormat(columns, listDelimiter, hasHeaderRow, skipHeaderRows);
103+
return new CsvInputFormat(columns, listDelimiter, null, hasHeaderRow, skipHeaderRows);
104104
}
105105

106106
@Override

core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java

Lines changed: 2 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,12 @@
2121

2222
import com.fasterxml.jackson.annotation.JsonCreator;
2323
import com.fasterxml.jackson.annotation.JsonProperty;
24-
import com.google.common.annotations.VisibleForTesting;
25-
import com.google.common.base.Preconditions;
26-
import com.google.common.collect.ImmutableList;
27-
import org.apache.druid.data.input.InputEntity;
28-
import org.apache.druid.data.input.InputEntityReader;
29-
import org.apache.druid.data.input.InputFormat;
30-
import org.apache.druid.data.input.InputRowSchema;
31-
import org.apache.druid.indexer.Checks;
32-
import org.apache.druid.indexer.Property;
3324

3425
import javax.annotation.Nullable;
35-
import java.io.File;
36-
import java.util.Collections;
3726
import java.util.List;
38-
import java.util.Objects;
3927

40-
public class CsvInputFormat implements InputFormat
28+
public class CsvInputFormat extends SeparateValueInputFormat
4129
{
42-
private final String listDelimiter;
43-
private final List<String> columns;
44-
private final boolean findColumnsFromHeader;
45-
private final int skipHeaderRows;
46-
4730
@JsonCreator
4831
public CsvInputFormat(
4932
@JsonProperty("columns") @Nullable List<String> columns,
@@ -53,104 +36,6 @@ public CsvInputFormat(
5336
@JsonProperty("skipHeaderRows") int skipHeaderRows
5437
)
5538
{
56-
this.listDelimiter = listDelimiter;
57-
this.columns = columns == null ? Collections.emptyList() : columns;
58-
//noinspection ConstantConditions
59-
this.findColumnsFromHeader = Checks.checkOneNotNullOrEmpty(
60-
ImmutableList.of(
61-
new Property<>("hasHeaderRow", hasHeaderRow),
62-
new Property<>("findColumnsFromHeader", findColumnsFromHeader)
63-
)
64-
).getValue();
65-
this.skipHeaderRows = skipHeaderRows;
66-
67-
if (!this.columns.isEmpty()) {
68-
for (String column : this.columns) {
69-
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
70-
}
71-
} else {
72-
Preconditions.checkArgument(
73-
this.findColumnsFromHeader,
74-
"If columns field is not set, the first row of your data must have your header"
75-
+ " and hasHeaderRow must be set to true."
76-
);
77-
}
78-
}
79-
80-
@VisibleForTesting
81-
public CsvInputFormat(
82-
List<String> columns,
83-
String listDelimiter,
84-
boolean findColumnsFromHeader,
85-
int skipHeaderRows
86-
)
87-
{
88-
this(columns, listDelimiter, null, findColumnsFromHeader, skipHeaderRows);
89-
}
90-
91-
@JsonProperty
92-
public List<String> getColumns()
93-
{
94-
return columns;
95-
}
96-
97-
@JsonProperty
98-
public String getListDelimiter()
99-
{
100-
return listDelimiter;
101-
}
102-
103-
@JsonProperty
104-
public boolean isFindColumnsFromHeader()
105-
{
106-
return findColumnsFromHeader;
107-
}
108-
109-
@JsonProperty
110-
public int getSkipHeaderRows()
111-
{
112-
return skipHeaderRows;
113-
}
114-
115-
@Override
116-
public boolean isSplittable()
117-
{
118-
return true;
119-
}
120-
121-
@Override
122-
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
123-
{
124-
return new CsvReader(
125-
inputRowSchema,
126-
source,
127-
temporaryDirectory,
128-
listDelimiter,
129-
columns,
130-
findColumnsFromHeader,
131-
skipHeaderRows
132-
);
133-
}
134-
135-
@Override
136-
public boolean equals(Object o)
137-
{
138-
if (this == o) {
139-
return true;
140-
}
141-
if (o == null || getClass() != o.getClass()) {
142-
return false;
143-
}
144-
CsvInputFormat format = (CsvInputFormat) o;
145-
return findColumnsFromHeader == format.findColumnsFromHeader &&
146-
skipHeaderRows == format.skipHeaderRows &&
147-
Objects.equals(listDelimiter, format.listDelimiter) &&
148-
Objects.equals(columns, format.columns);
149-
}
150-
151-
@Override
152-
public int hashCode()
153-
{
154-
return Objects.hash(listDelimiter, columns, findColumnsFromHeader, skipHeaderRows);
39+
super(columns, listDelimiter, hasHeaderRow, findColumnsFromHeader, skipHeaderRows, Format.CSV);
15540
}
15641
}

core/src/main/java/org/apache/druid/data/input/impl/CsvReader.java

Lines changed: 10 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -19,49 +19,15 @@
1919

2020
package org.apache.druid.data.input.impl;
2121

22-
import com.google.common.base.Function;
23-
import com.google.common.base.Preconditions;
24-
import com.google.common.base.Splitter;
25-
import com.google.common.collect.Iterables;
26-
import com.opencsv.RFC4180Parser;
27-
import com.opencsv.RFC4180ParserBuilder;
28-
import com.opencsv.enums.CSVReaderNullFieldIndicator;
29-
import org.apache.druid.common.config.NullHandling;
3022
import org.apache.druid.data.input.InputEntity;
31-
import org.apache.druid.data.input.InputRow;
3223
import org.apache.druid.data.input.InputRowSchema;
33-
import org.apache.druid.data.input.TextReader;
34-
import org.apache.druid.java.util.common.ISE;
35-
import org.apache.druid.java.util.common.collect.Utils;
36-
import org.apache.druid.java.util.common.parsers.ParseException;
37-
import org.apache.druid.java.util.common.parsers.ParserUtils;
38-
import org.apache.druid.java.util.common.parsers.Parsers;
3924

4025
import javax.annotation.Nullable;
4126
import java.io.File;
42-
import java.io.IOException;
43-
import java.util.Arrays;
44-
import java.util.Collections;
4527
import java.util.List;
46-
import java.util.Map;
4728

48-
public class CsvReader extends TextReader
29+
public class CsvReader extends SeparateValueReader
4930
{
50-
private final RFC4180Parser parser = CsvReader.createOpenCsvParser();
51-
private final boolean findColumnsFromHeader;
52-
private final int skipHeaderRows;
53-
private final Function<String, Object> multiValueFunction;
54-
@Nullable
55-
private List<String> columns;
56-
57-
public static RFC4180Parser createOpenCsvParser()
58-
{
59-
return NullHandling.replaceWithDefault()
60-
? new RFC4180Parser()
61-
: new RFC4180ParserBuilder().withFieldAsNull(
62-
CSVReaderNullFieldIndicator.EMPTY_SEPARATORS).build();
63-
}
64-
6531
CsvReader(
6632
InputRowSchema inputRowSchema,
6733
InputEntity source,
@@ -72,69 +38,15 @@ public static RFC4180Parser createOpenCsvParser()
7238
int skipHeaderRows
7339
)
7440
{
75-
super(inputRowSchema, source, temporaryDirectory);
76-
this.findColumnsFromHeader = findColumnsFromHeader;
77-
this.skipHeaderRows = skipHeaderRows;
78-
final String finalListDelimeter = listDelimiter == null ? Parsers.DEFAULT_LIST_DELIMITER : listDelimiter;
79-
this.multiValueFunction = ParserUtils.getMultiValueFunction(finalListDelimeter, Splitter.on(finalListDelimeter));
80-
this.columns = findColumnsFromHeader ? null : columns; // columns will be overriden by header row
81-
82-
if (this.columns != null) {
83-
for (String column : this.columns) {
84-
Preconditions.checkArgument(!column.contains(","), "Column[%s] has a comma, it cannot", column);
85-
}
86-
} else {
87-
Preconditions.checkArgument(
88-
findColumnsFromHeader,
89-
"If columns field is not set, the first row of your data must have your header"
90-
+ " and hasHeaderRow must be set to true."
91-
);
92-
}
93-
}
94-
95-
@Override
96-
public List<InputRow> parseInputRows(String line) throws IOException, ParseException
97-
{
98-
final Map<String, Object> zipped = parseLine(line);
99-
return Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(), zipped));
100-
}
101-
102-
@Override
103-
public Map<String, Object> toMap(String intermediateRow) throws IOException
104-
{
105-
return parseLine(intermediateRow);
106-
}
107-
108-
private Map<String, Object> parseLine(String line) throws IOException
109-
{
110-
final String[] parsed = parser.parseLine(line);
111-
return Utils.zipMapPartial(
112-
Preconditions.checkNotNull(columns, "columns"),
113-
Iterables.transform(Arrays.asList(parsed), multiValueFunction)
41+
super(
42+
inputRowSchema,
43+
source,
44+
temporaryDirectory,
45+
listDelimiter,
46+
columns,
47+
findColumnsFromHeader,
48+
skipHeaderRows,
49+
SeparateValueInputFormat.Format.CSV
11450
);
11551
}
116-
117-
@Override
118-
public int getNumHeaderLinesToSkip()
119-
{
120-
return skipHeaderRows;
121-
}
122-
123-
@Override
124-
public boolean needsToProcessHeaderLine()
125-
{
126-
return findColumnsFromHeader;
127-
}
128-
129-
@Override
130-
public void processHeaderLine(String line) throws IOException
131-
{
132-
if (!findColumnsFromHeader) {
133-
throw new ISE("Don't call this if findColumnsFromHeader = false");
134-
}
135-
columns = findOrCreateColumnNames(Arrays.asList(parser.parseLine(line)));
136-
if (columns.isEmpty()) {
137-
throw new ISE("Empty columns");
138-
}
139-
}
14052
}

0 commit comments

Comments
 (0)