@@ -1150,67 +1150,99 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
11501150// ----------------------------------------------------------------------
11511151// TypedColumnWriter
11521152
1153- template <typename Action>
1154- inline void DoInBatches (int64_t total, int64_t batch_size, Action&& action) {
1155- int64_t num_batches = static_cast <int >(total / batch_size);
1156- for (int round = 0 ; round < num_batches; round++) {
1157- action (round * batch_size, batch_size, /* check_page_size=*/ true );
1158- }
1159- // Write the remaining values
1160- if (total % batch_size > 0 ) {
1161- action (num_batches * batch_size, total % batch_size, /* check_page_size=*/ true );
1162- }
1163- }
1153+ // DoInBatches for non-repeated columns
1154+ template <typename Action, typename GetBufferedRows>
1155+ inline void DoInBatchesNonRepeated (int64_t num_levels, int64_t batch_size,
1156+ int64_t max_rows_per_page, Action&& action,
1157+ GetBufferedRows&& curr_page_buffered_rows) {
1158+ int64_t offset = 0 ;
1159+ while (offset < num_levels) {
1160+ int64_t page_buffered_rows = curr_page_buffered_rows ();
1161+ ARROW_DCHECK_LE (page_buffered_rows, max_rows_per_page);
11641162
1165- template <typename Action>
1166- inline void DoInBatches (const int16_t * def_levels, const int16_t * rep_levels,
1167- int64_t num_levels, int64_t batch_size, Action&& action,
1168- bool pages_change_on_record_boundaries) {
1169- if (!pages_change_on_record_boundaries || !rep_levels) {
1170- // If rep_levels is null, then we are writing a non-repeated column.
1171- // In this case, every record contains only one level.
1172- return DoInBatches (num_levels, batch_size, std::forward<Action>(action));
1163+ // Every record contains only one level.
1164+ int64_t max_batch_size = std::min (batch_size, num_levels - offset);
1165+ max_batch_size = std::min (max_batch_size, max_rows_per_page - page_buffered_rows);
1166+ int64_t end_offset = offset + max_batch_size;
1167+
1168+ ARROW_DCHECK_LE (offset, end_offset);
1169+ ARROW_DCHECK_LE (end_offset, num_levels);
1170+
1171+ // Always check page limit for non-repeated columns.
1172+ action (offset, end_offset - offset, /* check_page_limit=*/ true );
1173+
1174+ offset = end_offset;
11731175 }
1176+ }
11741177
1178+ // DoInBatches for repeated columns
1179+ template <typename Action, typename GetBufferedRows>
1180+ inline void DoInBatchesRepeated (const int16_t * def_levels, const int16_t * rep_levels,
1181+ int64_t num_levels, int64_t batch_size,
1182+ int64_t max_rows_per_page,
1183+ bool pages_change_on_record_boundaries, Action&& action,
1184+ GetBufferedRows&& curr_page_buffered_rows) {
11751185 int64_t offset = 0 ;
11761186 while (offset < num_levels) {
1177- int64_t end_offset = std::min (offset + batch_size, num_levels);
1178-
1179- // Find next record boundary (i.e. rep_level = 0)
1180- while (end_offset < num_levels && rep_levels[end_offset] != 0 ) {
1181- end_offset++;
1182- }
1183-
1184- if (end_offset < num_levels) {
1185- // This is not the last chunk of batch and end_offset is a record boundary.
1186- // It is a good chance to check the page size.
1187- action (offset, end_offset - offset, /* check_page_size=*/ true );
1188- } else {
1189- DCHECK_EQ (end_offset, num_levels);
1190- // This is the last chunk of batch, and we do not know whether end_offset is a
1191- // record boundary. Find the offset to beginning of last record in this chunk,
1192- // so we can check page size.
1193- int64_t last_record_begin_offset = num_levels - 1 ;
1194- while (last_record_begin_offset >= offset &&
1195- rep_levels[last_record_begin_offset] != 0 ) {
1196- last_record_begin_offset--;
1187+ int64_t max_batch_size = std::min (batch_size, num_levels - offset);
1188+ int64_t end_offset = num_levels; // end offset of the current batch
1189+ int64_t check_page_limit_end_offset = -1 ; // offset to check page limit (if not -1)
1190+
1191+ int64_t page_buffered_rows = curr_page_buffered_rows ();
1192+ ARROW_DCHECK_LE (page_buffered_rows, max_rows_per_page);
1193+
1194+ // Iterate rep_levels to find the shortest sequence that ends before a record
1195+ // boundary (i.e. rep_levels == 0) and either has a size no less than max_batch_size or has already filled the page up to max_rows_per_page rows
1196+ for (int64_t i = offset; i < num_levels; ++i) {
1197+ if (rep_levels[i] == 0 ) {
1198+ // Use the beginning of last record to check page limit.
1199+ check_page_limit_end_offset = i;
1200+ if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) {
1201+ end_offset = i;
1202+ break ;
1203+ }
1204+ page_buffered_rows += 1 ;
11971205 }
1206+ }
11981207
1199- if (offset <= last_record_begin_offset) {
1200- // We have found the beginning of last record and can check page size.
1201- action (offset, last_record_begin_offset - offset, /* check_page_size=*/ true );
1202- offset = last_record_begin_offset;
1203- }
1208+ ARROW_DCHECK_LE (offset, end_offset);
1209+ ARROW_DCHECK_LE (check_page_limit_end_offset, end_offset);
12041210
1205- // Write remaining data after the record boundary,
1206- // or all data if no boundary was found.
1207- action (offset, end_offset - offset, /* check_page_size=*/ false );
1211+ if (check_page_limit_end_offset >= 0 ) {
1212+ // At least one record boundary is included in this batch.
1213+ // It is a good chance to check the page limit.
1214+ action (offset, check_page_limit_end_offset - offset, /* check_page_limit=*/ true );
1215+ offset = check_page_limit_end_offset;
1216+ }
1217+ if (end_offset > offset) {
1218+ // This is the last chunk of batch, and we do not know whether end_offset is a
1219+ // record boundary, so we cannot check page limit if pages must change on
1220+ // record boundaries.
1221+ ARROW_DCHECK_EQ (end_offset, num_levels);
1222+ action (offset, end_offset - offset,
1223+ /* check_page_limit=*/ !pages_change_on_record_boundaries);
12081224 }
12091225
12101226 offset = end_offset;
12111227 }
12121228}
12131229
1230+ template <typename Action, typename GetBufferedRows>
1231+ inline void DoInBatches (const int16_t * def_levels, const int16_t * rep_levels,
1232+ int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page,
1233+ bool pages_change_on_record_boundaries, Action&& action,
1234+ GetBufferedRows&& curr_page_buffered_rows) {
1235+ if (!rep_levels) {
1236+ DoInBatchesNonRepeated (num_levels, batch_size, max_rows_per_page,
1237+ std::forward<Action>(action),
1238+ std::forward<GetBufferedRows>(curr_page_buffered_rows));
1239+ } else {
1240+ DoInBatchesRepeated (def_levels, rep_levels, num_levels, batch_size, max_rows_per_page,
1241+ pages_change_on_record_boundaries, std::forward<Action>(action),
1242+ std::forward<GetBufferedRows>(curr_page_buffered_rows));
1243+ }
1244+ }
1245+
12141246namespace {
12151247
12161248bool DictionaryDirectWriteSupported (const ::arrow::Array& array) {
@@ -1318,7 +1350,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
13181350 CheckDictionarySizeLimit ();
13191351 };
13201352 DoInBatches (def_levels, rep_levels, num_values, properties_->write_batch_size (),
1321- WriteChunk, pages_change_on_record_boundaries ());
1353+ properties_->max_rows_per_page (), pages_change_on_record_boundaries (),
1354+ WriteChunk, [this ]() { return num_buffered_rows_; });
13221355 return value_offset;
13231356 }
13241357
@@ -1368,7 +1401,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
13681401 CheckDictionarySizeLimit ();
13691402 };
13701403 DoInBatches (def_levels, rep_levels, num_values, properties_->write_batch_size (),
1371- WriteChunk, pages_change_on_record_boundaries ());
1404+ properties_->max_rows_per_page (), pages_change_on_record_boundaries (),
1405+ WriteChunk, [this ]() { return num_buffered_rows_; });
13721406 }
13731407
13741408 Status WriteArrow (const int16_t * def_levels, const int16_t * rep_levels,
@@ -1769,13 +1803,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
17691803 }
17701804
17711805 void CommitWriteAndCheckPageLimit (int64_t num_levels, int64_t num_values,
1772- int64_t num_nulls, bool check_page_size ) {
1806+ int64_t num_nulls, bool check_page_limit ) {
17731807 num_buffered_values_ += num_levels;
17741808 num_buffered_encoded_values_ += num_values;
17751809 num_buffered_nulls_ += num_nulls;
17761810
1777- if (check_page_size &&
1778- current_encoder_->EstimatedDataEncodedSize () >= properties_->data_pagesize ()) {
1811+ if (check_page_limit &&
1812+ (current_encoder_->EstimatedDataEncodedSize () >= properties_->data_pagesize () ||
1813+ num_buffered_rows_ >= properties_->max_rows_per_page ())) {
17791814 AddDataPage ();
17801815 }
17811816 }
@@ -1996,9 +2031,10 @@ Status TypedColumnWriterImpl<ParquetType>::WriteArrowDictionary(
19962031 return WriteDense ();
19972032 }
19982033
1999- PARQUET_CATCH_NOT_OK (DoInBatches (def_levels, rep_levels, num_levels,
2000- properties_->write_batch_size (), WriteIndicesChunk,
2001- pages_change_on_record_boundaries ()));
2034+ PARQUET_CATCH_NOT_OK (
2035+ DoInBatches (def_levels, rep_levels, num_levels, properties_->write_batch_size (),
2036+ properties_->max_rows_per_page (), pages_change_on_record_boundaries (),
2037+ WriteIndicesChunk, [this ]() { return num_buffered_rows_; }));
20022038 return Status::OK ();
20032039}
20042040
@@ -2441,9 +2477,10 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
24412477 value_offset += batch_num_spaced_values;
24422478 };
24432479
2444- PARQUET_CATCH_NOT_OK (DoInBatches (def_levels, rep_levels, num_levels,
2445- properties_->write_batch_size (), WriteChunk,
2446- pages_change_on_record_boundaries ()));
2480+ PARQUET_CATCH_NOT_OK (
2481+ DoInBatches (def_levels, rep_levels, num_levels, properties_->write_batch_size (),
2482+ properties_->max_rows_per_page (), pages_change_on_record_boundaries (),
2483+ WriteChunk, [this ]() { return num_buffered_rows_; }));
24472484 return Status::OK ();
24482485}
24492486
0 commit comments