Skip to content

Commit 5187eb1

Browse files
authored
GH-3263: Add DictionaryPage.decode to allow dictionary reuse in the ColumnReaderBase ctor (#3264)
1 parent 6ac9b29 commit 5187eb1

File tree

3 files changed: 76 additions (+76) and 22 deletions (-22).

parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java

Lines changed: 3 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -453,17 +453,9 @@ void writeValue() {
453453
this.writerVersion = writerVersion;
454454
this.maxDefinitionLevel = path.getMaxDefinitionLevel();
455455
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
456-
if (dictionaryPage != null) {
457-
try {
458-
this.dictionary = dictionaryPage.getEncoding().initDictionary(path, dictionaryPage);
459-
if (converter.hasDictionarySupport()) {
460-
converter.setDictionary(dictionary);
461-
}
462-
} catch (IOException e) {
463-
throw new ParquetDecodingException("could not decode the dictionary for " + path, e);
464-
}
465-
} else {
466-
this.dictionary = null;
456+
this.dictionary = dictionaryPage == null ? null : dictionaryPage.decode(path);
457+
if (dictionary != null && converter.hasDictionarySupport()) {
458+
converter.setDictionary(dictionary);
467459
}
468460
this.totalValueCount = pageReader.getTotalValueCount();
469461
if (totalValueCount <= 0) {

parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPage.java

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,10 @@
2121
import java.io.IOException;
2222
import java.util.Objects;
2323
import org.apache.parquet.bytes.BytesInput;
24+
import org.apache.parquet.column.ColumnDescriptor;
25+
import org.apache.parquet.column.Dictionary;
2426
import org.apache.parquet.column.Encoding;
27+
import org.apache.parquet.io.ParquetDecodingException;
2528

2629
/**
2730
* Data for a dictionary page
@@ -74,6 +77,17 @@ public DictionaryPage copy() throws IOException {
7477
return new DictionaryPage(BytesInput.copy(bytes), getUncompressedSize(), dictionarySize, encoding);
7578
}
7679

80+
/**
81+
* @return the decoded dictionary
82+
*/
83+
public Dictionary decode(ColumnDescriptor path) {
84+
try {
85+
return getEncoding().initDictionary(path, this);
86+
} catch (IOException e) {
87+
throw new ParquetDecodingException("could not decode the dictionary for " + path, e);
88+
}
89+
}
90+
7791
@Override
7892
public String toString() {
7993
return "Page [bytes.size=" + bytes.size() + ", entryCount=" + dictionarySize + ", uncompressedSize="

parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java

Lines changed: 59 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -24,17 +24,21 @@
2424
import java.util.List;
2525
import org.apache.parquet.Version;
2626
import org.apache.parquet.VersionParser;
27+
import org.apache.parquet.bytes.BytesInput;
2728
import org.apache.parquet.column.ColumnDescriptor;
2829
import org.apache.parquet.column.ColumnReader;
30+
import org.apache.parquet.column.Dictionary;
2931
import org.apache.parquet.column.ParquetProperties;
3032
import org.apache.parquet.column.page.DataPage;
3133
import org.apache.parquet.column.page.DataPageV2;
34+
import org.apache.parquet.column.page.DictionaryPage;
3235
import org.apache.parquet.column.page.mem.MemPageReader;
3336
import org.apache.parquet.column.page.mem.MemPageWriter;
3437
import org.apache.parquet.io.api.Binary;
3538
import org.apache.parquet.io.api.PrimitiveConverter;
3639
import org.apache.parquet.schema.MessageType;
3740
import org.apache.parquet.schema.MessageTypeParser;
41+
import org.junit.Assert;
3842
import org.junit.Test;
3943

4044
public class TestColumnReaderImpl {
@@ -53,8 +57,28 @@ public void addBinary(Binary value) {
5357

5458
@Test
5559
public void test() throws Exception {
60+
ColumnDescriptor col = requiredBinaryColumn();
61+
MemPageWriter pageWriter = writeBinaryDictColumn(col);
62+
List<DataPage> pages = pageWriter.getPages();
63+
int valueCount = 0;
64+
int rowCount = 0;
65+
for (DataPage dataPage : pages) {
66+
valueCount += dataPage.getValueCount();
67+
rowCount += ((DataPageV2) dataPage).getRowCount();
68+
}
69+
assertEquals(rows, rowCount);
70+
assertEquals(rows, valueCount);
71+
MemPageReader pageReader = toReader(pageWriter);
72+
validateExpectedValuesAndCount(col, pageReader);
73+
}
74+
75+
private static ColumnDescriptor requiredBinaryColumn() {
5676
MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
5777
ColumnDescriptor col = schema.getColumns().get(0);
78+
return col;
79+
}
80+
81+
private MemPageWriter writeBinaryDictColumn(ColumnDescriptor col) {
5882
MemPageWriter pageWriter = new MemPageWriter();
5983
ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(
6084
col,
@@ -72,16 +96,15 @@ public void test() throws Exception {
7296
}
7397
columnWriterV2.writePage();
7498
columnWriterV2.finalizeColumnChunk();
75-
List<DataPage> pages = pageWriter.getPages();
76-
int valueCount = 0;
77-
int rowCount = 0;
78-
for (DataPage dataPage : pages) {
79-
valueCount += dataPage.getValueCount();
80-
rowCount += ((DataPageV2) dataPage).getRowCount();
81-
}
82-
assertEquals(rows, rowCount);
83-
assertEquals(rows, valueCount);
84-
MemPageReader pageReader = new MemPageReader(rows, pages.iterator(), pageWriter.getDictionaryPage());
99+
return pageWriter;
100+
}
101+
102+
private MemPageReader toReader(MemPageWriter pageWriter) {
103+
return new MemPageReader(rows, pageWriter.getPages().iterator(), pageWriter.getDictionaryPage());
104+
}
105+
106+
private void validateExpectedValuesAndCount(ColumnDescriptor col, MemPageReader pageReader)
107+
throws VersionParser.VersionParseException {
85108
ValidatingConverter converter = new ValidatingConverter();
86109
ColumnReader columnReader =
87110
new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION));
@@ -124,7 +147,7 @@ public void testOptional() throws Exception {
124147
}
125148
assertEquals(rows, rowCount);
126149
assertEquals(rows, valueCount);
127-
MemPageReader pageReader = new MemPageReader(rows, pages.iterator(), pageWriter.getDictionaryPage());
150+
MemPageReader pageReader = toReader(pageWriter);
128151
ValidatingConverter converter = new ValidatingConverter();
129152
ColumnReader columnReader =
130153
new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION));
@@ -135,4 +158,29 @@ public void testOptional() throws Exception {
135158
}
136159
assertEquals(0, converter.count);
137160
}
161+
162+
@Test
163+
public void testDeduplicatedDecodedDictionary() throws Exception {
164+
ColumnDescriptor col = requiredBinaryColumn();
165+
MemPageWriter pageWriter = writeBinaryDictColumn(col);
166+
167+
DictionaryPage dictionaryPage = pageWriter.getDictionaryPage();
168+
Assert.assertNotNull("Expected a dictionary", dictionaryPage);
169+
170+
Dictionary dict = dictionaryPage.decode(col);
171+
172+
// construct a page reader from a dictionary page that lacks bytes but stores the decoded data.
173+
MemPageReader pageReader = new MemPageReader(
174+
rows,
175+
pageWriter.getPages().iterator(),
176+
new DictionaryPage(
177+
BytesInput.empty(), dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding()) {
178+
@Override
179+
public Dictionary decode(ColumnDescriptor path) {
180+
return dict;
181+
}
182+
});
183+
184+
validateExpectedValuesAndCount(col, pageReader);
185+
}
138186
}

0 commit comments

Comments
 (0)