
Commit cf0a7c1

missing file
1 parent 40bc738 commit cf0a7c1

File tree: 1 file changed (+289 −0 lines)

1 file changed

+289
-0
lines changed
Lines changed: 289 additions & 0 deletions
@@ -0,0 +1,289 @@
/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <folly/Conv.h>
#include <folly/ProducerConsumerQueue.h>
#include <folly/ThreadLocal.h>
#include <folly/lang/Aligned.h>
#include <folly/logging/xlog.h>
#include <folly/synchronization/Latch.h>
#include <folly/system/ThreadName.h>

#include "cachelib/cachebench/cache/Cache.h"
#include "cachelib/cachebench/util/Exceptions.h"
#include "cachelib/cachebench/util/Parallel.h"
#include "cachelib/cachebench/util/Request.h"
#include "cachelib/cachebench/workload/ReplayGeneratorBase.h"
namespace facebook {
namespace cachelib {
namespace cachebench {

// BinaryKVReplayGenerator generates cachelib requests from a binary trace
// file produced with KVReplayGenerator. To minimize contention on the
// request submission queues, which may need to be drained by multiple
// stressor threads, requests are sharded across stressors by hashing the key.
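// Each stressor thread consumes from its own shard's submission queue via
// getReq(); a completed request that still has a repeat count left is handed
// back through notifyResult() and re-dispatched ahead of new trace entries.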
class BinaryKVReplayGenerator : public ReplayGeneratorBase {
 public:
  explicit BinaryKVReplayGenerator(const StressorConfig& config)
      : ReplayGeneratorBase(config), binaryStream_(config) {
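    // Pre-allocate one StressorCtx (submission queue) and one reusable
    // Request object per shard; getReq() rewrites that Request in place for
    // every trace entry it dispatches.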
    for (uint32_t i = 0; i < numShards_; ++i) {
      stressorCtxs_.emplace_back(std::make_unique<StressorCtx>(i));
      std::string_view s{"abc"};
      requestPtr_.emplace_back(
          new Request(s, reinterpret_cast<size_t*>(0), OpType::kGet, 0));
    }
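    // Start the single generator thread and block on the latch until it has
    // preloaded preLoadReqs_ requests (or hit a full queue / end of trace),
    // so stressors start against primed queues.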
    folly::Latch latch(1);
    genWorker_ = std::thread([this, &latch] {
      folly::setThreadName("cb_replay_gen");
      genRequests(latch);
    });

    latch.wait();

    XLOGF(INFO,
          "Started BinaryKVReplayGenerator (amp factor {}, # of stressor "
          "threads {}, fast forward {})",
          ampFactor_, numShards_, fastForwardCount_);
  }
  virtual ~BinaryKVReplayGenerator() {
    XCHECK(shouldShutdown());
    if (genWorker_.joinable()) {
      genWorker_.join();
    }
  }

  // getReq generates the next request from the trace file.
  const Request& getReq(
      uint8_t,
      std::mt19937_64&,
      std::optional<uint64_t> lastRequestId = std::nullopt) override;

  void renderStats(uint64_t, std::ostream& out) const override {
    out << std::endl << "== BinaryKVReplayGenerator Stats ==" << std::endl;

    out << folly::sformat("{}: {:.2f} million (parse error: {})",
                          "Total Processed Samples",
                          (double)parseSuccess.load() / 1e6, parseError.load())
        << std::endl;
  }

  void notifyResult(uint64_t requestId, OpResultType result) override;

  void markFinish() override { getStressorCtx().markFinish(); }

 private:
  // Interval at which the submission queue is polled when it is either
  // full (producer) or empty (consumer).
  // We use polling with a delay since ProducerConsumerQueue does not
  // support blocking reads or writes with a timeout.
  static constexpr uint64_t checkIntervalUs_ = 100;
  static constexpr size_t kMaxRequests =
      500000000; // just stores pointers to mmap'd data

  using ReqQueue = folly::ProducerConsumerQueue<BinaryRequest*>;

  // StressorCtx keeps track of the per-stressor-thread state, including the
  // submission queue. Since there is only one request generator thread, a
  // lock-free ProducerConsumerQueue is used for performance reasons.
  // A separate queue, dispatched ahead of any requests in the submission
  // queue, tracks the requests that need to be resubmitted (i.e., requests
  // with a remaining repeat count); more than one request can be outstanding
  // for the async stressor, while only one can be outstanding for the sync
  // stressor.
  struct StressorCtx {
    explicit StressorCtx(uint32_t id)
        : id_(id), reqQueue_(std::in_place_t{}, kMaxRequests) {}

    bool isFinished() { return finished_.load(std::memory_order_relaxed); }
    void markFinish() { finished_.store(true, std::memory_order_relaxed); }

    uint32_t id_{0};
    // std::queue<std::unique_ptr<BinaryReqWrapper>> resubmitQueue_;
    std::queue<BinaryRequest*> resubmitQueue_;
    folly::cacheline_aligned<ReqQueue> reqQueue_;
    // A thread that finishes its operations marks it here, so we will skip
    // further requests on its shard.
    std::atomic<bool> finished_{false};
  };
  // Used to assign stressorIdx_
  std::atomic<uint32_t> incrementalIdx_{0};

  // A sticky index assigned to each stressor thread that calls into
  // the generator.
  folly::ThreadLocalPtr<uint32_t> stressorIdx_;

  // Vector size is equal to the # of stressor threads;
  // stressorIdx_ is used to index.
  std::vector<std::unique_ptr<StressorCtx>> stressorCtxs_;

  // Pointers to the request objects used to carry binary request data,
  // one per stressor thread.
  std::vector<Request*> requestPtr_;

  // Class that holds a vector of pointers to the binary data
  BinaryFileStream binaryStream_;

  std::thread genWorker_;

  // Used to signal end of file as an EndOfTrace exception
  std::atomic<bool> eof{false};

  // Stats
  std::atomic<uint64_t> parseError = 0;
  std::atomic<uint64_t> parseSuccess = 0;

  void genRequests(folly::Latch& latch);
  void setEOF() { eof.store(true, std::memory_order_relaxed); }
  bool isEOF() { return eof.load(std::memory_order_relaxed); }

  inline StressorCtx& getStressorCtx(size_t shardId) {
    XCHECK_LT(shardId, numShards_);
    return *stressorCtxs_[shardId];
  }

  inline StressorCtx& getStressorCtx() {
    if (!stressorIdx_.get()) {
      stressorIdx_.reset(new uint32_t(incrementalIdx_++));
    }

    return getStressorCtx(*stressorIdx_);
  }

  inline Request* getRequestPtr(size_t shardId) {
    XCHECK_LT(shardId, numShards_);
    return requestPtr_[shardId];
  }

  inline Request* getRequestPtr() {
    if (!stressorIdx_.get()) {
      stressorIdx_.reset(new uint32_t(incrementalIdx_++));
    }

    return getRequestPtr(*stressorIdx_);
  }
};
inline void BinaryKVReplayGenerator::genRequests(folly::Latch& latch) {
  bool init = true;
  uint64_t nreqs = 0;
  binaryStream_.setOffset(fastForwardCount_);
  auto begin = util::getCurrentTimeSec();
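  // Single producer: read trace entries sequentially from the mmap'd binary
  // file and shard each one to the owning stressor's queue by key hash.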
  while (!shouldShutdown()) {
    try {
      BinaryRequest* req = binaryStream_.getNextPtr();
      auto key = req->getKey(binaryStream_.getKeyOffset());
      XDCHECK_LT(req->op_, 11);
      nreqs++;
      auto shardId = getShard(key);
      auto& stressorCtx = getStressorCtx(shardId);
      auto& reqQ = *stressorCtx.reqQueue_;
      while (!reqQ.write(req) && !stressorCtx.isFinished() &&
             !shouldShutdown()) {
        // ProducerConsumerQueue does not support blocking, so use sleep
        if (init) {
          latch.count_down();
          init = false;
        }
        std::this_thread::sleep_for(
            std::chrono::microseconds{checkIntervalUs_});
      }
      if (nreqs >= preLoadReqs_ && init) {
        auto end = util::getCurrentTimeSec();
        double reqsPerSec = nreqs / (double)(end - begin);
        XLOGF(INFO, "Parse rate: {:.2f} reqs/sec", reqsPerSec);
        latch.count_down();
        init = false;
      }
    } catch (const EndOfTrace& e) {
      if (init) {
        latch.count_down();
      }
      break;
    }
  }

  setEOF();
}
const Request& BinaryKVReplayGenerator::getReq(uint8_t,
                                               std::mt19937_64&,
                                               std::optional<uint64_t>) {
  BinaryRequest* req = nullptr;
  auto& stressorCtx = getStressorCtx();
  auto& reqQ = *stressorCtx.reqQueue_;
  auto& resubmitQueue = stressorCtx.resubmitQueue_;
  XDCHECK_EQ(req, nullptr);
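  // Requests pending resubmission take priority; otherwise poll this shard's
  // lock-free submission queue until a request arrives or the trace ends.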
  while (resubmitQueue.empty() && !reqQ.read(req)) {
    if (resubmitQueue.empty() && isEOF()) {
      throw cachelib::cachebench::EndOfTrace("Test stopped or EOF reached");
    }
    // ProducerConsumerQueue does not support blocking, so use sleep
    std::this_thread::sleep_for(std::chrono::microseconds{checkIntervalUs_});
  }

  if (req == nullptr) {
    XCHECK(!resubmitQueue.empty());
    req = resubmitQueue.front();
    resubmitQueue.pop();
  }
  XDCHECK_NE(req, nullptr);
  XDCHECK_NE(reinterpret_cast<uint64_t>(req), 0);
  XDCHECK_LT(req->op_, 12);
  auto key = req->getKey(binaryStream_.getKeyOffset());
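  // Map the binary trace opcode to a cachebench OpType; only GET (1),
  // SET (2), and DEL (3) are handled here.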
  OpType op;
  switch (req->op_) {
  case 1:
    op = OpType::kGet;
    break;
  case 2:
    op = OpType::kSet;
    break;
  case 3:
    op = OpType::kDel;
    break;
  }
  auto r = getRequestPtr();
  r->update(key,
            const_cast<size_t*>(reinterpret_cast<size_t*>(&req->valueSize_)),
            op,
            req->ttl_,
            reinterpret_cast<uint64_t>(req));
  return *r;
}
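// notifyResult is invoked by the stressor when a request completes; if the
// underlying trace entry still has repeats left, the BinaryRequest is queued
// for resubmission on the same shard.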
void BinaryKVReplayGenerator::notifyResult(uint64_t requestId, OpResultType) {
  // requestId should point to the BinaryRequest object.
  BinaryRequest* req = reinterpret_cast<BinaryRequest*>(requestId);
  if (req->repeats_ > 0) {
    req->repeats_ = req->repeats_ - 1;
    // need to insert into the queue again
    getStressorCtx().resubmitQueue_.emplace(req);
  }
}
} // namespace cachebench
} // namespace cachelib
} // namespace facebook
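For context, the sketch below illustrates how a stressor thread might drive this generator; it is not part of the commit. The names gen, rng, and runOp() are illustrative assumptions (runOp() stands in for whatever cache operation the real stressor performs), and it assumes the Request returned by getReq() exposes the requestId value passed to Request::update().

// Illustrative sketch only, not part of this commit: the expected
// getReq()/notifyResult() loop of a single stressor thread.
void stressorLoopSketch(BinaryKVReplayGenerator& gen) {
  std::mt19937_64 rng;
  try {
    while (true) {
      const Request& req = gen.getReq(0 /* poolId */, rng);
      OpResultType result = runOp(req); // hypothetical cache operation
      if (req.requestId) {
        // Hand the BinaryRequest* back so repeats_ can drive resubmission.
        gen.notifyResult(*req.requestId, result);
      }
    }
  } catch (const EndOfTrace&) {
    gen.markFinish(); // mark this shard finished so the producer can stop
  }
}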
