Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 25a64e4

Browse files
committed
add support for (a) bigint for execution_time_ms (b) rows processed for each execution-step
1 parent 62727ad commit 25a64e4

File tree

8 files changed

+43
-21
lines changed

8 files changed

+43
-21
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ $ python -m dpo [options] <db-connection-string> <command> [command-args]
2929
- `previous-execution-id`: identifier of an existing execution, ideally as returned by the `get-last-successful-execution` command.
3030
- `complete-step`: Marks the completion of an existing execution's step. Returns nothing unless there's an error.
3131
- `step-id`: a GUID identifier of an existing execution's step as returned by the `init-step` command.
32+
- `rows-processed`: an optional numeric value to indicate the number of rows processed during this step. supports a postgresql BIGINT type value.
3233
- `complete-execution`: Marks the completion of an existing execution. Returns nothing unless there's an error.
3334
- `execution-id`: a GUID identifier of an existing execution as returned by the `init-execution` command.
3435

dpo/DataPipelineOrchestrator.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def __process_compare_step_models_command(self):
4646
).execute()
4747

4848
def __process_complete_step_command(self):
49-
CompleteStepCommand(self.args.db_connection_string, self.args.step_id).execute()
49+
CompleteStepCommand(self.args.db_connection_string, self.args.step_id, self.args.rows_processed).execute()
5050

5151
def __process_complete_execution_command(self):
5252
CompleteExecutionCommand(self.args.db_connection_string, self.args.execution_id).execute()
@@ -137,6 +137,15 @@ def __get_arguments(self):
137137
'step_id',
138138
metavar='step-id',
139139
help='an execution\'s step id as received using \'init-step\' command')
140+
complete_step_command_parser.add_argument(
141+
'-r', '--rows-processed',
142+
action='store',
143+
const=None,
144+
default=None,
145+
type=int,
146+
nargs='?',
147+
help='An optional numeric value to indicate the number of rows processed during this step. '
148+
'Supports a PostgreSQL BIGINT type value.')
140149

141150
complete_execution_command_parser = subparsers.add_parser(
142151
'complete-execution', help='completes the given execution.')

dpo/DataRepository.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def save_execution_step_models(self, execution_step_id, model_checksums):
9999

100100
return execution_step
101101

102-
def complete_execution_step(self, execution_step_id):
102+
def complete_execution_step(self, execution_step_id, rows_processed):
103103
session = self.session_maker()
104104

105105
execution_step = session.query(ExecutionStepEntity) \
@@ -110,6 +110,7 @@ def complete_execution_step(self, execution_step_id):
110110
execution_step.completed_on = self.get_current_db_datetime_with_timezone()
111111
execution_step.execution_time_ms \
112112
= (execution_step.completed_on - execution_step.started_on).total_seconds() * 1000
113+
execution_step.rows_processed = rows_processed
113114

114115
session.commit()
115116

dpo/alembic/versions/c5c34dd0b8f2_refactor_schema_to_support_steps_and_.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def upgrade():
3333
sa.Column('started_on', sa.DateTime(timezone=True),
3434
server_default=sa.text('now()'), nullable=False),
3535
sa.Column('completed_on', sa.DateTime(timezone=True), nullable=True),
36-
sa.Column('execution_time_ms', sa.Integer(), nullable=True),
36+
sa.Column('execution_time_ms', sa.BigInteger(), nullable=True),
3737
sa.PrimaryKeyConstraint('execution_id'),
3838
schema='dpo'
3939
)
@@ -49,7 +49,8 @@ def upgrade():
4949
sa.Column('started_on', sa.DateTime(timezone=True), server_default=sa.text('now()'),
5050
nullable=False),
5151
sa.Column('completed_on', sa.DateTime(timezone=True), nullable=True),
52-
sa.Column('execution_time_ms', sa.Integer(), nullable=True),
52+
sa.Column('execution_time_ms', sa.BigInteger(), nullable=True),
53+
sa.Column('rows_processed', sa.BigInteger(), nullable=True),
5354
sa.ForeignKeyConstraint(['execution_id'], ['dpo.execution.execution_id'], ),
5455
sa.PrimaryKeyConstraint('execution_step_id'),
5556
schema='dpo'
@@ -83,7 +84,7 @@ def upgrade():
8384
'''
8485
INSERT INTO dpo.execution_step (
8586
execution_step_id, execution_id, created_on, updated_on,
86-
step_name, status, started_on, completed_on, execution_time_ms
87+
step_name, status, started_on, completed_on, execution_time_ms, rows_processed
8788
)
8889
SELECT
8990
uuid_generate_v4() AS execution_step_id
@@ -95,6 +96,7 @@ def upgrade():
9596
, (select min(sm.created_on) from dpo.fe9eed6d812f_execution_model sm where sm.execution_id = e.id and sm.type = m.type) AS started_on
9697
, NULL AS completed_on
9798
, NULL AS execution_time_ms
99+
, NULL AS rows_processed
98100
FROM dpo.fe9eed6d812f_execution e
99101
JOIN dpo.fe9eed6d812f_execution_model m on e.id = m.execution_id
100102
GROUP BY e.id, e.status, e.created_on, m.type

dpo/commands/CompleteStepCommand.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33

44
class CompleteStepCommand(BaseCommand):
5-
def __init__(self, db_connection_string, step_id, logger=None):
5+
def __init__(self, db_connection_string, step_id, rows_processed, logger=None):
66
super().__init__(db_connection_string, logger)
77
self._step_id = step_id
8+
self._rows_processed = rows_processed
89

910
def execute(self):
10-
execution_step = self.repository.complete_execution_step(self._step_id)
11+
execution_step = self.repository.complete_execution_step(self._step_id, self._rows_processed)
1112
self.logger.debug('Completed Execution Step: ' + str(execution_step))

dpo/entities/execution_entity.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from sqlalchemy import Column, DateTime, Integer, String
1+
from sqlalchemy import Column, DateTime, BigInteger, String
22
from sqlalchemy.sql import func
33
from sqlalchemy.dialects.postgresql import UUID
44
import uuid
@@ -44,7 +44,7 @@ class ExecutionEntity(Shared.BaseEntity):
4444
nullable=True)
4545

4646
execution_time_ms = Column('execution_time_ms',
47-
Integer,
47+
BigInteger,
4848
nullable=True)
4949

5050
def __str__(self):

dpo/entities/execution_step_entity.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from sqlalchemy import Column, DateTime, Integer, String, ForeignKey
1+
from sqlalchemy import Column, DateTime, BigInteger, String, ForeignKey
22
from sqlalchemy.sql import func
33
from sqlalchemy.dialects.postgresql import UUID
44
import uuid
@@ -57,9 +57,13 @@ class ExecutionStepEntity(Shared.BaseEntity):
5757
nullable=True)
5858

5959
execution_time_ms = Column('execution_time_ms',
60-
Integer,
60+
BigInteger,
6161
nullable=True)
6262

63+
rows_processed = Column('rows_processed',
64+
BigInteger,
65+
nullable=True)
66+
6367
def __str__(self):
6468
return f'execution_step_id={self.execution_step_id}, ' \
6569
f'created_on={self.created_on}, ' \
@@ -69,4 +73,5 @@ def __str__(self):
6973
f'status={self.status}, ' \
7074
f'started_on={self.started_on}, ' \
7175
f'completed_on={self.completed_on}, ' \
72-
f'execution_time_ms={self.execution_time_ms}.'
76+
f'execution_time_ms={self.execution_time_ms}, ' \
77+
f'rows_processed={self.rows_processed}.'

tests/integration/test_integration.sh

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,9 @@ CompareStepModels () {
118118

119119
CompleteStep () {
120120
local stepId=$1
121+
local rowsProcessed=$2
121122

122-
$dpo $dpo_conn_str complete-step $stepId
123+
$dpo $dpo_conn_str complete-step $stepId --rows-processed $rowsProcessed
123124
}
124125

125126
CompleteExecution () {
@@ -134,10 +135,12 @@ CompleteExecution () {
134135
ExecuteAndAssert () {
135136
# Arrange
136137
local iter_no=$1
137-
local expected_lastSuccessfulExecId=$2
138-
local expected_changedLoadModels=$3
139-
local expected_changedTransformModels=$4
140-
local __resultvar=$5
138+
local loadRowsProcessed=$2
139+
local transformRowsProcessed=$3
140+
local expected_lastSuccessfulExecId=$4
141+
local expected_changedLoadModels=$5
142+
local expected_changedTransformModels=$6
143+
local __resultvar=$7
141144

142145
# Act
143146
local execId=$(InitialiseExecution)
@@ -153,13 +156,13 @@ ExecuteAndAssert () {
153156
echo " iter$iter_no loadStepId = $loadStepId"
154157
local changedLoadModels=$(CompareStepModels $loadStepId $lastSuccessfulExecId)
155158
echo " iter$iter_no changedLoadModels = '$changedLoadModels'"
156-
CompleteStep $loadStepId
159+
CompleteStep $loadStepId $loadRowsProcessed
157160

158161
local transformStepId=$(InitialiseStep $execId TRANSFORM $transformModelDirectory)
159162
echo " iter$iter_no transformStepId = $transformStepId"
160163
local changedTransformModels=$(CompareStepModels $transformStepId $lastSuccessfulExecId)
161164
echo " iter$iter_no changedTransformModels = '$changedTransformModels'"
162-
CompleteStep $transformStepId
165+
CompleteStep $transformStepId $transformRowsProcessed
163166

164167
CompleteExecution $execId
165168

@@ -209,7 +212,7 @@ echo "transform_model_3" > "$transformModelDirectory/$transform_model_3.sql"
209212
iter1_expected_lastSuccessfulExecId="" # pass in empty string to skip test since we don't know the past state of the pipeline
210213
iter1_expected_changedLoadModels="$load_model_1 $load_model_2"
211214
iter1_expected_changedTransformModels="$transform_model_1 $transform_model_2 $transform_model_3"
212-
ExecuteAndAssert "1" "$iter1_expected_lastSuccessfulExecId" "$iter1_expected_changedLoadModels" "$iter1_expected_changedTransformModels" iter1_execId
215+
ExecuteAndAssert "1" "2147483647" "" "$iter1_expected_lastSuccessfulExecId" "$iter1_expected_changedLoadModels" "$iter1_expected_changedTransformModels" iter1_execId
213216

214217
###############
215218
# Execution 2 #
@@ -232,4 +235,4 @@ echo "transform_model_4" > "$transformModelDirectory/$transform_model_4.sql"
232235
iter2_expected_lastSuccessfulExecId="$iter1_execId"
233236
iter2_expected_changedLoadModels=""
234237
iter2_expected_changedTransformModels="$transform_model_2 $transform_model_4"
235-
ExecuteAndAssert "2" "$iter2_expected_lastSuccessfulExecId" "$iter2_expected_changedLoadModels" "$iter2_expected_changedTransformModels" iter2_execId
238+
ExecuteAndAssert "2" "9223372036854775807" "" "$iter2_expected_lastSuccessfulExecId" "$iter2_expected_changedLoadModels" "$iter2_expected_changedTransformModels" iter2_execId

0 commit comments

Comments
 (0)