-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Closed
Description
Using the Dataset API with NativeMemoryPool.getDefault() works well with any size of input data, as shown in the following example:
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.DirectReservationListener;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.ScanTask;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
/**
 * Scans a local Parquet file through the Arrow Dataset API, using the pool
 * returned by {@code NativeMemoryPool.getDefault()}, and prints how many
 * entries were collected while iterating the batches.
 */
public class ReadingMultipleParquetFiles {
  /** Argument handed to the {@code ScanOptions} constructor (presumably the batch size; confirm). */
  private static final int SCAN_OPTION_VALUE = 1000;

  /**
   * Entry point: opens the dataset, prints the current direct-memory
   * reservation, walks every scan task, then prints the collected count.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    // Sample data: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet
    final File parquetFile =
        new File("src/main/resources/parquetfiles/yellow_tripdata_2022-01.parquet");
    // NOTE(review): the reader may hand back the same root for every batch, in which
    // case this list holds repeated references; only its size is used below. Confirm.
    final List<VectorSchemaRoot> collectedRoots = new ArrayList<>();

    // Resources are declared in dependency order; try-with-resources closes them in reverse.
    try (
        BufferAllocator rootAllocator = new RootAllocator();
        NativeMemoryPool defaultPool = NativeMemoryPool.getDefault();
        FileSystemDatasetFactory datasetFactory = new FileSystemDatasetFactory(
            rootAllocator,
            defaultPool,
            FileFormat.PARQUET,
            parquetFile.toURI().toString());
        Dataset parquetDataset = datasetFactory.finish();
        Scanner batchScanner = parquetDataset.newScan(new ScanOptions(SCAN_OPTION_VALUE))
    ) {
      System.out.println(DirectReservationListener.instance().getCurrentDirectMemReservation());

      for (ScanTask task : batchScanner.scan()) {
        try (ArrowReader reader = task.execute()) {
          // Advance batch by batch until the reader reports that nothing is left.
          for (boolean hasBatch = reader.loadNextBatch();
              hasBatch;
              hasBatch = reader.loadNextBatch()) {
            collectedRoots.add(reader.getVectorSchemaRoot());
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }

    System.out.println(collectedRoots.size());
  }
}
If we instead use NativeMemoryPool.createListenable, as in the code below, we see this error message: /Users/runner/work/crossbow/crossbow/arrow/java/dataset/src/main/cpp/jni_util.cc:78: Failed to update reservation while freeing bytes: JNIEnv was not attached to current thread
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.DirectReservationListener;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.ScanTask;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
/**
 * Scans a local Parquet file through the Arrow Dataset API, using a pool
 * created by {@code NativeMemoryPool.createListenable(...)} with the
 * {@code DirectReservationListener} instance, and prints how many entries
 * were collected while iterating the batches.
 */
public class ReadingMultipleParquetFiles {
  /** Argument handed to the {@code ScanOptions} constructor (presumably the batch size; confirm). */
  private static final int SCAN_OPTION_VALUE = 1000;

  /**
   * Entry point: opens the dataset with the listenable pool, prints the
   * current direct-memory reservation, walks every scan task, then prints
   * the collected count.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    // Sample data: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet
    final File parquetFile =
        new File("src/main/resources/parquetfiles/yellow_tripdata_2022-01.parquet");
    // NOTE(review): the reader may hand back the same root for every batch, in which
    // case this list holds repeated references; only its size is used below. Confirm.
    final List<VectorSchemaRoot> collectedRoots = new ArrayList<>();

    // Resources are declared in dependency order; try-with-resources closes them in reverse.
    try (
        BufferAllocator rootAllocator = new RootAllocator();
        NativeMemoryPool listenablePool =
            NativeMemoryPool.createListenable(DirectReservationListener.instance());
        FileSystemDatasetFactory datasetFactory = new FileSystemDatasetFactory(
            rootAllocator,
            listenablePool,
            FileFormat.PARQUET,
            parquetFile.toURI().toString());
        Dataset parquetDataset = datasetFactory.finish();
        Scanner batchScanner = parquetDataset.newScan(new ScanOptions(SCAN_OPTION_VALUE))
    ) {
      System.out.println(DirectReservationListener.instance().getCurrentDirectMemReservation());

      for (ScanTask task : batchScanner.scan()) {
        try (ArrowReader reader = task.execute()) {
          // Advance batch by batch until the reader reports that nothing is left.
          for (boolean hasBatch = reader.loadNextBatch();
              hasBatch;
              hasBatch = reader.loadNextBatch()) {
            collectedRoots.add(reader.getVectorSchemaRoot());
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }

    System.out.println(collectedRoots.size());
  }
}
Log stack trace:
/Users/runner/work/crossbow/crossbow/arrow/java/dataset/src/main/cpp/jni_util.cc:78: Failed to update reservation while freeing bytes: JNIEnv was not attached to current thread
0 jnilib-1263766398115565476.tmp 0x000000013f46fc0c _ZN5arrow4util7CerrLogD2Ev + 204
1 jnilib-1263766398115565476.tmp 0x000000013f46fb2e _ZN5arrow4util7CerrLogD0Ev + 14
2 jnilib-1263766398115565476.tmp 0x000000013f464de2 _ZN5arrow4util8ArrowLogD1Ev + 34
3 jnilib-1263766398115565476.tmp 0x000000013e54d96d _ZN5arrow7dataset3jni31ReservationListenableMemoryPool4Impl4FreeEPhx + 237
4 jnilib-1263766398115565476.tmp 0x000000013f78c035 _ZN5arrow10PoolBufferD2Ev + 69
5 jnilib-1263766398115565476.tmp 0x000000013f78bd0e _ZN5arrow10PoolBufferD0Ev + 14
6 jnilib-1263766398115565476.tmp 0x000000013f70e1ce _ZN5arrow9ArrayDataD2Ev + 222
7 jnilib-1263766398115565476.tmp 0x000000013f5b8fde _ZN5arrow17SimpleRecordBatchD2Ev + 206
8 jnilib-1263766398115565476.tmp 0x000000013e5785b8 _ZNO5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE14ThenOnCompleteIZNS_23DefaultIfEmptyGeneratorIS4_EclEvEUt_NS5_17PassthruOnFailureIS9_EEEclERKNS_6ResultIS4_EE + 168
9 jnilib-1263766398115565476.tmp 0x000000013f46b486 _ZN5arrow18ConcreteFutureImpl21RunOrScheduleCallbackERKNSt3__110shared_ptrINS_10FutureImplEEEONS3_14CallbackRecordEb + 230
10 jnilib-1263766398115565476.tmp 0x000000013f46b2bd _ZN5arrow18ConcreteFutureImpl22DoMarkFinishedOrFailedENS_11FutureStateE + 189
11 jnilib-1263766398115565476.tmp 0x000000013e82dc82 _ZN5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE14DoMarkFinishedENS_6ResultIS4_EE + 290
12 jnilib-1263766398115565476.tmp 0x000000013e82d8e8 _ZN5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE12MarkFinishedENS_6ResultIS4_EE + 88
13 jnilib-1263766398115565476.tmp 0x000000013e82e8b0 _ZN5arrow8internal6FnOnceIFvRKNS_10FutureImplEEE6FnImplINS_6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE21WrapResultyOnComplete8CallbackINS_6detail16MarkNextFinishedISD_SD_Lb0ELb0EEEEEE6invokeES4_ + 160
14 jnilib-1263766398115565476.tmp 0x000000013f46b486 _ZN5arrow18ConcreteFutureImpl21RunOrScheduleCallbackERKNSt3__110shared_ptrINS_10FutureImplEEEONS3_14CallbackRecordEb + 230
15 jnilib-1263766398115565476.tmp 0x000000013f46b2bd _ZN5arrow18ConcreteFutureImpl22DoMarkFinishedOrFailedENS_11FutureStateE + 189
16 jnilib-1263766398115565476.tmp 0x000000013e82dc82 _ZN5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE14DoMarkFinishedENS_6ResultIS4_EE + 290
17 jnilib-1263766398115565476.tmp 0x000000013e82d8e8 _ZN5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE12MarkFinishedENS_6ResultIS4_EE + 88
18 jnilib-1263766398115565476.tmp 0x000000013e5c93fe _ZNK5arrow6detail14ContinueFutureclINS_24SerialReadaheadGeneratorINSt3__110shared_ptrINS_11RecordBatchEEEE11ErrCallbackEJRKNS_6StatusEENS_6ResultIS7_EENS_6FutureIS7_EEEENS4_9enable_ifIXaaaantsr3std7is_voidIT1_EE5valuentsr9is_futureISI_EE5valueoontsrT2_8is_emptysr3std7is_sameISI_SA_EE5valueEvE4typeESJ_OT_DpOT0_ + 110
19 jnilib-1263766398115565476.tmp 0x000000013e5c9335 _ZNO5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE14ThenOnCompleteINS_24SerialReadaheadGeneratorIS4_E8CallbackENS8_11ErrCallbackEEclERKNS_6ResultIS4_EE + 293
20 jnilib-1263766398115565476.tmp 0x000000013f46b486 _ZN5arrow18ConcreteFutureImpl21RunOrScheduleCallbackERKNSt3__110shared_ptrINS_10FutureImplEEEONS3_14CallbackRecordEb + 230
21 jnilib-1263766398115565476.tmp 0x000000013f46b2bd _ZN5arrow18ConcreteFutureImpl22DoMarkFinishedOrFailedENS_11FutureStateE + 189
22 jnilib-1263766398115565476.tmp 0x000000013e82dc82 _ZN5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE14DoMarkFinishedENS_6ResultIS4_EE + 290
23 jnilib-1263766398115565476.tmp 0x000000013e82d8e8 _ZN5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE12MarkFinishedENS_6ResultIS4_EE + 88
24 jnilib-1263766398115565476.tmp 0x000000013e5c7e46 _ZNK5arrow6detail14ContinueFutureclINS_6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE17PassthruOnFailureIZNS_7dataset16SlicingGeneratorclEvEUlRKS7_E_EEJRKNS_6StatusEENS_6ResultIS7_EES8_EENS4_9enable_ifIXaaaantsr3std7is_voidIT1_EE5valuentsr9is_futureISM_EE5valueoontsrT2_8is_emptysr3std7is_sameISM_SG_EE5valueEvE4typeESN_OT_DpOT0_ + 102
25 jnilib-1263766398115565476.tmp 0x000000013e5c7d9e _ZNO5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE14ThenOnCompleteIZNS_7dataset16SlicingGeneratorclEvEUlRKS4_E_NS5_17PassthruOnFailureISB_EEEclERKNS_6ResultIS4_EE + 222
26 jnilib-1263766398115565476.tmp 0x000000013f46b486 _ZN5arrow18ConcreteFutureImpl21RunOrScheduleCallbackERKNSt3__110shared_ptrINS_10FutureImplEEEONS3_14CallbackRecordEb + 230
27 jnilib-1263766398115565476.tmp 0x000000013f46b2bd _ZN5arrow18ConcreteFutureImpl22DoMarkFinishedOrFailedENS_11FutureStateE + 189
28 jnilib-1263766398115565476.tmp 0x000000013e82dc82 _ZN5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE14DoMarkFinishedENS_6ResultIS4_EE + 290
29 jnilib-1263766398115565476.tmp 0x000000013e82d8e8 _ZN5arrow6FutureINSt3__110shared_ptrINS_11RecordBatchEEEE12MarkFinishedENS_6ResultIS4_EE + 88
30 jnilib-1263766398115565476.tmp 0x000000013e62ad98 _ZN5arrow8internal6FnOnceIFvRKNS_10FutureImplEEE6FnImplINS_6FutureINS0_5EmptyEE21WrapStatusyOnComplete8CallbackIZNS_15MergedGeneratorINSt3__110shared_ptrINS_11RecordBatchEEEE5State14MarkFinalErrorERKNS_6StatusENS8_ISH_EEEUlSM_E_EEE6invokeES4_ + 56
31 jnilib-1263766398115565476.tmp 0x000000013f46b486 _ZN5arrow18ConcreteFutureImpl21RunOrScheduleCallbackERKNSt3__110shared_ptrINS_10FutureImplEEEONS3_14CallbackRecordEb + 230
32 jnilib-1263766398115565476.tmp 0x000000013f46b2bd _ZN5arrow18ConcreteFutureImpl22DoMarkFinishedOrFailedENS_11FutureStateE + 189
33 jnilib-1263766398115565476.tmp 0x000000013f4bb658 _ZN5arrow6FutureINS_8internal5EmptyEE14DoMarkFinishedENS_6ResultIS2_EE + 152
34 jnilib-1263766398115565476.tmp 0x000000013f4afbc1 _ZN5arrow6FutureINS_8internal5EmptyEE12MarkFinishedIS2_vEEvNS_6StatusE + 81
35 jnilib-1263766398115565476.tmp 0x000000013e626fda _ZN5arrow15MergedGeneratorINSt3__110shared_ptrINS_11RecordBatchEEEE5State20MarkFinishedAndPurgeEv + 58
36 jnilib-1263766398115565476.tmp 0x000000013e62b505 _ZN5arrow15MergedGeneratorINSt3__110shared_ptrINS_11RecordBatchEEEE13OuterCallbackclERKNS_6ResultINS1_8functionIFNS_6FutureIS4_EEvEEEEE + 1173
37 jnilib-1263766398115565476.tmp 0x000000013f46b486 _ZN5arrow18ConcreteFutureImpl21RunOrScheduleCallbackERKNSt3__110shared_ptrINS_10FutureImplEEEONS3_14CallbackRecordEb + 230
38 jnilib-1263766398115565476.tmp 0x000000013f46b2bd _ZN5arrow18ConcreteFutureImpl22DoMarkFinishedOrFailedENS_11FutureStateE + 189
39 jnilib-1263766398115565476.tmp 0x000000013e6220fa _ZN5arrow6FutureINSt3__18functionIFNS0_INS1_10shared_ptrINS_11RecordBatchEEEEEvEEEE14DoMarkFinishedENS_6ResultIS8_EE + 282
40 jnilib-1263766398115565476.tmp 0x000000013e621ef3 _ZN5arrow6FutureINSt3__18functionIFNS0_INS1_10shared_ptrINS_11RecordBatchEEEEEvEEEE12MarkFinishedENS_6ResultIS8_EE + 51
41 jnilib-1263766398115565476.tmp 0x000000013e78715b _ZN5arrow8internal6FnOnceIFvRKNS_10FutureImplEEE6FnImplINS_6FutureINSt3__18functionIFNS8_INS9_10shared_ptrINS_11RecordBatchEEEEEvEEEE21WrapResultyOnComplete8CallbackINS_6detail16MarkNextFinishedISH_SH_Lb0ELb0EEEEEE6invokeES4_ + 59
42 jnilib-1263766398115565476.tmp 0x000000013f46b486 _ZN5arrow18ConcreteFutureImpl21RunOrScheduleCallbackERKNSt3__110shared_ptrINS_10FutureImplEEEONS3_14CallbackRecordEb + 230
43 jnilib-1263766398115565476.tmp 0x000000013f454dcb _ZN5arrow18ConcreteFutureImpl11AddCallbackENS_8internal6FnOnceIFvRKNS_10FutureImplEEEENS_15CallbackOptionsE + 139
44 jnilib-1263766398115565476.tmp 0x000000013f454cfd _ZN5arrow10FutureImpl11AddCallbackENS_8internal6FnOnceIFvRKS0_EEENS_15CallbackOptionsE + 29
45 jnilib-1263766398115565476.tmp 0x000000013e786f78 _ZN5arrow8internal6FnOnceIFvvEE6FnImplINSt3__16__bindINS_6detail14ContinueFutureEJRNS_6FutureINS5_8functionIFNS9_INS5_10shared_ptrINS_11RecordBatchEEEEEvEEEEERFSH_PNS0_8ExecutorENSB_IN7parquet5arrow12_GLOBAL__N_114FileReaderImplEEEiRKNS5_6vectorIiNS5_9allocatorIiEEEEERSK_RSP_RKiSV_EEEE6invokeEv + 184
46 jnilib-1263766398115565476.tmp 0x000000013f453445 _ZNSt3__1L14__thread_proxyINS_5tupleIJNS_10unique_ptrINS_15__thread_structENS_14default_deleteIS3_EEEEZN5arrow8internal10ThreadPool21LaunchWorkersUnlockedEiE3$_3EEEEEPvSC_ + 693
47 libsystem_pthread.dylib 0x00007fff2072f8fc _pthread_start + 224
48 libsystem_pthread.dylib 0x00007fff2072b443 thread_start + 15
Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
Reporter: David Dali Susanibar Arce / @davisusanibar
Note: This issue was originally created as ARROW-17508. Please see the migration documentation for further details.