Skip to content

Commit 8a18387

Browse files
YiChengLee03facebook-github-bot
authored andcommitted
feat: Add statistics unit tests (facebookincubator#13903)
Summary: Pull Request resolved: facebookincubator#13903 - Fix bugs in reader due to multithreaded environment, invalid flags for EOL and EOF - Exec test for rawReadBytes statistics Reviewed By: zacw7 Differential Revision: D77421942 fbshipit-source-id: 239a2b8710bd3f3f90214bb17c6ad43f7bfe08f9
1 parent 00d9594 commit 8a18387

File tree

5 files changed

+236
-8
lines changed

5 files changed

+236
-8
lines changed

velox/dwio/text/reader/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@
1414

1515
velox_add_library(velox_dwio_text_reader TextReader.cpp)
1616

17-
velox_link_libraries(velox_dwio_text_reader velox_dwio_common fmt::fmt)
17+
velox_link_libraries(velox_dwio_text_reader velox_type_fbhive velox_dwio_common
18+
fmt::fmt)

velox/dwio/text/reader/TextReader.cpp

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,11 @@ TextRowReader::TextRowReader(
118118
atEOF_{false},
119119
atSOL_{false},
120120
depth_{0},
121+
unreadData_{""},
122+
unreadIdx_{0},
121123
limit_{opts.limit()},
122124
fileLength_{getStreamLength()},
125+
ownedString_{""},
123126
stringViewBuffer_{StringViewBufferHolder(&contents_->pool)} {
124127
// Seek to first line at or after the specified region.
125128
if (contents_->compression == CompressionKind::CompressionKind_NONE) {
@@ -235,16 +238,21 @@ uint64_t TextRowReader::next(
235238
++rowsRead;
236239

237240
if (pos_ >= getLength()) {
241+
// disable further chunk reads but parse the remainder of the line
238242
atEOF_ = true;
239-
rowVecPtr->resize(rowsRead);
240243
}
241244

242245
// handle empty file
243246
if (initialPos == pos_ && atEOF_) {
244247
currentRow_ = 0;
245248
}
246249
}
250+
251+
// Resize the row vector to the actual number of rows read.
252+
// Handled here for both cases: pos_ > fileLength_ and pos_ > limit_
253+
rowVecPtr->resize(rowsRead);
247254
result = projectColumns(rowVecPtr, *scanSpec_, mutation);
255+
248256
return rowsRead;
249257
}
250258

@@ -327,8 +335,10 @@ void TextRowReader::setEOF() {
327335
atEOL_ = true;
328336
}
329337

338+
/// TODO: Update maximum depth after fixing issue with deeply nested complex
339+
/// types
330340
void TextRowReader::incrementDepth() {
331-
if (depth_ >= 6) {
341+
if (depth_ > 4) {
332342
parse_error("Schema nesting too deep");
333343
}
334344
depth_++;
@@ -509,6 +519,7 @@ DelimType TextRowReader::getDelimType(uint8_t v) {
509519
/// TODO: Logically should be >=, kept as it is to align with presto reader.
510520
if (pos_ > limit_) {
511521
atEOF_ = true;
522+
delim = DelimTypeEOR;
512523
}
513524
} else if (v == contents_->serDeOptions.separators.at(depth_)) {
514525
setEOE(delim);
@@ -563,7 +574,7 @@ char TextRowReader::getByteUnchecked(DelimType& delim) {
563574
}
564575
if (!skipLF) {
565576
setEOF();
566-
delim = 1;
577+
delim = DelimTypeEOR;
567578
}
568579
return '\n';
569580
}
@@ -579,10 +590,15 @@ char TextRowReader::getByteUncheckedOptimized(DelimType& delim) {
579590

580591
try {
581592
char v;
582-
if (unreadData_.empty()) {
593+
if (unreadData_.empty() || unreadIdx_ >= unreadData_.size()) {
583594
int length;
584595
const void* buffer;
585-
contents_->inputStream->Next(&buffer, &length);
596+
if (!contents_->inputStream->Next(&buffer, &length)) {
597+
setEOF();
598+
delim = DelimTypeEOR;
599+
return '\0';
600+
}
601+
586602
unreadData_ = std::string(reinterpret_cast<const char*>(buffer), length);
587603
unreadIdx_ = 0;
588604
}
@@ -608,7 +624,7 @@ char TextRowReader::getByteUncheckedOptimized(DelimType& delim) {
608624
}
609625
if (!skipLF) {
610626
setEOF();
611-
delim = 1;
627+
delim = DelimTypeEOR;
612628
}
613629
return '\n';
614630
}
@@ -673,7 +689,8 @@ bool TextRowReader::skipLine() {
673689
}
674690
/// TODO: Logically should be >=, kept as it is to align with presto reader
675691
if (pos_ > limit_) {
676-
atEOF_ = true;
692+
setEOF();
693+
delim = DelimTypeEOR;
677694
}
678695
return atEOF_;
679696
}

velox/exec/tests/TableScanTest.cpp

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
#include <folly/experimental/EventCount.h>
2121
#include <folly/synchronization/Baton.h>
2222
#include <folly/synchronization/Latch.h>
23+
#include <filesystem>
2324

2425
#include "velox/common/base/Fs.h"
2526
#include "velox/common/base/tests/GTestUtils.h"
2627
#include "velox/common/caching/AsyncDataCache.h"
2728
#include "velox/common/caching/tests/CacheTestUtil.h"
29+
#include "velox/common/file/File.h"
30+
#include "velox/common/file/tests/FaultyFile.h"
2831
#include "velox/common/file/tests/FaultyFileSystem.h"
2932
#include "velox/common/memory/MemoryArbitrator.h"
3033
#include "velox/common/testutil/TestValue.h"
@@ -5789,5 +5792,208 @@ TEST_F(TableScanTest, prevBatchEmptyAdaptivity) {
57895792
EXPECT_GT(numBatchesReadWithoutAdaptivity, numBatchesRead);
57905793
}
57915794
}
5795+
5796+
TEST_F(TableScanTest, textfileEscape) {
5797+
auto expected = makeRowVector(
5798+
{"c0", "c1"},
5799+
{
5800+
makeFlatVector<std::string>({"a,bc", "d"}),
5801+
makeFlatVector<std::string>({"e", "e"}),
5802+
});
5803+
5804+
const auto tempFile = TempFilePath::create();
5805+
const auto tempPath = tempFile->getPath();
5806+
remove(tempPath.c_str());
5807+
LocalWriteFile localWriteFile(tempPath);
5808+
localWriteFile.append("a\\,bc,e\nd,e");
5809+
localWriteFile.close();
5810+
5811+
std::unordered_map<std::string, std::string> customSplitInfo;
5812+
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
5813+
std::unordered_map<std::string, std::string> serdeParameters{
5814+
{dwio::common::SerDeOptions::kFieldDelim, ","},
5815+
{dwio::common::SerDeOptions::kEscapeChar, "\\"}};
5816+
5817+
auto split = std::make_shared<connector::hive::HiveConnectorSplit>(
5818+
kHiveConnectorId,
5819+
tempPath,
5820+
dwio::common::FileFormat(dwio::common::FileFormat::TEXT),
5821+
0,
5822+
std::numeric_limits<uint64_t>::max(),
5823+
partitionKeys,
5824+
std::nullopt,
5825+
customSplitInfo,
5826+
nullptr,
5827+
serdeParameters);
5828+
5829+
auto inputType = asRowType(expected->type());
5830+
auto plan =
5831+
PlanBuilder(pool()).tableScan(inputType, {}, "", inputType).planNode();
5832+
5833+
auto task = facebook::velox::exec::test::AssertQueryBuilder(plan)
5834+
.split(split)
5835+
.assertResults(expected);
5836+
auto planStats = facebook::velox::exec::toPlanStats(task->taskStats());
5837+
auto scanNodeId = plan->id();
5838+
auto it = planStats.find(scanNodeId);
5839+
ASSERT_TRUE(it != planStats.end());
5840+
auto rawInputBytes = it->second.rawInputBytes;
5841+
auto overreadBytes = getTableScanRuntimeStats(task).at("overreadBytes").sum;
5842+
5843+
ASSERT_EQ(rawInputBytes, 11);
5844+
ASSERT_EQ(overreadBytes, 0);
5845+
}
5846+
5847+
TEST_F(TableScanTest, textfileChunkReadEntireFile) {
5848+
auto expected = makeRowVector(
5849+
{"c0", "c1"},
5850+
{
5851+
makeFlatVector<std::string>({"row1_col1", "row2_col1", "row3_col1"}),
5852+
makeFlatVector<std::string>({"row1_col2", "row2_col2", "row3_col2"}),
5853+
});
5854+
5855+
const auto tempFile = TempFilePath::create();
5856+
const auto tempPath = tempFile->getPath();
5857+
remove(tempPath.c_str());
5858+
LocalWriteFile localWriteFile(tempPath);
5859+
5860+
localWriteFile.append("row1_col1,row1_col2\n");
5861+
localWriteFile.append("row2_col1,row2_col2\n");
5862+
localWriteFile.append("row3_col1,row3_col2\n");
5863+
5864+
// Add extra padding data that might be read but not used
5865+
localWriteFile.append("extra_row1,extra_data1\n");
5866+
localWriteFile.append("extra_row2,extra_data2\n");
5867+
localWriteFile.close();
5868+
5869+
std::unordered_map<std::string, std::string> customSplitInfo;
5870+
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
5871+
std::unordered_map<std::string, std::string> serdeParameters{
5872+
{dwio::common::SerDeOptions::kFieldDelim, ","}};
5873+
5874+
// Create a split that only reads part of the file (first 60 bytes)
5875+
// This should cause the reader to potentially overread beyond the split
5876+
// boundary
5877+
auto split = std::make_shared<connector::hive::HiveConnectorSplit>(
5878+
kHiveConnectorId,
5879+
tempPath,
5880+
dwio::common::FileFormat(dwio::common::FileFormat::TEXT),
5881+
0,
5882+
59, // Limit to first 60 bytes instead of reading entire file
5883+
partitionKeys,
5884+
std::nullopt,
5885+
customSplitInfo,
5886+
nullptr,
5887+
serdeParameters);
5888+
5889+
auto inputType = asRowType(expected->type());
5890+
auto plan =
5891+
PlanBuilder(pool()).tableScan(inputType, {}, "", inputType).planNode();
5892+
5893+
auto task = facebook::velox::exec::test::AssertQueryBuilder(plan)
5894+
.split(split)
5895+
.assertResults(expected);
5896+
5897+
auto planStats = facebook::velox::exec::toPlanStats(task->taskStats());
5898+
auto scanNodeId = plan->id();
5899+
auto it = planStats.find(scanNodeId);
5900+
ASSERT_TRUE(it != planStats.end());
5901+
auto rawInputBytes = it->second.rawInputBytes;
5902+
5903+
// Entire file was read in a single chunk even though range is [0,59]
5904+
ASSERT_EQ(rawInputBytes, 106);
5905+
}
5906+
5907+
TEST_F(TableScanTest, textfileLarge) {
5908+
constexpr int kNumRows =
5909+
100000; // This will generate well over 8388608 bytes (per chunk read)
5910+
constexpr int kNumCols = 10;
5911+
5912+
constexpr int loadQuantum = 8 << 20; // loadQuantum_ as of June 2025
5913+
5914+
// Helper function to generate column data
5915+
auto generateColumnData = [](int row, int col) {
5916+
return fmt::format("row{}_col{}_padding_data_to_increase_size", row, col);
5917+
};
5918+
5919+
// Helper function to generate CSV row
5920+
auto generateCsvRow = [&](int row) {
5921+
std::vector<std::string> cols;
5922+
cols.reserve(kNumCols);
5923+
for (int col = 0; col < kNumCols; ++col) {
5924+
cols.push_back(generateColumnData(row, col));
5925+
}
5926+
return fmt::format("{}\n", fmt::join(cols, ","));
5927+
};
5928+
5929+
// Create expected result (only first row since split limit is 10 bytes)
5930+
std::vector<std::string> expectedRow;
5931+
expectedRow.reserve(kNumCols);
5932+
for (int col = 0; col < kNumCols; ++col) {
5933+
expectedRow.push_back(generateColumnData(0, col));
5934+
}
5935+
5936+
std::vector<std::string> columnNames;
5937+
std::vector<VectorPtr> columnVectors;
5938+
columnNames.reserve(kNumCols);
5939+
columnVectors.reserve(kNumCols);
5940+
5941+
for (int col = 0; col < kNumCols; ++col) {
5942+
columnNames.push_back(fmt::format("c{}", col));
5943+
columnVectors.push_back(makeFlatVector<std::string>({expectedRow[col]}));
5944+
}
5945+
5946+
auto expected = makeRowVector(columnNames, columnVectors);
5947+
5948+
// Create large file
5949+
const auto tempFile = TempFilePath::create();
5950+
const auto tempPath = tempFile->getPath();
5951+
remove(tempPath.c_str());
5952+
LocalWriteFile localWriteFile(tempPath);
5953+
5954+
for (int row = 0; row < kNumRows; ++row) {
5955+
localWriteFile.append(generateCsvRow(row));
5956+
}
5957+
localWriteFile.close();
5958+
5959+
ASSERT_GE(std::filesystem::file_size(tempPath), loadQuantum);
5960+
5961+
std::unordered_map<std::string, std::string> customSplitInfo;
5962+
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
5963+
std::unordered_map<std::string, std::string> serdeParameters{
5964+
{dwio::common::SerDeOptions::kFieldDelim, ","}};
5965+
5966+
auto split = std::make_shared<connector::hive::HiveConnectorSplit>(
5967+
kHiveConnectorId,
5968+
tempPath,
5969+
dwio::common::FileFormat(dwio::common::FileFormat::TEXT),
5970+
0,
5971+
10, // Limit to only first row
5972+
partitionKeys,
5973+
std::nullopt,
5974+
customSplitInfo,
5975+
nullptr,
5976+
serdeParameters);
5977+
5978+
auto inputType = asRowType(expected->type());
5979+
auto plan =
5980+
PlanBuilder(pool()).tableScan(inputType, {}, "", inputType).planNode();
5981+
5982+
auto task = facebook::velox::exec::test::AssertQueryBuilder(plan)
5983+
.split(split)
5984+
.assertResults(expected);
5985+
5986+
auto planStats = facebook::velox::exec::toPlanStats(task->taskStats());
5987+
auto scanNodeId = plan->id();
5988+
auto it = planStats.find(scanNodeId);
5989+
ASSERT_TRUE(it != planStats.end());
5990+
auto rawInputBytes = it->second.rawInputBytes;
5991+
5992+
// Verify we did not read the entire file but only a chunk
5993+
ASSERT_EQ(rawInputBytes, loadQuantum);
5994+
ASSERT_GT(getTableScanRuntimeStats(task)["totalScanTime"].sum, 0);
5995+
ASSERT_GT(getTableScanRuntimeStats(task)["ioWaitWallNanos"].sum, 0);
5996+
}
5997+
57925998
} // namespace
57935999
} // namespace facebook::velox::exec

velox/exec/tests/utils/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ target_link_libraries(
5050
velox_dwio_common
5151
velox_dwio_dwrf_reader
5252
velox_dwio_dwrf_writer
53+
velox_dwio_text_reader_register
5354
velox_dwio_common_test_utils
5455
velox_file_test_utils
5556
velox_type_fbhive

velox/exec/tests/utils/HiveConnectorTestBase.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "velox/dwio/dwrf/RegisterDwrfWriter.h"
2525
#include "velox/dwio/dwrf/writer/FlushPolicy.h"
2626
#include "velox/dwio/dwrf/writer/Writer.h"
27+
#include "velox/dwio/text/RegisterTextReader.h"
2728
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
2829

2930
namespace facebook::velox::exec::test {
@@ -49,6 +50,7 @@ void HiveConnectorTestBase::SetUp() {
4950
dwio::common::registerFileSinks();
5051
dwrf::registerDwrfReaderFactory();
5152
dwrf::registerDwrfWriterFactory();
53+
dwio::common::registerTextReaderFactory();
5254
}
5355

5456
void HiveConnectorTestBase::TearDown() {
@@ -60,6 +62,7 @@ void HiveConnectorTestBase::TearDown() {
6062
connector::unregisterConnector(kHiveConnectorId);
6163
connector::unregisterConnectorFactory(
6264
connector::hive::HiveConnectorFactory::kHiveConnectorName);
65+
dwio::common::unregisterTextReaderFactory();
6366
OperatorTestBase::TearDown();
6467
}
6568

0 commit comments

Comments
 (0)