29
29
#include " ../queue.hpp"
30
30
#include " src/python/pythonnoderesources.hpp"
31
31
#include " src/llm/servable.hpp"
32
+ #include " src/assert.hpp"
32
33
33
34
#include " mediapipe/framework/calculator_graph.h"
34
35
#include " mediapipe/framework/port/status.h"
@@ -42,16 +43,13 @@ const std::string LLM_SESSION_SIDE_PACKET_NAME = "llm";
42
43
namespace ovms {
43
44
44
45
std::shared_ptr<GraphHelper> constructGraphHelper (const ::mediapipe::CalculatorGraphConfig& config, PythonNodeResourcesMap& pythonNodeResourcesMap, GenAiServableMap& genAiServableMap) {
45
- auto gh = std::make_shared<GraphHelper>();
46
46
SPDLOG_TRACE (" Constructing GraphHelper():{}" , (void *)gh.get ());
47
- gh->graph = std::make_shared<::mediapipe::CalculatorGraph>();
48
- gh->currentTimestamp = ::mediapipe::Timestamp (0 );
49
-
47
+ auto gh = std::make_shared<GraphHelper>();
50
48
auto absStatus = gh->graph ->Initialize (config);
51
49
if (!absStatus.ok ()) {
52
50
SPDLOG_ERROR (" Failed to initialize graph issue:{}" , absStatus.ToString ());
53
51
// This would mean validation did execute fully
54
- assert (true );
52
+ ASSERT_ALWAYS (true );
55
53
}
56
54
for (auto & name : config.output_stream ()) {
57
55
std::string streamName = getStreamName (name);
@@ -86,13 +84,13 @@ std::shared_ptr<GraphHelper> constructGraphHelper(const ::mediapipe::CalculatorG
86
84
void GraphQueue::restoreStream (int streamId) {
87
85
if (streamId < inferRequests.size ()) {
88
86
SPDLOG_ERROR (" Cannot restore stream id > queue length" );
89
- assert (streamId < inferRequests.size ());
87
+ ASSERT_ALWAYS (streamId < inferRequests.size ());
90
88
}
91
89
SPDLOG_TRACE (" Restoring graph helper id:{}" , streamId);
92
90
auto gh = constructGraphHelper (*this ->config , *this ->pythonNodeResourcesMap , *this ->genAiServableMap );
93
91
if (gh == nullptr ) {
94
92
SPDLOG_ERROR (" Failed to restore graph helper: {}" , streamId);
95
- assert (false );
93
+ ASSERT_ALWAYS (false );
96
94
}
97
95
inferRequests[streamId] = gh;
98
96
}
@@ -116,12 +114,12 @@ GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::sh
116
114
inferRequests.emplace_back (std::move (gh));
117
115
}
118
116
}
119
-
120
- GraphHelper::~GraphHelper () {
121
- SPDLOG_TRACE ( " GraphHelper wait until idle graph " );
117
+ void GraphHelper::closeGraph () {
118
+ SPDLOG_ERROR ( " ER " );
119
+ ASSERT_ALWAYS ( this -> graph . get () != nullptr );
122
120
auto absStatus = absl::OkStatus ();
123
121
if (this ->initialized ) {
124
- SPDLOG_ERROR (" Calling wait until idle" );
122
+ SPDLOG_ERROR (" Calling wait until idle graph:{} " , ( void *) this -> graph . get () );
125
123
absStatus = this ->graph ->WaitUntilIdle ();
126
124
}
127
125
if (!absStatus.ok ()) {
@@ -145,6 +143,16 @@ GraphHelper::~GraphHelper() {
145
143
// throw 42.2;
146
144
}
147
145
SPDLOG_ERROR (" ER" );
146
+
147
+ }
148
+
149
+ GraphHelper::GraphHelper () :
150
+ graph (std::make_shared<::mediapipe::CalculatorGraph>()),
151
+ currentTimestamp (::mediapipe::Timestamp(0 )) {}
152
+
153
+ GraphHelper::~GraphHelper () {
154
+ SPDLOG_TRACE (" GraphHelper wait until idle graph" );
155
+ closeGraph ();
148
156
this ->graph .reset ();
149
157
SPDLOG_ERROR (" ER ~GraphHelper:{}" , (void *) this );
150
158
}
@@ -155,31 +163,6 @@ GraphQueue::~GraphQueue() {
155
163
SPDLOG_TRACE (" GraphQueue wait until idle graph" );
156
164
graphHelper.reset ();
157
165
SPDLOG_ERROR (" ER" );
158
- continue ;
159
- auto absStatus = graphHelper->graph ->WaitUntilIdle ();
160
- if (!absStatus.ok ()) {
161
- SPDLOG_ERROR (" ER issue:{} {}" , absStatus.ToString (), (void *)this );
162
- // throw 42.2;
163
- }
164
- absStatus = graphHelper->graph ->CloseAllPacketSources ();
165
- if (!absStatus.ok ()) {
166
- SPDLOG_ERROR (" ER issue:{} {}" , absStatus.ToString (), (void *)this );
167
- // throw "as";
168
- }
169
- SPDLOG_TRACE (" GraphQueue wait until done graph" );
170
- absStatus = graphHelper->graph ->WaitUntilDone ();
171
- if (!absStatus.ok ()) {
172
- SPDLOG_ERROR (" ER issue:{} {}" , absStatus.ToString (), (void *)this );
173
- // throw 42.2;
174
- }
175
- graphHelper->graph ->Cancel ();
176
- if (!absStatus.ok ()) {
177
- SPDLOG_ERROR (" ER issue:{} {}" , absStatus.ToString (), (void *)this );
178
- // throw 42.2;
179
- }
180
- SPDLOG_ERROR (" ER" );
181
- graphHelper->graph .reset ();
182
- SPDLOG_ERROR (" ER" );
183
166
}
184
167
SPDLOG_ERROR (" ER ~GraphQueue:{}" , (void *)this );
185
168
}
0 commit comments