Architect
Pipeline orchestrator for the SOI data warehouse.
__init__(environment, msi, pipeline_version='0.0.1')
Orchestrator of pipeline runs to the SOI warehouse. This is the main pipeline orchestrator object for running ETL operations against the SOI data warehouse.
Using this pipeline requires the `stout` dependency and expects system credentials to be stored as environment variables.
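Because credentials come from environment variables, a missing variable is easiest to diagnose if it is caught before any pipeline starts. The following is a minimal sketch of such a fail-fast check. The helper `read_credentials` and the variable names `SOI_DB_USER`, `SOI_DB_PASSWORD`, and `SOI_DB_SERVER` are hypothetical; the names the orchestrator actually reads are not documented here.

```python
import os

# Hypothetical variable names: the real names expected by the orchestrator are not documented.
REQUIRED_VARS = ("SOI_DB_USER", "SOI_DB_PASSWORD", "SOI_DB_SERVER")


def read_credentials(names=REQUIRED_VARS, environ=None):
    """Return {name: value} for every required variable, failing fast if any is missing or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in names}
```

Passing `environ` explicitly keeps the check testable without touching the real process environment; by default it reads `os.environ`.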
TODO: Graceful error handling. Find a way to make SQLAlchemy skip row-level errors without rolling back the full session.
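One possible approach to the TODO above is to wrap each row in a SAVEPOINT, so that a failed insert is rolled back on its own while the rest of the transaction survives. The sketch below uses the standard-library `sqlite3` module so it runs without a warehouse; the function `load_rows` and the table `example_rows` are hypothetical. In SQLAlchemy the same idea is usually expressed with `Session.begin_nested()`, which emits a SAVEPOINT, but whether that fits this pipeline's driver and load path has not been checked here.

```python
import sqlite3


def load_rows(conn, rows):
    """Insert rows one by one inside a single transaction, skipping rows that fail.

    Each row gets its own SAVEPOINT, so a failed INSERT is undone on its own
    while earlier successful rows in the same transaction are kept.
    Assumes `conn` was opened with isolation_level=None (manual transactions).
    """
    loaded, skipped = 0, []
    conn.execute("BEGIN")
    try:
        for row in rows:
            conn.execute("SAVEPOINT row_sp")
            try:
                # `example_rows` is a hypothetical target table.
                conn.execute("INSERT INTO example_rows (id, name) VALUES (?, ?)", row)
            except sqlite3.Error as exc:
                # Roll back only this row's changes, then drop the savepoint.
                conn.execute("ROLLBACK TO SAVEPOINT row_sp")
                conn.execute("RELEASE SAVEPOINT row_sp")
                skipped.append((row, str(exc)))
            else:
                conn.execute("RELEASE SAVEPOINT row_sp")
                loaded += 1
    except Exception:
        # Anything unexpected (not a per-row database error) aborts the whole batch.
        conn.execute("ROLLBACK")
        raise
    conn.execute("COMMIT")
    return loaded, skipped


# Demo against an in-memory database: one duplicate key and one NOT NULL violation.
demo_conn = sqlite3.connect(":memory:", isolation_level=None)
demo_conn.execute("CREATE TABLE example_rows (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
loaded, skipped = load_rows(demo_conn, [(1, "a"), (1, "dup"), (2, None), (3, "c")])
print(loaded, len(skipped))  # 2 2
```

Per-row savepoints add extra statements for every row, so for large loads it may be cheaper to validate rows before inserting and reserve savepoints for the rare failures.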
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `environment` | `str` | The warehouse environment to write to, either `'Dev_v01xx'` or `'Prod_v01xx'`. | required |
| `pipeline_version` | `str` | The pipeline version. Currently hard-coded; it should eventually be replaced by the commit hash. | `'0.0.1'` |
build(blueprint, create_log=False)
Main method for the pipeline orchestrator.
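blueprint (dict or set): Specifies which pipelines to run, in what order, and the arguments that define how each pipeline runs. A dict maps each pipeline name to its keyword arguments; a set lists pipeline names only. Because Python sets are unordered, only the dict form fixes the run order.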
Ex. (dict):

    {
        'CensusPipeline': {
            'extract_source': True,
            'remove_views': False,
            'create_views': False,
        },
        'OpenMRSPipeline': {
            'extract_legacy': False,
            'extract_source': True,
            'load_to_core': True,
            'remove_views': False,
            'create_views': False,
        },
    }

Ex. (set):

    {'CensusPipeline', 'OpenMRSPipeline', 'SolerPipeline'}
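Both blueprint forms can be reduced to one ordered plan before any pipeline runs. The sketch below shows one way to do that; `normalize_blueprint` is a hypothetical helper, not part of `build`. It assumes that a bare name in the set form means "run with the pipeline's default arguments", and it sorts set entries by name only to make the result deterministic, which may not match how `build` actually orders them.

```python
from typing import Dict, List, Set, Tuple, Union

Blueprint = Union[Dict[str, dict], Set[str]]


def normalize_blueprint(blueprint: Blueprint) -> List[Tuple[str, dict]]:
    """Return (pipeline_name, kwargs) pairs in run order (hypothetical helper)."""
    if isinstance(blueprint, dict):
        # Dicts preserve insertion order (Python 3.7+), so their order is the run order.
        return [(name, dict(kwargs)) for name, kwargs in blueprint.items()]
    if isinstance(blueprint, (set, frozenset)):
        # Sets have no defined order; sort by name so repeated runs behave the same.
        # Assumption: a bare name means "use the pipeline's default arguments".
        return [(name, {}) for name in sorted(blueprint)]
    raise TypeError(f"blueprint must be a dict or set, got {type(blueprint).__name__}")


plan = normalize_blueprint({
    'OpenMRSPipeline': {'extract_source': True, 'load_to_core': True},
    'CensusPipeline': {'extract_source': True},
})
print([name for name, _ in plan])  # ['OpenMRSPipeline', 'CensusPipeline']
```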
log_args(run_id, asset_id, args)
Helper function to update the log with the parsed arguments.
Arguments:
- `run_id` (str): UUID4 for the full soflow run.
- `asset_id` (str): UUID4 for the specific process in the pipeline run.
- `args` (dict): Updated arguments to be passed to the process.
log_end(run_id, asset_id, status)
Helper function to update the log with the final runtime and status.
Arguments:
- `run_id` (str): UUID4 for the full soflow run.
- `asset_id` (str): UUID4 for the specific process in the pipeline run.
- `status` (str): Final status for the process, either `'Aborted'` or `'Completed'`.
log_start(run_id, asset_id, name, version, args)
Helper function to create a log entry for a process.
Arguments:
- `run_id` (str): UUID4 for the full soflow run.
- `asset_id` (str): UUID4 for the specific process in the pipeline run.
- `name` (str): Name of the process.
- `version` (str): Package version of the process.
- `args` (dict): Updated arguments to be passed to the process.
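Taken together, the three helpers describe a per-process log lifecycle: `log_start` creates the entry, `log_args` records the parsed arguments, and `log_end` stamps the runtime and final status. The sketch below mirrors their signatures against a plain dictionary instead of the warehouse Log table. The class `InMemoryLog`, the `'Running'` placeholder status, and the entry fields are all hypothetical.

```python
import uuid
from datetime import datetime, timezone


class InMemoryLog:
    """Hypothetical stand-in for the Log table, keyed by (run_id, asset_id)."""

    END_STATUSES = {"Aborted", "Completed"}

    def __init__(self):
        self.entries = {}

    def log_start(self, run_id, asset_id, name, version, args):
        # Create the entry; 'Running' is an assumed placeholder status.
        self.entries[(run_id, asset_id)] = {
            "name": name,
            "version": version,
            "args": dict(args),
            "status": "Running",
            "start": datetime.now(timezone.utc),
            "end": None,
            "runtime_s": None,
        }

    def log_args(self, run_id, asset_id, args):
        # Overwrite the stored arguments with the parsed ones.
        self.entries[(run_id, asset_id)]["args"] = dict(args)

    def log_end(self, run_id, asset_id, status):
        if status not in self.END_STATUSES:
            raise ValueError(f"status must be 'Aborted' or 'Completed', got {status!r}")
        entry = self.entries[(run_id, asset_id)]
        entry["end"] = datetime.now(timezone.utc)
        entry["runtime_s"] = (entry["end"] - entry["start"]).total_seconds()
        entry["status"] = status


log = InMemoryLog()
run_id = str(uuid.uuid4())    # one id for the whole run
asset_id = str(uuid.uuid4())  # one id per process within the run
log.log_start(run_id, asset_id, "CensusPipeline", "0.0.1", {"extract_source": True})
log.log_args(run_id, asset_id, {"extract_source": True, "create_views": False})
log.log_end(run_id, asset_id, "Completed")
```

Keying entries by the `(run_id, asset_id)` pair lets one run hold many processes while each process is still updated independently.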