Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a05202a
Starting S3Outputer
nsmith- Mar 18, 2022
d6268c9
Event flush
nsmith- Mar 22, 2022
f3dab1a
Simplify small buffer output
nsmith- Mar 23, 2022
1de023e
Implement get and put
nsmith- Mar 24, 2022
78b67bd
Implement basic product and event output
nsmith- Jun 1, 2022
2787f35
Migrate to using protobuf
nsmith- Jun 16, 2022
8582432
Functional S3Source
nsmith- Jun 24, 2022
3172651
Reset put offset!
nsmith- Jun 24, 2022
3573d8a
Should not have overrode it
nsmith- Jun 24, 2022
6f36afd
Need a virtual destructor to clean up lambdas
nsmith- Jun 24, 2022
cb71855
Make S3LibWrapper singleton, async per request
nsmith- Jun 27, 2022
12ac39d
Address some more review by @Dr15Jones
nsmith- Jun 27, 2022
e15781d
EventStripe output safer but more serial
nsmith- Jun 27, 2022
28a8f88
Make it faster!
nsmith- Jun 27, 2022
d705d54
Fanout buffer appending
nsmith- Jun 28, 2022
1f3adee
Fix small bug in product offsets
nsmith- Jun 28, 2022
3951ccd
Non-blocking product stripe retrieval
nsmith- Jun 29, 2022
46fb3ae
Implement zstd compression
nsmith- Jun 29, 2022
6b10954
Steal WaitingTaskList from cmssw
nsmith- Jun 29, 2022
e5aaa54
A somewhat working async implementation
nsmith- Jul 12, 2022
80cacb1
Merge branch 'master' of github.com:hep-cce2/root_serialization into …
nsmith- Jul 12, 2022
4affc33
Use task_group::defer for final task
nsmith- Jul 12, 2022
891d88a
Use task_group::defer also for S3 stuff
nsmith- Jul 12, 2022
8c77385
bugs squashed
nsmith- Jul 13, 2022
5867843
No need to schedule puts now that they don't block
nsmith- Jul 13, 2022
c47512b
Migrate S3 callbacks to FunctorTask
nsmith- Jul 13, 2022
33e2f1d
Let source use arena
nsmith- Jul 13, 2022
918bbbb
Track max concurrent requests
nsmith- Jul 13, 2022
204af1e
Add event stripe compression
nsmith- Jul 14, 2022
083f1b9
Nicer object naming
nsmith- Jul 14, 2022
11941e7
Merge branch 'master' of github.com:hep-cce2/root_serialization into …
nsmith- Jul 15, 2022
a2e02a1
fix uninitialized var
nsmith- Jul 15, 2022
4483ebb
Add some docs
nsmith- Aug 22, 2022
d0df761
Implement lzma compression
nsmith- Jan 23, 2023
d33cca2
Implement proper backoff+retry for async S3 requests
nsmith- Apr 19, 2023
b584f9a
Switch to HTTPS
nsmith- Apr 19, 2023
0cee12f
Implement S3Source product prefetching
nsmith- May 1, 2023
1d78b7e
Request timing log (to remove0
nsmith- May 1, 2023
8829875
Implement "fire and forget" event stripe flushing
nsmith- May 2, 2023
009a51e
Implement product groups
nsmith- May 3, 2023
75f4be0
Working product group IO
nsmith- May 4, 2023
7eff47f
Properly wait on fire-and-forget
nsmith- May 4, 2023
3904af9
Link libcurl and leave stubs for eventual SSL implementation
nsmith- Sep 22, 2023
5369e3d
Fix empty object bug by copying string for output buffer
nsmith- Sep 22, 2023
3ae8140
Stubs for prefetching objects on input (disabled)
nsmith- Sep 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ set(CMAKE_CXX_STANDARD 17)
# Make the requested C++ standard mandatory: configuration fails instead of
# silently falling back to an older standard the compiler does support.
set(CMAKE_CXX_STANDARD_REQUIRED True)

#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3")
# Adds -DTBB_USE_THREADING_TOOLS=1 to the compile line of the targets in this directory.
# NOTE(review): presumably turns on TBB's hooks for thread-analysis tools -- confirm against the TBB documentation.
add_compile_definitions(TBB_USE_THREADING_TOOLS=1)

# External packages the build cannot proceed without (REQUIRED aborts configuration when one is missing).
find_package(ROOT REQUIRED COMPONENTS Core RIO Tree)
find_package(TBB REQUIRED)
find_package(zstd REQUIRED)
find_package(lz4 REQUIRED)
find_package(LibLZMA REQUIRED)

# Must be set before find_package(Threads): asks FindThreads to prefer the
# compiler's -pthread flag over linking the thread library explicitly.
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
Expand Down Expand Up @@ -84,6 +86,7 @@ add_executable(threaded_io_test
outputerFactoryGenerator.cc
WaiterFactory.cc
waiterFactoryGenerator.cc
WaitingTaskList.cc
ScaleWaiter.cc
EventSleepWaiter.cc
EventUnevenSleepWaiter.cc
Expand All @@ -99,6 +102,7 @@ target_compile_definitions(threaded_io_test PUBLIC TBB_PREVIEW_TASK_GROUP_EXTENS

target_link_libraries(threaded_io_test
PRIVATE LZ4::lz4
LibLZMA::LibLZMA
ROOT::Core
ROOT::RIO
ROOT::Tree
Expand Down Expand Up @@ -188,3 +192,26 @@ if(ENABLE_HDF5)
add_test(NAME TestProductsHDFEvent COMMAND bash -c "${CMAKE_CURRENT_BINARY_DIR}/threaded_io_test -s TestProductsSource -t 1 -n 10 -o HDFEventOutputer=test_prod_e.h5")
#; ${CMAKE_CURRENT_BINARY_DIR}/threaded_io_test -s HDFSource=test_prodi_e.h5 -t 1 -n 10 -o TestProductsOutputer")
endif()

# Optional S3 support. When ENABLE_S3 is ON, the S3 source/outputer
# implementations and the protobuf-generated objectstripe code are compiled
# into threaded_io_test, which is then linked against protobuf, libs3 and libcurl.
#
# Required cache/command-line variable when enabled:
#   LIBS3_DIR - installation prefix of libs3; headers are taken from
#               ${LIBS3_DIR}/include and the library from ${LIBS3_DIR}/lib.
# Example: cmake -DENABLE_S3=ON -DLIBS3_DIR=/opt/libs3 <source-dir>
option(ENABLE_S3 "Build S3 Sources and Outputers" OFF)
if(ENABLE_S3)
  # Reject an unset *or empty* LIBS3_DIR: an empty value would otherwise
  # expand to the bogus search paths "/include" and "/lib" below.
  if(NOT LIBS3_DIR)
    message(FATAL_ERROR "You must provide LIBS3_DIR variable")
  endif()

  find_package(CURL REQUIRED)
  find_package(Protobuf REQUIRED)

  # Generates objectstripe.pb.cc / objectstripe.pb.h into the current binary dir.
  protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS objectstripe.proto)

  target_sources(threaded_io_test PRIVATE
    S3Outputer.cc
    S3Source.cc
    S3Common.cc
    ${PROTO_SRCS}
    ${PROTO_HDRS}
  )

  # Target-scoped include paths (instead of directory-wide include_directories):
  #  - the binary dir, so the generated objectstripe.pb.h is found;
  #  - the libs3 headers.
  # Protobuf's own include path is carried by the protobuf::libprotobuf target.
  target_include_directories(threaded_io_test PRIVATE
    "${CMAKE_CURRENT_BINARY_DIR}"
    "${LIBS3_DIR}/include"
  )

  # libs3 is located by search path + bare library name, using the
  # user-supplied LIBS3_DIR prefix rather than an imported target.
  target_link_directories(threaded_io_test PRIVATE "${LIBS3_DIR}/lib")

  # Link the imported targets provided by the find_package calls above so the
  # libraries that were actually found (and their usage requirements) are used.
  # s3 stays ahead of libcurl, matching the original link order.
  target_link_libraries(threaded_io_test PRIVATE
    protobuf::libprotobuf
    s3
    CURL::libcurl
  )
  # add_test(NAME S3OutputerEmptyTest COMMAND threaded_io_test EmptySource 1 1 0 10 S3Outputer)
endif()
5 changes: 5 additions & 0 deletions ConfigurationParameters.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ namespace cce::tf {
return std::stof(iValue);
}

// Specialization for unsigned long parameters: the textual value is handed to
// std::stoul (default base 10). Any exception std::stoul raises for
// non-numeric or out-of-range text propagates to the caller unchanged.
template<>
unsigned long ConfigurationParameters::convert<unsigned long>(std::string const& iValue) {
  unsigned long const parsedValue = std::stoul(iValue);
  return parsedValue;
}

template<>
bool ConfigurationParameters::convert<bool>(std::string const& iValue) {
return (iValue.empty()) or (
Expand Down
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ This is similar to SharedRootEventSource except this time each entry in the `Eve
> threaded_io_test -s SharedRootBatchEventsSource=test.eroot -t 1 -n 10
```

#### S3Source
Reads individual data product _stripes_--compressed concatenated serialized data products--from an S3 server, along with the appropriate
event-level metadata to index them. See s3io.md for further details of the data layout.

- verbose (int): increase number to get more detail
- prefix (string): the object prefix in the S3 bucket
- conn (string): path to the S3 connection configuration file
```
> threaded_io_test -s S3Source=prefix=testproducts:conn=s3conn.ini -t 1 -n 10
```

### Outputers

#### DummyOutputer
Expand Down Expand Up @@ -311,6 +322,19 @@ or
> threaded_io_test -s ReplicatedRootSource=test.root -t 1 -n 10 -o RootBatchEventsOutputer=test.root:batchSize=4
```

#### S3Outputer
Outputs individual data product _stripes_--compressed concatenated serialized data products--to an S3 server, along with the appropriate
event-level metadata to index them. See s3io.md for further details of the data layout.

- verbose (int): increase number to get more detail
- prefix (string): the object prefix to use when storing data in the S3 bucket
- productFlush (int): the minimum number of (possibly compressed) bytes to accumulate in the product stripe output buffer before flushing it to S3
- eventFlush (int): the maximum number of events that can be contained in a single product stripe
- conn (string): path to the S3 connection configuration file
```
> threaded_io_test -s TestProductsSource -t 1 -n 10 -o S3Outputer=prefix=testproducts:conn=s3conn.ini
```

### Waiters

#### ScaleWaiter
Expand Down
Loading