Skip to content

Commit 6716c19

Browse files
authored
Fix bug where a node can be stopped when it hasn't started (#164)
Signed-off-by: Adam Glustein <[email protected]>
1 parent aa3e18a commit 6716c19

File tree

12 files changed

+259
-20
lines changed

12 files changed

+259
-20
lines changed

cpp/cmake/modules/FindCSP.cmake

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# CSP_BASELIB_STATIC_LIBRARY
1414
# CSP_BASKETLIB_LIBRARY
1515
# CSP_BASKETLIB_STATIC_LIBRARY
16+
# CSP_TESTLIB_LIBRARY
1617
# CSP_MATH_LIBRARY
1718
# CSP_MATH_STATIC_LIBRARY
1819
# CSP_STATS_LIBRARY
@@ -74,6 +75,8 @@ find_library(CSP_BASELIB_STATIC_LIBRARY NAMES libbaselibimpl_static.a PATHS "${_
7475
find_library(CSP_BASKETLIB_LIBRARY NAMES _cspbasketlibimpl.so PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)
7576
find_library(CSP_BASKETLIB_STATIC_LIBRARY NAMES libbasketlibimpl_static.a PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)
7677

78+
find_library(CSP_TESTLIB_LIBRARY NAMES _csptestlibimpl.so PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)
79+
7780
find_library(CSP_MATH_LIBRARY NAMES _cspmathimpl.so PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)
7881
find_library(CSP_MATH_STATIC_LIBRARY NAMES libmathimpl_static.a PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)
7982

cpp/csp/engine/AdapterManager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ ManagedSimInputAdapter::ManagedSimInputAdapter( csp::Engine * engine,
1313
{
1414
}
1515

16-
AdapterManager::AdapterManager( csp::Engine * engine ) : m_engine( engine ), m_statusAdapter( nullptr )
16+
AdapterManager::AdapterManager( csp::Engine * engine ) : m_engine( engine ), m_statusAdapter( nullptr ), m_started( false )
1717
{
1818
if( !m_engine -> isRootEngine() )
1919
CSP_THROW( NotImplemented, "AdapterManager support is not currently available in dynamic graphs" );

cpp/csp/engine/AdapterManager.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ class AdapterManager : public EngineOwned
118118

119119
DateTime starttime() const { return m_starttime; }
120120
DateTime endtime() const { return m_endtime; }
121+
122+
void setStarted() { m_started = true; }
123+
bool started() const { return m_started; }
121124

122125
StatusAdapter *createStatusAdapter( CspTypePtr &type, PushMode pushMode );
123126
void pushStatus( int64_t level, int64_t errCode, const std::string &errMsg, PushBatch *batch = nullptr ) const;
@@ -134,6 +137,7 @@ class AdapterManager : public EngineOwned
134137
DateTime m_starttime;
135138
DateTime m_endtime;
136139
StatusAdapter * m_statusAdapter;
140+
bool m_started;
137141
};
138142

139143
inline void AdapterManager::scheduleTimerCB( DateTime next )

cpp/csp/engine/Consumer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ namespace csp
77

88
Consumer::Consumer( Engine * engine ) : m_engine( engine ),
99
m_next( nullptr ),
10-
m_rank( -1 )
10+
m_rank( -1 ),
11+
m_started( false )
1112
{
1213
}
1314

cpp/csp/engine/Consumer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ class Consumer
3434
DateTime now() const { return rootEngine() -> now(); }
3535
uint64_t cycleCount() const { return rootEngine() -> cycleCount(); }
3636

37+
void setStarted() { m_started = true; }
38+
bool started() const { return m_started; }
39+
3740
//called when input timeseries has an event, schedules in
3841
//step propagation. See if we can do better than virtual per tick...
3942
virtual void handleEvent( InputId id )
@@ -126,6 +129,7 @@ class Consumer
126129
Consumer * m_next;
127130

128131
int32_t m_rank;
132+
bool m_started;
129133
};
130134

131135
};

cpp/csp/engine/Engine.cpp

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,24 +156,36 @@ void Engine::start()
156156
auto start = std::max( m_rootEngine -> now(), m_rootEngine -> startTime() );
157157
auto end = m_rootEngine -> endTime();
158158

159-
160159
//start up managers
161160
for( auto & manager : m_adapterManagers )
161+
{
162162
manager -> start( start, end );
163+
manager -> setStarted();
164+
}
163165

164166
//start up output adapters
165167
for( auto & adapter : m_outputAdapters )
168+
{
166169
adapter -> start();
170+
adapter -> setStarted();
171+
}
167172

168173
for( auto & entry : m_graphOutputs )
169174
{
170-
if( entry.second -> engine() == this )
171-
entry.second -> start();
175+
auto & graphOutputAdapter = entry.second;
176+
if( graphOutputAdapter -> engine() == this )
177+
{
178+
graphOutputAdapter -> start();
179+
graphOutputAdapter -> setStarted();
180+
}
172181
}
173182

174183
//start up input adapters
175184
for( auto & adapter : m_inputAdapters )
185+
{
176186
adapter -> start( start, end );
187+
adapter -> setStarted();
188+
}
177189

178190
//see registerOwnedObject( AdapterManager ) above, we register adapter managers with root. At this point we dont
179191
//need the list of mgrs created in a dynamic engine anymore, so we clear out the mem ( and effetively take them out of the stop() list for dynamic shutdown )
@@ -182,28 +194,45 @@ void Engine::start()
182194

183195
//startup nodes
184196
for( auto & node : m_nodes )
197+
{
185198
node -> start();
199+
node -> setStarted();
200+
}
186201
}
187202

188203
void Engine::stop()
189204
{
205+
// Ensure we only stop nodes/adapters that have started in the case an exception occurs during startup
190206
for( auto & node : m_nodes )
191-
node -> stop();
207+
{
208+
if( node -> started() )
209+
node -> stop();
210+
}
192211

193212
for( auto & adapter : m_inputAdapters )
194-
adapter -> stop();
213+
{
214+
if( adapter -> started() )
215+
adapter -> stop();
216+
}
195217

196218
for( auto & entry : m_graphOutputs )
197219
{
198-
if( entry.second -> engine() == this )
199-
entry.second -> stop();
220+
auto & graphOutputAdapter = entry.second;
221+
if( graphOutputAdapter -> started() && graphOutputAdapter -> engine() == this )
222+
graphOutputAdapter -> stop();
200223
}
201224

202225
for( auto & adapter : m_outputAdapters )
203-
adapter -> stop();
226+
{
227+
if( adapter -> started() )
228+
adapter -> stop();
229+
}
204230

205231
for( auto & manager : m_adapterManagers )
206-
manager -> stop();
232+
{
233+
if( manager -> started() )
234+
manager -> stop();
235+
}
207236
}
208237

209238
}

cpp/csp/engine/InputAdapter.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ namespace csp
55
{
66

77
InputAdapter::InputAdapter( Engine *engine, const CspTypePtr &type, PushMode pushMode ) : m_rootEngine( engine -> rootEngine() ),
8-
m_pushMode( pushMode )
8+
m_pushMode( pushMode ),
9+
m_started( false )
910
{
1011
if( pushMode == PushMode::BURST )
1112
init( CspArrayType::create( type ) );

cpp/csp/engine/InputAdapter.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ class InputAdapter : public TimeSeriesProvider, public EngineOwned
3535

3636
PushMode pushMode() const { return m_pushMode; }
3737

38+
void setStarted() { m_started = true; }
39+
bool started() const { return m_started; }
40+
3841
//if adapter is BURST this will return the type of the data, rather than the BURST vector<Data>
3942
const CspType * dataType() const
4043
{
@@ -46,6 +49,7 @@ class InputAdapter : public TimeSeriesProvider, public EngineOwned
4649
protected:
4750
RootEngine * m_rootEngine;
4851
PushMode m_pushMode;
52+
bool m_started;
4953
};
5054

5155
template<typename T>

cpp/csp/python/CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,18 @@ target_link_libraries(cspmathimpl cspimpl mathimpl)
9393
add_library(cspstatsimpl SHARED cspstatsimpl.cpp)
9494
target_link_libraries(cspstatsimpl cspimpl statsimpl)
9595

96+
## Testlib c++ module
97+
add_library(csptestlibimpl SHARED csptestlibimpl.cpp)
98+
target_link_libraries(csptestlibimpl cspimpl)
99+
96100
## NumPy stats c++ module
97101
add_library(npstatsimpl STATIC npstatsimpl.cpp)
98102
add_library(cspnpstatsimpl SHARED cspnpstatsimpl.cpp)
99103
target_link_libraries(cspnpstatsimpl cspimpl npstatsimpl)
100104
target_include_directories(npstatsimpl PRIVATE ${NUMPY_INCLUDE_DIRS})
101105
target_include_directories(cspnpstatsimpl PRIVATE ${NUMPY_INCLUDE_DIRS})
102106

103-
install(TARGETS csptypesimpl cspimpl cspbaselibimpl cspbasketlibimpl cspmathimpl cspstatsimpl cspnpstatsimpl
107+
install(TARGETS csptypesimpl cspimpl cspbaselibimpl cspbasketlibimpl cspmathimpl cspstatsimpl csptestlibimpl cspnpstatsimpl
104108
PUBLIC_HEADER DESTINATION include/csp/python
105109
RUNTIME DESTINATION bin/
106110
LIBRARY DESTINATION lib/

cpp/csp/python/csptestlibimpl.cpp

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#include <Python.h>
2+
#include <csp/engine/CppNode.h>
3+
#include <csp/python/Conversions.h>
4+
#include <csp/python/InitHelper.h>
5+
#include <csp/python/PyCppNode.h>
6+
#include <csp/python/PyObjectPtr.h>
7+
8+
namespace csp::cppnodes
9+
{
10+
11+
// Expose C++ nodes for testing in Python
12+
// Keep nodes for a specific test to a single namespace under testing
13+
14+
namespace testing
15+
{
16+
17+
namespace stop_start_test
18+
{
19+
20+
using namespace csp::python;
21+
22+
void setStatus( const DialectGenericType & obj_, const std::string & name )
23+
{
24+
PyObjectPtr obj = PyObjectPtr::own( toPython( obj_ ) );
25+
PyObjectPtr attr = PyObjectPtr::own( PyUnicode_FromString( name.c_str() ) );
26+
PyObject_SetAttr( obj.get(), attr.get(), Py_True );
27+
}
28+
29+
DECLARE_CPPNODE( start_n1_set_value )
30+
{
31+
INIT_CPPNODE( start_n1_set_value ) {}
32+
33+
SCALAR_INPUT( DialectGenericType, obj_ );
34+
35+
START()
36+
{
37+
setStatus( obj_, "n1_started" );
38+
}
39+
INVOKE() {}
40+
41+
STOP()
42+
{
43+
setStatus( obj_, "n1_stopped" );
44+
}
45+
};
46+
EXPORT_CPPNODE( start_n1_set_value );
47+
48+
DECLARE_CPPNODE( start_n2_throw )
49+
{
50+
INIT_CPPNODE( start_n2_throw ) {}
51+
52+
SCALAR_INPUT( DialectGenericType, obj_ );
53+
54+
START()
55+
{
56+
CSP_THROW( ValueError, "n2 start failed" );
57+
}
58+
INVOKE() {}
59+
60+
STOP()
61+
{
62+
setStatus( obj_, "n2_stopped" );
63+
}
64+
};
65+
EXPORT_CPPNODE( start_n2_throw );
66+
67+
}
68+
69+
}
70+
71+
}
72+
73+
// Test nodes
74+
REGISTER_CPPNODE( csp::cppnodes::testing::stop_start_test, start_n1_set_value );
75+
REGISTER_CPPNODE( csp::cppnodes::testing::stop_start_test, start_n2_throw );
76+
77+
static PyModuleDef _csptestlibimpl_module = {
78+
PyModuleDef_HEAD_INIT,
79+
"_csptestlibimpl",
80+
"_csptestlibimpl c++ module",
81+
-1,
82+
NULL, NULL, NULL, NULL, NULL
83+
};
84+
85+
PyMODINIT_FUNC PyInit__csptestlibimpl(void)
86+
{
87+
PyObject* m;
88+
89+
m = PyModule_Create( &_csptestlibimpl_module);
90+
if( m == NULL )
91+
return NULL;
92+
93+
if( !csp::python::InitHelper::instance().execute( m ) )
94+
return NULL;
95+
96+
return m;
97+
}

0 commit comments

Comments
 (0)