This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 2f4782f

Merge pull request #1 from PageUpPeopleOrg/feature/OSC-967-ability-to-start-new-execution
[OSC-967] - Add ability to start new data pipeline execution
2 parents 3abb5f1 + 8ecce45 commit 2f4782f

14 files changed, with 431 additions and 1 deletion

.gitignore

Lines changed: 6 additions & 0 deletions
@@ -102,3 +102,9 @@ venv.bak/
 
 # mypy
 .mypy_cache/
+
+# Visual Studio Code
+.vscode/
+
+# JetBrains
+.idea/

README.md

Lines changed: 119 additions & 1 deletion
@@ -1 +1,119 @@
-# model-change-detector
+# Model Change Detector
+
+## About
+
+A utility that detects changes in models.
+
+## Usage
+
+```commandline
+py mcd.py <command> <db-connection-string> [--help] [--log-level]
+```
+
+- `command` is the function to be performed by the utility. The currently supported values are:
+  - `START`: Marks the start of a new execution by creating a record for it in the given database, and returns the ID of the new execution.
+- `db-connection-string` is a [PostgreSQL Db Connection String](http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2) of the format `postgresql+psycopg2://user:password@host:port/dbname`.
+
+### As a script
+
+- Use a local isolated/virtual python environment for this project
+- Install project dependencies
+- `py mcd.py <command> <db-connection-string> [--help] [--log-level]`
+
+_Windows example:_
+
+```commandline
+py -m venv new-env --clear
+new-env\scripts\activate
+
+py -m pip install -r requirements.txt
+
+py mcd.py START postgresql+psycopg2://user:password@host:port/dbname
+```
+
+### As a package
+
+- Use/create an empty directory
+- Use a local isolated/virtual python environment for this project
+- [Install](https://pip.pypa.io/en/stable/reference/pip_install/#editable-installs) this package
+- `py -m mcd <command> <db-connection-string> [--help] [--log-level]`
+
+_Windows example:_
+
+```commandline
+mkdir new-dir
+cd new-dir
+
+py -m venv new-env --clear
+new-env\scripts\activate
+
+pip install -e path\to\model-change-detector
+
+py -m mcd START postgresql+psycopg2://user:password@host:port/dbname
+```
+
+## Setup
+
+1. Install pre-requisites
+2. Use a local isolated/virtual python environment for this project
+3. Install project dependencies
+4. Develop and test code changes
+5. Once done, deactivate the virtual environment
+
+### Install pre-requisites
+
+#### Python 3
+
+Install from [here](https://www.python.org/) _(or your choice of safe software provider)_. During installation, choose the options _Add to PATH_ and _Custom Installation_ so that you can set the install path outside the `Program Files` directory, e.g. `C:\Python37\` rather than the default `C:\Program Files\Python37\`. This is only a suggestion, since there have been issues with updating key Python packages when Python is installed within `Program Files` on Windows.
+
+Verify your installation by running the commands below.
+
+```powershell
+py --version
+python --version
+pip --version
+```
+
+If you end up with multiple Python installations (accidental or deliberate), run the following in the Windows command line to find where the executables are located.
+
+```cmd
+where py
+where python
+where pip
+```
+
+### Use a local isolated/virtual python environment for this project
+
+`py -m venv /path/to/new/virtual/environment` _e.g._ `py -m venv new-env`
+
+If you create the environment with the `--system-site-packages` flag, it will have access to packages from your global site-packages directory. If you want isolation from the global system, do not use this flag. Once you've created a new environment, you need to activate it.
+
+On Windows:
+
+`path\to\environment\scripts\activate` _e.g._ `new-env\scripts\activate`
+
+On Linux / macOS:
+
+`source path/to/environment/bin/activate` _e.g._ `source new-env/bin/activate`
+
+You should see the name of your virtual environment in brackets on your terminal line, e.g.:
+```
+C:\path\to\working\dir: new-env\scripts\activate
+(new-env) C:\path\to\working\dir: _
+```
+Any Python commands you run will now work within your virtual environment only.
+
+### Install project dependencies
+
+```powershell
+pip install -r requirements.txt
+```
+
+### Deactivate the virtual environment
+
+Once done, deactivate the virtual environment with a simple `deactivate` command, e.g.:
+
+```commandline
+(new-env) C:\path\to\working\dir: deactivate
+C:\path\to\working\dir: _
+```
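
For readers unfamiliar with the `db-connection-string` format described in the README, the sketch below splits such a URL into its parts using only the standard library. It is an illustration, not part of the commit: the helper name `describe_connection_string` and the sample credentials are made up, and the utility itself passes the string straight to SQLAlchemy rather than parsing it this way.

```python
from urllib.parse import urlsplit


def describe_connection_string(connection_string):
    """Split a SQLAlchemy-style PostgreSQL URL into its components.

    Hypothetical helper for illustration only; not part of this commit.
    """
    parts = urlsplit(connection_string)
    # The scheme carries both the dialect and the driver, joined by '+'.
    dialect, _, driver = parts.scheme.partition('+')
    return {
        'dialect': dialect,
        'driver': driver,
        'user': parts.username,
        'host': parts.hostname,
        'port': parts.port,
        'dbname': parts.path.lstrip('/'),
    }


info = describe_connection_string('postgresql+psycopg2://user:secret@localhost:5432/dbname')
print(info)
```

A quick check like this can catch a missing port or database name before the string reaches the database driver.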

mcd.py

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+from modules.ModelChangeDetector import ModelChangeDetector
+
+if __name__ == "__main__":
+    ModelChangeDetector().main()

modules/BaseObject.py

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+import logging
+
+
+class BaseObject(object):
+    def __init__(self, logger=None):
+        self.logger = logger or logging.getLogger(self.__module__ + '.' + self.__class__.__qualname__)
+        self.logger.debug(self.logger)
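
The logger-naming convention in `BaseObject` can be seen in isolation with a small sketch. It repeats the constructor logic from the file above and adds a hypothetical subclass (`ExampleCommand`, not part of this commit) to show that each subclass gets a logger named after its own module and class unless one is injected.

```python
import logging


class BaseObject(object):
    def __init__(self, logger=None):
        # Fall back to a logger named "<module>.<ClassName>" when none is injected.
        self.logger = logger or logging.getLogger(self.__module__ + '.' + self.__class__.__qualname__)


class ExampleCommand(BaseObject):  # hypothetical subclass, for illustration only
    pass


default_logger = ExampleCommand().logger
injected_logger = ExampleCommand(logging.getLogger('custom')).logger

print(default_logger.name)   # module name, a dot, then the class name
print(injected_logger.name)  # the injected logger is used as-is
```

Because the name is derived from `self.__class__`, subclasses do not need to repeat any logging setup.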

modules/DataPipelineExecutionEntity.py

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+from sqlalchemy import Column, DateTime, Integer, String
+from sqlalchemy.sql import func
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.dialects.postgresql import UUID
+import uuid
+from modules.Shared import Constants
+
+Base = declarative_base()
+
+
+class DataPipelineExecutionEntity(Base):
+
+    __tablename__ = 'data_pipeline_execution'
+    __table_args__ = {'schema': Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}
+
+    id = Column('id',
+                UUID(as_uuid=True),
+                primary_key=True,
+                default=uuid.uuid4)
+
+    created_on = Column('created_on',
+                        DateTime(timezone=True),
+                        nullable=False,
+                        server_default=func.now())
+
+    last_updated_on = Column('last_updated_on',
+                             DateTime(timezone=True),
+                             nullable=False,
+                             server_default=func.now(),
+                             onupdate=func.now())
+
+    # unused for now, for future use
+    execution_time_ms = Column('execution_time_ms',
+                               Integer,
+                               nullable=True)
+
+    status = Column('status',
+                    String(50),
+                    nullable=False,
+                    server_default=str(Constants.DataPipelineExecutionStatus.STARTED))
+
+    def __str__(self):
+        return f'id={self.id}, ' \
+               f'created_on={self.created_on}, ' \
+               f'last_updated_on={self.last_updated_on}, ' \
+               f'execution_time_ms={self.execution_time_ms}, ' \
+               f'status={self.status}.'
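
A note on the `id` column's `default`: SQLAlchemy accepts either a fixed value or a callable there. A callable such as `uuid.uuid4` is invoked for each insert, whereas an already-evaluated `uuid.uuid4()` is computed once when the class body runs and then reused for every row. The sketch below illustrates the same distinction with standard-library dataclasses as a stand-in; it is an analogy, not SQLAlchemy code, and the class names are made up for the example.

```python
import uuid
from dataclasses import dataclass, field

SHARED_ID = uuid.uuid4()  # evaluated exactly once, when this module is loaded


@dataclass
class SharedDefault:
    # Every instance receives the same pre-computed value.
    id: uuid.UUID = SHARED_ID


@dataclass
class FreshDefault:
    # The factory is called again for each new instance.
    id: uuid.UUID = field(default_factory=uuid.uuid4)


shared_ids = {SharedDefault().id for _ in range(3)}
fresh_ids = {FreshDefault().id for _ in range(3)}

print(len(shared_ids), len(fresh_ids))  # 1 3
```

With a primary key, the shared-value form would make a second insert in the same process collide with the first.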

modules/DataPipelineExecutionRepository.py

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+from sqlalchemy import desc
+from modules.DataPipelineExecutionEntity import DataPipelineExecutionEntity, Base
+from modules.BaseObject import BaseObject
+from modules.Shared import Constants
+
+
+class DataPipelineExecutionRepository(BaseObject):
+    def __init__(self, session_maker, logger=None):
+        super().__init__(logger)
+        self.session_maker = session_maker
+
+    def create_schema(self, engine):
+        engine.execute(f'CREATE SCHEMA IF NOT EXISTS {Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}')
+        Base.metadata.create_all(engine)
+
+    def start_new(self):
+        session = self.session_maker()
+        data_pipeline_execution = DataPipelineExecutionEntity()
+        session.add(data_pipeline_execution)
+        session.commit()
+        return data_pipeline_execution
+
+    def get_last_successful_data_load_execution(self):
+        session = self.session_maker()
+        return session.query(DataPipelineExecutionEntity)\
+            .filter_by(status=Constants.DataPipelineExecutionStatus.COMPLETED_SUCCESSFULLY)\
+            .order_by(desc(DataPipelineExecutionEntity.created_on))\
+            .order_by(desc(DataPipelineExecutionEntity.last_updated_on))\
+            .first()
+
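
The query in `get_last_successful_data_load_execution` filters on the `SUCCESSFUL` status, sorts newest first, and takes one row. A rough equivalent in plain SQL can be tried against an in-memory SQLite database, as sketched below. This is an assumption-laden stand-in: the commit targets PostgreSQL with a `data_pipeline` schema and UUID keys, while the sketch uses an unqualified table, text ids, and made-up sample rows.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE data_pipeline_execution ('
             'id TEXT PRIMARY KEY, created_on TEXT, last_updated_on TEXT, status TEXT)')
conn.executemany(
    'INSERT INTO data_pipeline_execution VALUES (?, ?, ?, ?)',
    [
        ('a', '2019-01-01T10:00', '2019-01-01T10:05', 'SUCCESSFUL'),
        ('b', '2019-01-02T10:00', '2019-01-02T10:05', 'SUCCESSFUL'),
        ('c', '2019-01-03T10:00', '2019-01-03T10:05', 'STARTED'),
    ])

# Newest successful execution: filter by status, sort descending, keep one row.
latest_successful = conn.execute(
    'SELECT id FROM data_pipeline_execution '
    'WHERE status = ? '
    'ORDER BY created_on DESC, last_updated_on DESC '
    'LIMIT 1',
    ('SUCCESSFUL',)).fetchone()

print(latest_successful[0])  # b
```

Row `c` is newer but still `STARTED`, so it is excluded by the status filter before the ordering applies.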

modules/ModelChangeDetector.py

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+import argparse
+import logging
+from modules import Shared
+from modules.Shared import Constants
+from modules.commands import Commands
+from modules.commands.CommandFactory import CommandFactory
+from modules.BaseObject import BaseObject
+
+
+class ModelChangeDetector(BaseObject):
+
+    _commandNames = [Commands.get_name(Commands.START)]
+
+    def __init__(self, logger=None):
+        self.args = self.get_arguments()
+        Shared.configure_root_logger(self.args.log_level)
+
+        super().__init__(logger)
+
+        self.logger.debug(self.args)
+        self.logger.debug(f'args.log_level = {self.args.log_level} = {logging.getLevelName(self.args.log_level)}')
+
+        self.command_factory = CommandFactory()
+
+    def main(self):
+        command_executor = self.command_factory.create_command(self.args.command, self.args.db_connection_string)
+        command_executor.execute()
+
+    def get_arguments(self):
+        parser = argparse.ArgumentParser(description=Constants.APP_NAME,
+                                         parents=[Shared.get_default_arguments()])
+
+        parser.add_argument('command',
+                            type=self.get_command_value_from_name,
+                            help=f'choose from {", ".join(self._commandNames)}, more coming soon..')
+
+        parser.add_argument('db_connection_string',
+                            metavar='db-connection-string',
+                            help='provide in PostgreSQL & Psycopg format, '
+                                 'postgresql+psycopg2://username:password@host:port/dbname')
+
+        args = parser.parse_args()
+
+        return args
+
+    def get_command_value_from_name(self, command_name):
+        if command_name not in self._commandNames:
+            message = f'invalid choice: {command_name} (choose from {", ".join(self._commandNames)})'
+            raise argparse.ArgumentTypeError(message)
+
+        command_value = getattr(Commands, command_name, Commands.UNKNOWN)
+
+        return command_value
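
The `type=` callback used for the `command` argument both validates the name and converts it to a command value. The sketch below shows that pattern on its own. The `Commands` class here is a hypothetical stand-in (the real `modules.commands.Commands` is not shown in this diff), and its numeric values are assumptions made for the example.

```python
import argparse


class Commands:  # hypothetical stand-in for modules.commands.Commands
    UNKNOWN = 0
    START = 1


COMMAND_NAMES = ['START']


def command_value_from_name(command_name):
    # argparse turns ArgumentTypeError into a usage error and exits with status 2.
    if command_name not in COMMAND_NAMES:
        raise argparse.ArgumentTypeError(
            f'invalid choice: {command_name} (choose from {", ".join(COMMAND_NAMES)})')
    return getattr(Commands, command_name, Commands.UNKNOWN)


parser = argparse.ArgumentParser(prog='mcd')
parser.add_argument('command', type=command_value_from_name)
parser.add_argument('db_connection_string', metavar='db-connection-string')

args = parser.parse_args(['START', 'postgresql+psycopg2://user:password@host:5432/dbname'])
print(args.command, args.db_connection_string)

try:
    parser.parse_args(['STOP', 'postgresql+psycopg2://user:password@host:5432/dbname'])
except SystemExit as exit_info:
    rejected_exit_code = exit_info.code
```

Doing the conversion inside the `type` callback means the rest of the program only ever sees a validated command value, never the raw string.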

modules/Shared.py

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+import logging
+import argparse
+
+
+class Constants:
+    APP_NAME = 'model-change-detector'
+    DATA_PIPELINE_EXECUTION_SCHEMA_NAME = 'data_pipeline'
+
+    class DataPipelineExecutionStatus:
+        STARTED = 'STARTED'
+        COMPLETED_SUCCESSFULLY = 'SUCCESSFUL'
+
+
+_logLevelStrings = [logging.getLevelName(logging.CRITICAL),
+                    logging.getLevelName(logging.ERROR),
+                    logging.getLevelName(logging.WARNING),
+                    logging.getLevelName(logging.INFO),
+                    logging.getLevelName(logging.DEBUG)]
+
+_defaultLogLevelString = logging.getLevelName(logging.INFO)
+
+
+def configure_root_logger(log_level):
+    # get the root logger
+    logger = logging.getLogger()
+
+    # let every record through at the logger; the handler below does the filtering
+    logger.setLevel(logging.DEBUG)
+
+    # and one handler, at the given log level
+    console_stream_handler = logging.StreamHandler()
+    console_stream_handler.setLevel(log_level)
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    console_stream_handler.setFormatter(formatter)
+    logger.addHandler(console_stream_handler)
+
+    return
+
+
+def get_default_arguments():
+    parser = argparse.ArgumentParser(add_help=False)
+
+    parser.add_argument('-l', '--log-level',
+                        action='store',
+                        const=_defaultLogLevelString,
+                        default=_defaultLogLevelString,
+                        type=get_log_level_int_from_string,
+                        nargs='?',
+                        help=f'choose program\'s logging level, from {", ".join(_logLevelStrings)}; '
+                             f'default is {_defaultLogLevelString}')
+
+    return parser
+
+
+def get_log_level_int_from_string(log_level_string):
+    if log_level_string not in _logLevelStrings:
+        message = f'invalid choice: {log_level_string} (choose from {", ".join(_logLevelStrings)})'
+        raise argparse.ArgumentTypeError(message)
+
+    log_level_int = getattr(logging, log_level_string, logging.getLevelName(_defaultLogLevelString))
+
+    # check the logging log_level_choices have not changed from our expected values
+    assert isinstance(log_level_int, int)
+
+    return log_level_int
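
In `configure_root_logger`, the logger is set to `DEBUG` while the handler carries the user-selected level, so the handler is what decides which records reach the console. The sketch below reproduces that arrangement in a form that can be inspected. It deliberately differs from the file above in two ways, both assumptions made for the example: it uses a named logger (`mcd_example`) instead of the root logger, and it writes to an in-memory stream rather than the console.

```python
import io
import logging

stream = io.StringIO()  # stands in for the console so the output can be inspected

logger = logging.getLogger('mcd_example')  # a named logger, to leave the root logger untouched
logger.propagate = False
logger.setLevel(logging.DEBUG)  # the logger itself lets every record through

handler = logging.StreamHandler(stream)
handler.setLevel(logging.INFO)  # the handler drops anything below INFO
handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)

logger.debug('hidden')
logger.info('shown')

captured = stream.getvalue()
print(captured, end='')
```

Keeping the logger permissive leaves room to attach further handlers later, for instance a file handler at `DEBUG`, without changing the console verbosity.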

modules/commands/BaseCommand.py

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from modules.DataPipelineExecutionRepository import DataPipelineExecutionRepository
+from modules.BaseObject import BaseObject
+
+
+class BaseCommand(BaseObject):
+    def __init__(self, db_connection_string, logger=None):
+        super().__init__(logger)
+        self.db_engine = create_engine(db_connection_string, echo=False)
+        self.session_maker = sessionmaker(bind=self.db_engine)
+        self.repository = DataPipelineExecutionRepository(self.session_maker)
+        self.repository.create_schema(engine=self.db_engine)

modules/commands/CommandFactory.py

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+from modules.commands.StartCommand import StartCommand
+from modules.BaseObject import BaseObject
+
+
+class CommandFactory(BaseObject):
+    def __init__(self, logger=None):
+        super().__init__(logger)
+        self.commandTypes = [StartCommand]
+
+    def create_command(self, command, db_connection_string):
+        for commandType in self.commandTypes:
+            if commandType.can_execute_command(command):
+                self.logger.debug(f'Found command type {commandType} for command {command}.')
+                return commandType(db_connection_string)
+
+        raise RuntimeError(f'There are no command types that can handle the {command} command')
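
The factory above asks each registered command type whether it can handle the requested command and instantiates the first match. Since `StartCommand` itself is not shown in this diff, the sketch below substitutes a hypothetical minimal version (its `can_execute_command` body and the use of the string `'START'` as the command value are assumptions) to show the dispatch end to end, without logging or a database.

```python
class StartCommand:  # hypothetical minimal command; the real class is not shown in this diff
    def __init__(self, db_connection_string):
        self.db_connection_string = db_connection_string

    @staticmethod
    def can_execute_command(command):
        return command == 'START'


class CommandFactory:
    def __init__(self):
        self.command_types = [StartCommand]

    def create_command(self, command, db_connection_string):
        # Ask each registered type whether it handles the command; the first match wins.
        for command_type in self.command_types:
            if command_type.can_execute_command(command):
                return command_type(db_connection_string)

        raise RuntimeError(f'There are no command types that can handle the {command} command')


factory = CommandFactory()
executor = factory.create_command('START', 'postgresql+psycopg2://user:password@host:5432/dbname')
print(type(executor).__name__)

try:
    factory.create_command('STOP', 'postgresql+psycopg2://user:password@host:5432/dbname')
except RuntimeError as error:
    unknown_command_message = str(error)
    print(unknown_command_message)
```

Under this design, supporting a new command means adding one class to the list; `create_command` itself stays unchanged.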
