Skip to content

Commit 0228bc9

Browse files
authored
Merge pull request #20 from j0tunn/feat/prj.10
ДЗ-10. Асинхронная сетевая обработка команд
2 parents df6fc91 + 2d482e7 commit 0228bc9

21 files changed

+641
-1
lines changed

projects/10/CMakeLists.txt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
cmake_minimum_required(VERSION 3.10)
2+
3+
include(${CMAKE_CURRENT_SOURCE_DIR}/../../common/CMakeLists.txt)
4+
set(CMAKE_CXX_STANDARD 20)
5+
6+
project(bulk_server VERSION ${PROJECT_VERSION})
7+
8+
set(Boost_USE_STATIC_LIBS ON)
9+
find_package(Boost REQUIRED system regex)
10+
11+
add_executable(${PROJECT_NAME}
12+
main.cpp
13+
cmd_processor.cpp
14+
bulk_reader.cpp
15+
bulk_router.cpp
16+
flush_observer.cpp
17+
stream_logger.cpp
18+
file_logger.cpp
19+
reader_state.cpp
20+
thread_logger.cpp
21+
)
22+
23+
target_link_libraries(${PROJECT_NAME} PRIVATE Boost::system Boost::regex)
24+
25+
# Package
26+
setupCPack(${PROJECT_NAME})

projects/10/bulk_reader.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#include "bulk_reader.h"
2+
3+
using namespace std;
4+
5+
BulkReader::BulkReader(unsigned int bulkSize)
6+
: state_(new AutoModeState([this](unique_ptr<ReaderState>&& newState) { this->setNewState(move(newState)); }, bulkSize))
7+
{}
8+
9+
void BulkReader::addCmd(const string& cmd) {
10+
if (cmd == "{") {
11+
state_->startBulk();
12+
return;
13+
}
14+
15+
if (cmd == "}") {
16+
state_->finishBulk();
17+
return;
18+
}
19+
20+
state_->addCmd(cmd);
21+
}
22+
23+
void BulkReader::eof() {
24+
state_->eof();
25+
}
26+
27+
bool BulkReader::isInStaticMode() const {
28+
return state_->isAutoMode();
29+
}
30+
31+
void BulkReader::setNewState(unique_ptr<ReaderState>&& newState) {
32+
notifyFlush_(state_->getBulk());
33+
state_ = move(newState);
34+
}

projects/10/bulk_reader.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#pragma once
2+
3+
#include <string>
4+
#include <vector>
5+
#include <memory>
6+
#include "flush_observer.h"
7+
#include "command.h"
8+
#include "reader_state.h"
9+
10+
class BulkReader : public FlushObservable {
11+
public:
12+
explicit BulkReader(unsigned int bulkSize);
13+
void addCmd(const std::string& cmd);
14+
void eof();
15+
bool isInStaticMode() const;
16+
17+
private:
18+
void setNewState(std::unique_ptr<ReaderState>&& state);
19+
20+
std::unique_ptr<ReaderState> state_;
21+
};

projects/10/bulk_router.cpp

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#include "bulk_router.h"
2+
#include <utility>
3+
4+
using namespace std;
5+
6+
BulkRouter::BulkRouter(unsigned int bulkSize)
7+
: bulkSize_(bulkSize)
8+
, pStaticBulkReader_(nullptr)
9+
{
10+
}
11+
12+
void BulkRouter::addCmd(const string& cmd, unsigned long sessionId) {
13+
if (!addDynamicCmd(cmd, sessionId)) {
14+
addStaticCmd(cmd, sessionId);
15+
}
16+
}
17+
18+
bool BulkRouter::addDynamicCmd(const string& cmd, unsigned long sessionId) {
19+
auto itReader = dynamicBulkReaders_.find(sessionId);
20+
if (itReader == dynamicBulkReaders_.end()) {
21+
return false;
22+
}
23+
24+
auto& pReader = itReader->second;
25+
pReader->addCmd(cmd);
26+
27+
if (pReader->isInStaticMode()) {
28+
dynamicBulkReaders_.erase(itReader);
29+
}
30+
31+
return true;
32+
}
33+
34+
void BulkRouter::addStaticCmd(const string& cmd, unsigned long sessionId) {
35+
if (!pStaticBulkReader_) {
36+
pStaticBulkReader_ = make_unique<BulkReader>(bulkSize_);
37+
pStaticBulkReader_->addFlushObserver(this);
38+
}
39+
40+
pStaticBulkReader_->addCmd(cmd);
41+
42+
if (!pStaticBulkReader_->isInStaticMode()) {
43+
dynamicBulkReaders_.emplace(make_pair(sessionId, move(pStaticBulkReader_)));
44+
}
45+
}
46+
47+
void BulkRouter::eof() {
48+
if (pStaticBulkReader_) {
49+
pStaticBulkReader_->eof();
50+
}
51+
52+
for (auto& pair : dynamicBulkReaders_) {
53+
pair.second->eof();
54+
}
55+
}
56+
57+
void BulkRouter::onFlush(const vector<Command>& bulk) {
58+
notifyFlush_(bulk);
59+
};

projects/10/bulk_router.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#pragma once
2+
3+
#include <string>
4+
#include <map>
5+
#include <memory>
6+
#include "flush_observer.h"
7+
#include "bulk_reader.h"
8+
9+
class BulkRouter : public FlushObservable, public IFlushObserver {
10+
public:
11+
explicit BulkRouter(unsigned int bulkSize);
12+
void addCmd(const std::string& cmd, unsigned long sessionId);
13+
void eof();
14+
15+
void onFlush(const std::vector<Command>& bulk) override;
16+
17+
private:
18+
void addStaticCmd(const std::string& cmd, unsigned long sessionId);
19+
bool addDynamicCmd(const std::string& cmd, unsigned long sessionId);
20+
21+
unsigned int bulkSize_;
22+
std::unique_ptr<BulkReader> pStaticBulkReader_;
23+
std::map<unsigned long, std::unique_ptr<BulkReader> > dynamicBulkReaders_;
24+
};

projects/10/cmd_processor.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include "cmd_processor.h"
2+
#include "stream_logger.h"
3+
#include "file_logger.h"
4+
#include <iostream>
5+
6+
using namespace std;
7+
8+
CmdProcessor::CmdProcessor(unsigned int bulkSize)
9+
: bulkRouter_(bulkSize)
10+
, consoleLogger_(1, []() { return new StreamLogger(cout); })
11+
, fileLogger_(2, []() { return new FileLogger(filesystem::current_path()); })
12+
{
13+
bulkRouter_.addFlushObserver(&consoleLogger_);
14+
bulkRouter_.addFlushObserver(&fileLogger_);
15+
}
16+
17+
CmdProcessor::~CmdProcessor() {
18+
bulkRouter_.eof();
19+
}
20+
21+
void CmdProcessor::handleCmd(const string& cmd, unsigned long sessionId) {
22+
bulkRouter_.addCmd(cmd, sessionId);
23+
}

projects/10/cmd_processor.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
3+
#include "bulk_router.h"
4+
#include "thread_logger.h"
5+
#include <string>
6+
7+
class CmdProcessor {
8+
public:
9+
CmdProcessor(unsigned int bulkSize);
10+
~CmdProcessor();
11+
12+
void handleCmd(const std::string& cmd, unsigned long sessionId);
13+
14+
private:
15+
BulkRouter bulkRouter_;
16+
ThreadLogger consoleLogger_;
17+
ThreadLogger fileLogger_;
18+
};

projects/10/command.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#pragma once
2+
3+
#include <string>
4+
#include <ctime>
5+
6+
struct Command {
7+
explicit Command(const std::string& cmd)
8+
: val(cmd)
9+
, timestamp(std::time(0))
10+
{}
11+
12+
Command(const Command&) = default;
13+
14+
std::string val;
15+
std::time_t timestamp;
16+
};

projects/10/file_logger.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#include <sstream>
2+
#include <fstream>
3+
#include <thread>
4+
#include "file_logger.h"
5+
#include "stream_logger.h"
6+
7+
using namespace std;
8+
9+
FileLogger::FileLogger(const filesystem::path& dir)
10+
: logDir_(dir)
11+
{}
12+
13+
void FileLogger::onFlush(const vector<Command>& bulk) {
14+
if (bulk.size() == 0) {
15+
return;
16+
}
17+
18+
stringstream fileName;
19+
fileName << "bulk" << bulk[0].timestamp << "_" << this_thread::get_id() << ".log";
20+
21+
ofstream out(logDir_ / fileName.str());
22+
StreamLogger subLogger(out);
23+
subLogger.onFlush(bulk);
24+
}

projects/10/file_logger.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#pragma once
2+
3+
#include <string>
4+
#include <filesystem>
5+
#include "flush_observer.h"
6+
7+
class FileLogger : public IFlushObserver {
8+
public:
9+
FileLogger(const std::filesystem::path& dir);
10+
void onFlush(const std::vector<Command>& bulk) override;
11+
12+
private:
13+
const std::filesystem::path logDir_;
14+
};

0 commit comments

Comments
 (0)