Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 6995e52

Browse files
authored
Merge pull request #29 from pageuppeople-opensource/add-ability-to-accept-initial-execution-id
Add ability to accept initial execution id
2 parents 3e1a7b2 + fc25b8d commit 6995e52

File tree

6 files changed

+44
-16
lines changed

6 files changed

+44
-16
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Data Pipeline Orchestrator
22

3-
[![Build Status](https://travis-ci.com/PageUpPeopleOrg/data-pipeline-orchestrator.svg?branch=master)](https://travis-ci.com/PageUpPeopleOrg/data-pipeline-orchestrator)
3+
[![Build Status](https://travis-ci.com/pageuppeople-opensource/data-pipeline-orchestrator.svg?branch=master)](https://travis-ci.com/pageuppeople-opensource/data-pipeline-orchestrator)
44

55
A utility that persists state of a data pipeline execution and uses them to detect changes in models.
66

@@ -16,6 +16,7 @@ $ python -m dpo [options] <db-connection-string> <command> [command-args]
1616
- `db-connection-string`: a [PostgreSQL Db Connection String](http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2) of the format `postgresql+psycopg2://user:password@host:port/dbname`
1717
- `command` is the function to be performed by the utility. The currently supported values are:
1818
- `init-execution`: Marks the start of a new execution. Returns an `execution-id` which is a GUID identifier of the new execution.
19+
- `execution-id`: An optional GUID to use as the execution-id of new execution. Supports a PostgreSQL UUID type value. Throws an error if the GUID provided is already in use.
1920
- `get-last-successful-execution`: Finds the last successful execution. Returns an `execution-id` which is a GUID identifier of the new execution, if found; else returns and empty string.
2021
- `get-execution-completion-timestamp`: Returns the `last-updated-on` timestamp with timezone of the given `execution-id`. Raises error if given `execution-id` is invalid.
2122
- `execution-id`: a GUID identifier of an existing execution.

dpo/DataPipelineOrchestrator.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def __init__(self, logger=None):
2626
self.args.func()
2727

2828
def __process_init_execution_command(self):
29-
InitialiseExecutionCommand(self.args.db_connection_string).execute()
29+
InitialiseExecutionCommand(self.args.db_connection_string, self.args.execution_id).execute()
3030

3131
def __process_get_last_successful_execution_command(self):
3232
GetLastSuccessfulExecutionCommand(self.args.db_connection_string).execute()
@@ -70,6 +70,16 @@ def __get_arguments(self):
7070
init_execution_command_parser = subparsers.add_parser(
7171
'init-execution', help='initialises a new execution')
7272
init_execution_command_parser.set_defaults(func=self.__process_init_execution_command)
73+
init_execution_command_parser.add_argument(
74+
'-id', '--execution-id',
75+
action='store',
76+
const=None,
77+
default=None,
78+
type=str,
79+
nargs='?',
80+
help='An optional GUID to use as the execution-id of new execution.'
81+
'Supports a PostgreSQL UUID type value.'
82+
'Throws an error if the GUID provided is already in use.')
7383

7484
get_last_successful_execution_command_parser = subparsers.add_parser(
7585
'get-last-successful-execution',

dpo/DataRepository.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ def __init__(self, db_engine, logger=None):
1616
def get_current_db_datetime_with_timezone(self):
1717
return self.db_engine.execute(select([func.now()])).fetchone()[0]
1818

19-
def initialise_execution(self):
19+
def initialise_execution(self, execution_id=None):
2020
session = self.session_maker()
2121

2222
data_pipeline_execution = ExecutionEntity()
23+
data_pipeline_execution.execution_id = execution_id if execution_id is not None else data_pipeline_execution.execution_id
2324
session.add(data_pipeline_execution)
2425

2526
session.commit()

dpo/commands/InitialiseExecutionCommand.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22

33

44
class InitialiseExecutionCommand(BaseCommand):
5-
def __init__(self, db_connection_string, logger=None):
5+
def __init__(self, db_connection_string, execution_id=None, logger=None):
66
super().__init__(db_connection_string, logger)
7+
self._execution_id = execution_id
78

89
def execute(self):
9-
data_pipeline_execution = self.repository.initialise_execution()
10+
data_pipeline_execution = self.repository.initialise_execution(execution_id=self._execution_id)
1011
self.logger.debug('Initialised new data_pipeline_execution = ' + str(data_pipeline_execution))
1112
self.output(data_pipeline_execution.execution_id)
1213

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from setuptools import setup, find_packages
22

33
setup(name='dpo',
4-
version='0.1.1',
4+
version='0.1.6-beta',
55
packages=find_packages(),
66
install_requires=[
77
'psycopg2-binary==2.8.2',

tests/integration/test_integration.sh

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,13 @@ AssertAreEqual () {
5959
}
6060

6161
InitialiseExecution () {
62-
local executionId=$($dpo $dpo_conn_str init-execution)
62+
local providedExecutionId=$1
63+
if [ ! -z "$providedExecutionId" ]
64+
then
65+
local executionId=$($dpo $dpo_conn_str init-execution --execution-id "$providedExecutionId")
66+
else
67+
local executionId=$($dpo $dpo_conn_str init-execution)
68+
fi
6369

6470
if [ ${#executionId} != 36 ]
6571
then
@@ -135,15 +141,22 @@ CompleteExecution () {
135141
ExecuteAndAssert () {
136142
# Arrange
137143
local iter_no=$1
138-
local loadRowsProcessed=$2
139-
local transformRowsProcessed=$3
140-
local expected_lastSuccessfulExecId=$4
141-
local expected_changedLoadModels=$5
142-
local expected_changedTransformModels=$6
143-
local __resultvar=$7
144+
local providedExecutionId=$2
145+
local loadRowsProcessed=$3
146+
local transformRowsProcessed=$4
147+
local expected_lastSuccessfulExecId=$5
148+
local expected_changedLoadModels=$6
149+
local expected_changedTransformModels=$7
150+
local __resultvar=$8
144151

145152
# Act
146-
local execId=$(InitialiseExecution)
153+
if [ ! -z "$providedExecutionId" ]
154+
then
155+
local execId=$(InitialiseExecution $providedExecutionId)
156+
AssertAreEqual "Iteration $iter_no's Initialised ExecutionID" "$execId" "$providedExecutionId"
157+
else
158+
local execId=$(InitialiseExecution)
159+
fi
147160
echo " iter$iter_no execId = $execId"
148161

149162
local lastSuccessfulExecId=$(GetLastSuccessfulExecution)
@@ -192,6 +205,7 @@ transform_model_4="transform_model_4_$(NewUUID)"
192205
echo -e "\nBeginning execution #1"
193206

194207
# ARRANGE
208+
iter1_execution_id=""
195209
## Remove test models' directory, if exists
196210
rm -rf $modelDirectory
197211

@@ -212,14 +226,15 @@ echo "transform_model_3" > "$transformModelDirectory/$transform_model_3.sql"
212226
iter1_expected_lastSuccessfulExecId="" # pass in empty string to skip test since we don't know the past state of the pipeline
213227
iter1_expected_changedLoadModels="$load_model_1 $load_model_2"
214228
iter1_expected_changedTransformModels="$transform_model_1 $transform_model_2 $transform_model_3"
215-
ExecuteAndAssert "1" "2147483647" "" "$iter1_expected_lastSuccessfulExecId" "$iter1_expected_changedLoadModels" "$iter1_expected_changedTransformModels" iter1_execId
229+
ExecuteAndAssert "1" "$iter1_execution_id" "2147483647" "" "$iter1_expected_lastSuccessfulExecId" "$iter1_expected_changedLoadModels" "$iter1_expected_changedTransformModels" iter1_execId
216230

217231
###############
218232
# Execution 2 #
219233
###############
220234
echo -e "\nBeginning execution #2"
221235

222236
# ARRANGE
237+
iter2_execution_id=$(NewUUID)
223238
echo " Making no changes to any LOAD models"
224239

225240
echo " Modifying transform_model_2"
@@ -235,4 +250,4 @@ echo "transform_model_4" > "$transformModelDirectory/$transform_model_4.sql"
235250
iter2_expected_lastSuccessfulExecId="$iter1_execId"
236251
iter2_expected_changedLoadModels=""
237252
iter2_expected_changedTransformModels="$transform_model_2 $transform_model_4"
238-
ExecuteAndAssert "2" "9223372036854775807" "" "$iter2_expected_lastSuccessfulExecId" "$iter2_expected_changedLoadModels" "$iter2_expected_changedTransformModels" iter2_execId
253+
ExecuteAndAssert "2" "$iter2_execution_id" "9223372036854775807" "" "$iter2_expected_lastSuccessfulExecId" "$iter2_expected_changedLoadModels" "$iter2_expected_changedTransformModels" iter2_execId

0 commit comments

Comments
 (0)