Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ci/build_test_OnCommit.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pipeline {
stage("Release image and tests in parallel") {
when { expression { image_build_needed == "true" } }
parallel {
stage("Run unit tests") {
/*stage("Run unit tests") {
agent {
label "${agent_name_linux}"
}
Expand All @@ -132,7 +132,7 @@ pipeline {
}
}
}
}
}*/
stage("Internal tests") {
agent {
label "${agent_name_linux}"
Expand All @@ -152,7 +152,7 @@ pipeline {
}
}
}
stage('Test windows') {
/*stage('Test windows') {
agent {
label "${agent_name_windows}"
}
Expand All @@ -176,7 +176,7 @@ pipeline {
}
}
}
}
}*/
}
}
}
Expand Down
36 changes: 31 additions & 5 deletions src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,29 @@ cc_library(
visibility = ["//visibility:public",],
linkopts = [],
)
cc_library(
name = "mediapipe_internal_graphqueue",
hdrs = [
"mediapipe_internal/graphqueue.hpp",
"mediapipe_internal/outputstreamobserver.hpp",
], # TODO FIXME
srcs = ["mediapipe_internal/graphqueue.cpp"],
deps = [
"libovms_queue",
"libovmslogging",
"execution_context",
"libovmstimer",
"libovmsmetrics",
"model_metric_reporter",
"//third_party:openvino",
"@mediapipe//mediapipe/framework:calculator_graph",
"//src/python:libovmspythonmodule", # TODO not splitted
"//src/llm:genai_servables", # TODO split!
],
copts = [],
visibility = ["//visibility:public",],
linkopts = [],
)
cc_library(
name = "libovms_ovinferrequestsqueue",
hdrs = ["ovinferrequestsqueue.hpp"],
Expand Down Expand Up @@ -361,7 +384,7 @@ cc_library(
srcs = ["model_metric_reporter.cpp"],
deps = [
"libovmsmodelversion",
"libovms_execution_context",
"execution_context",
"libovmslogging",
"libovmsmetrics",
],
Expand Down Expand Up @@ -571,6 +594,7 @@ cc_library(
"mediapipe_internal/mediapipegraphconfig.cpp",
"mediapipe_internal/mediapipegraphdefinition.cpp",
"mediapipe_internal/mediapipegraphdefinition.hpp",
"mediapipe_internal/outputstreamobserver.hpp",
"mediapipe_internal/mediapipegraphexecutor.cpp",
"mediapipe_internal/mediapipegraphexecutor.hpp",
"mediapipe_internal/packettypes.hpp",
Expand Down Expand Up @@ -631,7 +655,7 @@ cc_library(
"libovms_dags_nodesessionresult",
"libovms_dags_nodeinputhandler",
"custom_node_output_allocator",
"libovms_execution_context",
"execution_context",
"executingstreamidguard",
"libovmsfilesystem",
"libovmsfilesystemfactory_h",
Expand Down Expand Up @@ -695,6 +719,7 @@ cc_library(
})
+ select({
"//conditions:default": [
"mediapipe_internal_graphqueue",
"@mediapipe_calculators//:mediapipe_calculators", # Need this dependencies here because we use ovms/src - cannot add in ovms_dependencies because we copy src directory later in Dockerfile
"@mediapipe//mediapipe/graphs/holistic_tracking:holistic_tracking_to_render_data",
"@mediapipe//mediapipe/graphs/iris_tracking:iris_tracking_cpu_deps",
Expand Down Expand Up @@ -1842,7 +1867,7 @@ cc_library(
],
deps = [
"libovmslogging",
"libovms_execution_context",
"execution_context",
"libovms_dags_session_id",
],
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -2252,7 +2277,7 @@ cc_library(
linkopts = LINKOPTS_ADJUSTED,
)
cc_library(
name = "libovms_execution_context",
name = "execution_context",
hdrs = ["execution_context.hpp",],
deps = [],
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -2349,7 +2374,7 @@ cc_library(
deps = [
"libovmslogging",
"libovmsmodelversionstatus",
"libovms_execution_context",
"execution_context",
"libovms_dags_aliases",
"libovms_dags_pipelineeventqueue",
"libovms_dags_session_id",
Expand Down Expand Up @@ -2902,6 +2927,7 @@ cc_test(
],
}),
copts = COPTS_TESTS,
local_defines = COMMON_LOCAL_DEFINES,
)

cc_library(
Expand Down
3 changes: 3 additions & 0 deletions src/kfs_frontend/kfs_graph_executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ static Status createPacketAndPushIntoGraph(const std::string& name, std::shared_
}
std::unique_ptr<T> inputTensor;
OVMS_RETURN_ON_FAIL(deserializeTensor(name, *request, inputTensor, pythonBackend));
SPDLOG_ERROR("Current Timestamp before actual pushing:{}", timestamp.Value());
MP_RETURN_ON_FAIL(graph.AddPacketToInputStream(
name,
::mediapipe::packet_internal::Create(
Expand Down Expand Up @@ -1040,8 +1041,10 @@ static Status deserializeTimestampIfAvailable(
return status;
}
} else {
SPDLOG_ERROR("Current Timestamp before setting:{}", timestamp.Value());
auto now = std::chrono::system_clock::now();
timestamp = ::mediapipe::Timestamp(std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count());
SPDLOG_ERROR("Current Timestamp setting:{}", timestamp.Value());
}
return StatusCode::OK;
}
Expand Down
3 changes: 2 additions & 1 deletion src/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ std::shared_ptr<spdlog::logger> rerank_calculator_logger = std::make_shared<spdl
#if (OV_TRACE == 1)
std::shared_ptr<spdlog::logger> ov_logger = std::make_shared<spdlog::logger>("openvino");
#endif
const std::string default_pattern = "[%Y-%m-%d %T.%e][%t][%n][%l][%s:%#] %v";
//const std::string default_pattern = "[%i] [%Y-%m-%d %T.%f][%t][%n][%l][%s:%#] %v";
const std::string default_pattern = "[%Y-%m-%d %T.%f][%t][%n][%l][%s:%#] %v";

static void set_log_level(const std::string log_level, std::shared_ptr<spdlog::logger> logger) {
logger->set_level(spdlog::level::info);
Expand Down
120 changes: 120 additions & 0 deletions src/mediapipe_internal/graphqueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
//*****************************************************************************
// Copyright 2025 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#include "graphqueue.hpp"

#include <atomic>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

#include "../queue.hpp"
#include "src/python/pythonnoderesources.hpp"
#include "src/llm/servable.hpp"

#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/port/status.h"

#include "outputstreamobserver.hpp"
namespace {
//const ::mediapipe::Timestamp STARTING_TIMESTAMP = ::mediapipe::Timestamp(0); // TODO @atobisze common
const std::string PYTHON_SESSION_SIDE_PACKET_NAME = "py";
const std::string LLM_SESSION_SIDE_PACKET_NAME = "llm";
} // namespace
namespace ovms {
GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap, std::shared_ptr<GenAiServableMap> genAiServableMap, int streamsLength) :
Queue(streamsLength),
pythonNodeResourcesMap(pythonNodeResourcesMap),
genAiServableMap(genAiServableMap) {
SPDLOG_ERROR("ER Constr graph queue:{}", (void*)this);
inferRequests.reserve(streamsLength);
// TODO FIXME split constructor to init to handle retCodes?
for (auto i = 0; i < streamsLength; ++i) {
auto gh = std::make_shared<GraphHelper>();
gh->graph = std::make_shared<::mediapipe::CalculatorGraph>();
gh->currentTimestamp = ::mediapipe::Timestamp(0);

auto absStatus = gh->graph->Initialize(config);
if (!absStatus.ok()) {
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
throw 42;
}
for (auto& name : config.output_stream()) {
std::string streamName = getStreamName(name);
gh->outStreamObservers[streamName] = std::shared_ptr<OutputStreamObserverI>(new NullOutputStreamObserver()); // TODO use at() FIXME
auto& perGraphObserverFunctor = gh->outStreamObservers[streamName];
absStatus = gh->graph->ObserveOutputStream(streamName, [&perGraphObserverFunctor](const ::mediapipe::Packet& packet) -> absl::Status { return perGraphObserverFunctor->handlePacket(packet); }); // TODO FIXME throw?
if (!absStatus.ok()) {
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
throw 42;
}
}
std::map<std::string, mediapipe::Packet> inputSidePackets;
inputSidePackets[PYTHON_SESSION_SIDE_PACKET_NAME] = mediapipe::MakePacket<PythonNodeResourcesMap>(*pythonNodeResourcesMap)
.At(STARTING_TIMESTAMP);
inputSidePackets[LLM_SESSION_SIDE_PACKET_NAME] = mediapipe::MakePacket<GenAiServableMap>(*genAiServableMap).At(STARTING_TIMESTAMP);
for (auto [k, v] : inputSidePackets) {
SPDLOG_ERROR("k:{} v", k);
}
SPDLOG_ERROR("ER");
absStatus = gh->graph->StartRun(inputSidePackets);
SPDLOG_ERROR("ER");
if (!absStatus.ok()) {
SPDLOG_ERROR("Input sidePackets size:{}, python map size:{} key:{} side packet name:{}", inputSidePackets.size(), pythonNodeResourcesMap->size(), pythonNodeResourcesMap->begin()->first, PYTHON_SESSION_SIDE_PACKET_NAME);
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
throw 42;
}

SPDLOG_ERROR("ER");
inferRequests.emplace_back(std::move(gh));
SPDLOG_ERROR("ER");
}
}
GraphQueue::~GraphQueue() {
SPDLOG_ERROR("ER Destroy graph queue:{}", (void*)this);
for (auto& graphHelper : inferRequests) {
auto absStatus = graphHelper->graph->WaitUntilIdle();
if (!absStatus.ok()) {
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
// throw 42.2;
}
absStatus = graphHelper->graph->CloseAllPacketSources();
if (!absStatus.ok()) {
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
// throw "as";
}
absStatus = graphHelper->graph->WaitUntilDone();
if (!absStatus.ok()) {
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
// throw 42.2;
}
graphHelper->graph->Cancel();
if (!absStatus.ok()) {
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
// throw 42.2;
}
SPDLOG_ERROR("ER");
graphHelper->graph.reset();
SPDLOG_ERROR("ER");
}
SPDLOG_ERROR("ER Destroy graph queue:{}", (void*)this);
}
} // namespace ovms
93 changes: 93 additions & 0 deletions src/mediapipe_internal/graphqueue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//*****************************************************************************
// Copyright 2025 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#pragma once

#include <atomic>
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

#include "../queue.hpp"
#include "src/python/pythonnoderesources.hpp"
#include "src/llm/servable.hpp"

#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/port/status.h"

#include "outputstreamobserver.hpp"
// TODO FIXME HEADERS
const ::mediapipe::Timestamp STARTING_TIMESTAMP = ::mediapipe::Timestamp(0); // TODO @atobisze common
const std::string PYTHON_SESSION_SIDE_PACKET_TAG = "PYTHON_NODE_RESOURCES";
const std::string LLM_SESSION_SIDE_PACKET_TAG = "LLM_NODE_RESOURCES";
namespace ovms {
class OutputStreamObserverI;
class NullOutputStreamObserver;
struct GraphHelper {
std::shared_ptr<::mediapipe::CalculatorGraph> graph; // TODO FIXME this does not have to be shared_ptr
std::unordered_map<std::string, std::shared_ptr<OutputStreamObserverI>> outStreamObservers;
::mediapipe::Timestamp currentTimestamp; // TODO FIXME const
// TODO FIXME move constr/=
GraphHelper() = default;
GraphHelper(const GraphHelper&) = delete;
GraphHelper& operator=(const GraphHelper&) = delete;
GraphHelper(GraphHelper&& gh) :
graph(std::move(gh.graph)),
outStreamObservers(std::move(gh.outStreamObservers)),
currentTimestamp(gh.currentTimestamp) {}
GraphHelper& operator=(GraphHelper&& gh) = default;
};
// we need to keep Graph alive during MP reload hence shared_ptr
//class GraphQueue : public Queue<std::shared_ptr<::mediapipe::CalculatorGraph>> {
class GraphQueue : public Queue<std::shared_ptr<GraphHelper>> {
std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap;
std::shared_ptr<GenAiServableMap> genAiServableMap;

public:
GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap, std::shared_ptr<GenAiServableMap> genAiServableMap, int streamsLength);
~GraphQueue();
};

struct GraphIdGuard {
std::weak_ptr<GraphQueue> weakQueue;
const int id;
std::shared_ptr<GraphHelper> gh;
// TODO FIXME shared_ptr
::mediapipe::CalculatorGraph& graph;
GraphIdGuard(std::shared_ptr<GraphQueue>& queue) :
weakQueue(queue),
id(queue->getIdleStream().get()),
gh((queue->getInferRequest(id))),
graph(*gh->graph) {
SPDLOG_ERROR("ER Guard construct this:{}", (void*)this);
}
GraphIdGuard(GraphIdGuard&&) = default;
GraphIdGuard(const GraphIdGuard&) = delete;
~GraphIdGuard() {
auto existingQueue = weakQueue.lock();
SPDLOG_ERROR("ER DEstroy Guard begin qu:{}", (void*)existingQueue.get());
if (existingQueue)
existingQueue->returnStream(this->id);
SPDLOG_ERROR("ER Destroy Guard end qu:{}", (void*)existingQueue.get());
SPDLOG_ERROR("ER Guard destroy this:{}", (void*)this);
}
};
} // namespace ovms
Loading