Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ public void copyAsField(String name, ${name}Writer writer) {

</#list></#list>

public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is okay to remove a public method because there has been no release yet?

Maybe we should discuss it on the mailing list as it seems we haven't found the right pattern yet

fail("CopyAsValue StructWriter");
}

public void read(ExtensionHolder holder) {
fail("Extension");
}
Expand Down Expand Up @@ -147,4 +143,5 @@ public int size() {
private void fail(String name) {
throw new IllegalArgumentException(String.format("You tried to read a [%s] type when you are using a field reader of type [%s].", name, this.getClass().getSimpleName()));
}

}
5 changes: 2 additions & 3 deletions vector/src/main/codegen/templates/AbstractFieldWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,14 @@ public void endEntry() {
throw new IllegalStateException(String.format("You tried to end a map entry when you are using a ValueWriter of type %s.", this.getClass().getSimpleName()));
}

@Override
public void write(ExtensionHolder var1) {
this.fail("ExtensionType");
}
@Override
public void writeExtension(Object var1) {
this.fail("ExtensionType");
}
public void addExtensionTypeWriterFactory(ExtensionTypeWriterFactory var1) {
this.fail("ExtensionType");
}

<#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
<#assign fields = minor.fields!type.fields />
Expand Down
4 changes: 4 additions & 0 deletions vector/src/main/codegen/templates/ArrowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@

import org.apache.arrow.flatbuf.Type;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.complex.writer.FieldWriter;
import org.apache.arrow.vector.types.*;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
Expand Down Expand Up @@ -331,6 +333,8 @@ public boolean equals(Object obj) {
public <T> T accept(ArrowTypeVisitor<T> visitor) {
return visitor.visit(this);
}

public abstract FieldWriter getNewFieldWriter(ValueVector vector);
}

private static final int defaultDecimalBitWidth = 128;
Expand Down
3 changes: 0 additions & 3 deletions vector/src/main/codegen/templates/BaseReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public interface RepeatedStructReader extends StructReader{
boolean next();
int size();
void copyAsValue(StructWriter writer);
void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory);
}

public interface ListReader extends BaseReader{
Expand All @@ -60,7 +59,6 @@ public interface RepeatedListReader extends ListReader{
boolean next();
int size();
void copyAsValue(ListWriter writer);
void copyAsValue(ListWriter writer, ExtensionTypeWriterFactory writerFactory);
}

public interface MapReader extends BaseReader{
Expand All @@ -71,7 +69,6 @@ public interface RepeatedMapReader extends MapReader{
boolean next();
int size();
void copyAsValue(MapWriter writer);
void copyAsValue(MapWriter writer, ExtensionTypeWriterFactory writerFactory);
}

public interface ScalarReader extends
Expand Down
7 changes: 0 additions & 7 deletions vector/src/main/codegen/templates/BaseWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,6 @@ public interface ExtensionWriter extends BaseWriter {
* @param value the extension type value to write
*/
void writeExtension(Object value);

/**
* Adds the given extension type factory. This factory allows configuring writer implementations for specific ExtensionTypeVector.
*
* @param factory the extension type factory to add
*/
void addExtensionTypeWriterFactory(ExtensionTypeWriterFactory factory);
}

public interface ScalarWriter extends
Expand Down
21 changes: 5 additions & 16 deletions vector/src/main/codegen/templates/ComplexCopier.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,8 @@ public class ComplexCopier {
* @param input field to read from
* @param output field to write to
*/
public static void copy(FieldReader input, FieldWriter output) {
writeValue(input, output, null);
}

public static void copy(FieldReader input, FieldWriter output, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
writeValue(input, output, extensionTypeWriterFactory);
}
public static void copy(FieldReader reader, FieldWriter writer) {

private static void writeValue(FieldReader reader, FieldWriter writer, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
final MinorType mt = reader.getMinorType();

switch (mt) {
Expand All @@ -65,7 +58,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer, Extension
FieldReader childReader = reader.reader();
FieldWriter childWriter = getListWriterForReader(childReader, writer);
if (childReader.isSet()) {
writeValue(childReader, childWriter, extensionTypeWriterFactory);
copy(childReader, childWriter);
} else {
childWriter.writeNull();
}
Expand All @@ -83,8 +76,8 @@ private static void writeValue(FieldReader reader, FieldWriter writer, Extension
FieldReader structReader = reader.reader();
if (structReader.isSet()) {
writer.startEntry();
writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()), extensionTypeWriterFactory);
writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()), extensionTypeWriterFactory);
copy(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()));
copy(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()));
writer.endEntry();
} else {
writer.writeNull();
Expand All @@ -103,7 +96,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer, Extension
if (childReader.getMinorType() != Types.MinorType.NULL) {
FieldWriter childWriter = getStructWriterForReader(childReader, writer, name);
if (childReader.isSet()) {
writeValue(childReader, childWriter, extensionTypeWriterFactory);
copy(childReader, childWriter);
} else {
childWriter.writeNull();
}
Expand All @@ -115,13 +108,9 @@ private static void writeValue(FieldReader reader, FieldWriter writer, Extension
}
break;
case EXTENSIONTYPE:
if (extensionTypeWriterFactory == null) {
throw new IllegalArgumentException("Must provide ExtensionTypeWriterFactory");
}
if (reader.isSet()) {
Object value = reader.readObject();
if (value != null) {
writer.addExtensionTypeWriterFactory(extensionTypeWriterFactory);
writer.writeExtension(value);
}
} else {
Expand Down
1 change: 0 additions & 1 deletion vector/src/main/codegen/templates/NullReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public void read(int arrayIndex, Nullable${name}Holder holder){
}
</#list></#list>

public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory){}
public void read(ExtensionHolder holder) {
holder.isSet = 0;
}
Expand Down
24 changes: 17 additions & 7 deletions vector/src/main/codegen/templates/PromotableWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ protected void setWriter(ValueVector v) {
writer = new UnionWriter((UnionVector) vector, nullableStructWriterFactory);
break;
case EXTENSIONTYPE:
writer = new UnionExtensionWriter((ExtensionTypeVector) vector);
writer = ((ExtensionType) vector.getField().getType()).getNewFieldWriter(vector);
break;
default:
writer = type.getNewFieldWriter(vector);
Expand Down Expand Up @@ -325,6 +325,9 @@ protected boolean requiresArrowType(MinorType type) {

@Override
protected FieldWriter getWriter(MinorType type, ArrowType arrowType) {
if(type == MinorType.EXTENSIONTYPE) {
lastExtensionType = arrowType;
}
if (state == State.UNION) {
if (requiresArrowType(type)) {
((UnionWriter) writer).getWriter(type, arrowType);
Expand Down Expand Up @@ -540,18 +543,25 @@ public void writeLargeVarChar(String value) {
getWriter(MinorType.LARGEVARCHAR).writeLargeVarChar(value);
}

protected ArrowType lastExtensionType;

@Override
public void writeExtension(Object value) {
getWriter(MinorType.EXTENSIONTYPE).writeExtension(value);
FieldWriter writer = getWriter(MinorType.EXTENSIONTYPE, lastExtensionType);
if(writer instanceof UnionWriter) {
((UnionWriter) writer).writeExtension(value, lastExtensionType);
} else {
writer.writeExtension(value);
}
}

@Override
public void addExtensionTypeWriterFactory(ExtensionTypeWriterFactory factory) {
getWriter(MinorType.EXTENSIONTYPE).addExtensionTypeWriterFactory(factory);
public void writeExtension(Object value, ArrowType arrowType) {
getWriter(MinorType.EXTENSIONTYPE, arrowType).writeExtension(value);
}

public void addExtensionTypeWriterFactory(ExtensionTypeWriterFactory factory, ArrowType arrowType) {
getWriter(MinorType.EXTENSIONTYPE, arrowType).addExtensionTypeWriterFactory(factory);
@Override
public void write(ExtensionHolder holder) {
getWriter(MinorType.EXTENSIONTYPE, lastExtensionType).write(holder);
}

@Override
Expand Down
13 changes: 4 additions & 9 deletions vector/src/main/codegen/templates/UnionListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ public MapWriter map(String name, boolean keysSorted) {

@Override
public ExtensionWriter extension(ArrowType arrowType) {
this.extensionType = arrowType;
extensionType = arrowType;
return this;
}

@Override
public ExtensionWriter extension(String name, ArrowType arrowType) {
ExtensionWriter extensionWriter = writer.extension(name, arrowType);
return extensionWriter;
return writer.extension(name, arrowType);
}

<#if listName == "LargeList">
Expand Down Expand Up @@ -337,15 +337,10 @@ public void writeNull() {

@Override
public void writeExtension(Object value) {
writer.writeExtension(value);
writer.writeExtension(value, extensionType);
writer.setPosition(writer.idx() + 1);
}

@Override
public void addExtensionTypeWriterFactory(ExtensionTypeWriterFactory var1) {
writer.addExtensionTypeWriterFactory(var1, extensionType);
}

public void write(ExtensionHolder var1) {
writer.write(var1);
writer.setPosition(writer.idx() + 1);
Expand Down
23 changes: 23 additions & 0 deletions vector/src/main/codegen/templates/UnionReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public void read(int index, UnionHolder holder) {
}

private FieldReader getReaderForIndex(int index) {
return getReaderForIndex(index, null);
}

private FieldReader getReaderForIndex(int index, ArrowType type) {
int typeValue = data.getTypeValue(index);
FieldReader reader = (FieldReader) readers[typeValue];
if (reader != null) {
Expand All @@ -105,11 +109,26 @@ private FieldReader getReaderForIndex(int index) {
</#if>
</#list>
</#list>
case EXTENSIONTYPE:
if(type == null) {
throw new RuntimeException("Cannot get Extension reader without an ArrowType");
}
return (FieldReader) getExtension(type);
default:
throw new UnsupportedOperationException("Unsupported type: " + MinorType.values()[typeValue]);
}
}

private ExtensionReader extensionReader;

private ExtensionReader getExtension(ArrowType type) {
if (extensionReader == null) {
extensionReader = data.getExtension(type).getReader();
extensionReader.setPosition(idx());
}
return extensionReader;
}

private SingleStructReaderImpl structReader;

private StructReader getStruct() {
Expand Down Expand Up @@ -240,4 +259,8 @@ public FieldReader reader() {
public boolean next() {
return getReaderForIndex(idx()).next();
}

public void read(ExtensionHolder holder){
getReaderForIndex(idx(), holder.type()).read(holder);
}
}
18 changes: 18 additions & 0 deletions vector/src/main/codegen/templates/UnionVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,22 @@ public MapVector getMap(String name, ArrowType arrowType) {
return mapVector;
}

private ExtensionTypeVector extensionVector;

public ExtensionTypeVector getExtension(ArrowType arrowType) {
if (extensionVector == null) {
int vectorCount = internalStruct.size();
extensionVector = addOrGet(null, MinorType.EXTENSIONTYPE, arrowType, ExtensionTypeVector.class);
if (internalStruct.size() > vectorCount) {
extensionVector.allocateNew();
if (callBack != null) {
callBack.doWork();
}
}
}
return extensionVector;
}

public int getTypeValue(int index) {
return typeBuffer.getByte(index * TYPE_WIDTH);
}
Expand Down Expand Up @@ -725,6 +741,8 @@ public ValueVector getVectorByType(int typeId, ArrowType arrowType) {
return getListView();
case MAP:
return getMap(name, arrowType);
case EXTENSIONTYPE:
return getExtension(arrowType);
default:
throw new UnsupportedOperationException("Cannot support type: " + MinorType.values()[typeId]);
}
Expand Down
27 changes: 26 additions & 1 deletion vector/src/main/codegen/templates/UnionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
package org.apache.arrow.vector.complex.impl;

<#include "/@includes/vv_imports.ftl" />
import java.util.HashMap;

import org.apache.arrow.vector.complex.writer.BaseWriter;
import org.apache.arrow.vector.types.Types.MinorType;

Expand Down Expand Up @@ -213,8 +215,31 @@ public MapWriter asMap(ArrowType arrowType) {
return getMapWriter(arrowType);
}

private java.util.Map<ArrowType, ExtensionWriter> extensionWriters = new HashMap<>();

private ExtensionWriter getExtensionWriter(ArrowType arrowType) {
throw new UnsupportedOperationException("ExtensionTypes are not supported yet.");
ExtensionWriter w = extensionWriters.get(arrowType);
if (w == null) {
w = ((ExtensionType) arrowType).getNewFieldWriter(data.getExtension(arrowType));
w.setPosition(idx());
extensionWriters.put(arrowType, w);
}
return w;
}

public void writeExtension(Object value, ArrowType type) {
data.setType(idx(), MinorType.EXTENSIONTYPE);
ExtensionWriter w = getExtensionWriter(type);
w.setPosition(idx());
w.writeExtension(value);
}

@Override
public void write(ExtensionHolder holder) {
data.setType(idx(), MinorType.EXTENSIONTYPE);
ExtensionWriter w = getExtensionWriter(holder.type());
w.setPosition(idx());
w.write(holder);
}

BaseWriter getWriter(MinorType minorType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReferenceManager;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.util.DataSizeRoundingUtil;
import org.apache.arrow.vector.util.TransferPair;
Expand Down Expand Up @@ -261,18 +260,6 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
throw new UnsupportedOperationException();
}

@Override
public void copyFrom(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException();
}

@Override
public void copyFromSafe(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException();
}

/**
* Transfer the validity buffer from `validityBuffer` to the target vector's `validityBuffer`.
* Start at `startIndex` and copy `length` number of elements. If the starting index is 8 byte
Expand Down
Loading