
Architect


Pipeline orchestrator for the SOI data warehouse.

__init__(environment, msi, pipeline_version='0.0.1')

Orchestrator of pipeline runs to the SOI Warehouse. This is the main pipeline orchestrator object for running ETL operations against the SOI data warehouse.

Using this pipeline requires the stout dependency, and system credentials are expected to be stored as environment variables.
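Because credentials are read from environment variables, a pre-flight check can fail fast before a run starts. The following is a minimal sketch only: the variable names `WAREHOUSE_USER` and `WAREHOUSE_PASSWORD` and the helper `missing_credentials` are hypothetical, since this page does not list which variables the orchestrator reads.

```python
import os

# Hypothetical variable names; the real ones are not listed in this documentation.
REQUIRED_VARS = ("WAREHOUSE_USER", "WAREHOUSE_PASSWORD")


def missing_credentials(required=REQUIRED_VARS, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_credentials()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All credentials found.")
```

Passing a plain dict as `env` keeps the check easy to exercise without touching the real environment.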

TODO: Graceful error handling: how to get SQLAlchemy to skip row errors without rolling back the full session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `environment` | str | The warehouse environment to write to, either 'Dev_v01xx' or 'Prod_v01xx'. | required |
| `pipeline_version` | str | The pipeline version, currently hard-coded; it should eventually be derived from the commit hash. | `'0.0.1'` |

build(blueprint, create_log=False)

Main method for the pipeline orchestrator.

Arguments:
blueprint (dict or set) : A dictionary or set that outlines which pipelines to run, in what order, and the arguments that define how each pipeline runs.

Ex.
{'CensusPipeline': {
    'extract_source': True,
    'remove_views': False,
    'create_views': False
 },
 'OpenMRSPipeline': {
    'extract_legacy': False,
    'extract_source': True,
    'load_to_core': True,
    'remove_views': False,
    'create_views': False
 }}

Ex.
{'CensusPipeline', 'OpenMRSPipeline', 'SolerPipeline'}
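To illustrate the two blueprint shapes, here is a rough sketch of how a dict or set could be reduced to an ordered list of steps. It is not the orchestrator's actual logic: the helper `normalize_blueprint` is hypothetical, and two assumptions are baked in. First, a set entry is taken to mean "run with no explicit arguments". Second, because Python sets are unordered, the sketch sorts names alphabetically purely to make the result deterministic.

```python
def normalize_blueprint(blueprint):
    """Return a list of (pipeline_name, args) pairs from a dict or set blueprint."""
    if isinstance(blueprint, dict):
        # Dicts preserve insertion order (Python 3.7+), so the run order
        # follows the order in which the pipelines were written.
        return [(name, dict(args)) for name, args in blueprint.items()]
    if isinstance(blueprint, (set, frozenset)):
        # Sets carry no order; sorting is an assumption of this sketch only.
        return [(name, {}) for name in sorted(blueprint)]
    raise TypeError("blueprint must be a dict or a set")


steps = normalize_blueprint({
    "CensusPipeline": {"extract_source": True, "remove_views": False, "create_views": False},
    "OpenMRSPipeline": {"extract_source": True, "load_to_core": True},
})
for name, args in steps:
    print(name, args)
```

If run order matters, the dict form is the safer choice, since a set literal gives no ordering guarantee.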

log_args(run_id, asset_id, args)

Helper function to update log with parsed arguments

Arguments:
run_id (str) : uuid4 for full soflow run
asset_id (str) : uuid4 for specific process in the pipeline run
args (dict) : updated arguments to be passed to the process

log_end(run_id, asset_id, status)

Helper function to update log with final runtime and status

Arguments:
run_id (str) : uuid4 for full soflow run
asset_id (str) : uuid4 for specific process in the pipeline run
status (str) : updated status for the process -- Aborted or Completed

log_start(run_id, asset_id, name, version, args)

Helper function to create entry in Log for process

Arguments:
run_id (str) : uuid4 for full soflow run
asset_id (str) : uuid4 for specific process in the pipeline run
name (str) : name of process
version (str) : package version of process
args (dict) : updated arguments to be passed to the process
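The three log helpers appear to form a start, update, finish lifecycle for each process. The sketch below mimics that lifecycle with an in-memory dictionary so the call sequence is easy to follow. Everything here is an assumption for illustration: `InMemoryLog` is a hypothetical stand-in for the warehouse log, the field names and the interim `"Running"` status are invented, and the real methods live on the orchestrator and write to the warehouse.

```python
import uuid
from datetime import datetime, timezone


class InMemoryLog:
    """Hypothetical stand-in for the warehouse log, keyed by (run_id, asset_id)."""

    def __init__(self):
        self.entries = {}

    def log_start(self, run_id, asset_id, name, version, args):
        # Create the log entry for one process in the run.
        self.entries[(run_id, asset_id)] = {
            "name": name,
            "version": version,
            "args": dict(args),
            "status": "Running",  # assumed interim status, not from the docs
            "started_at": datetime.now(timezone.utc),
            "runtime_seconds": None,
        }

    def log_args(self, run_id, asset_id, args):
        # Overwrite the stored arguments with the parsed ones.
        self.entries[(run_id, asset_id)]["args"] = dict(args)

    def log_end(self, run_id, asset_id, status):
        # Record the final status and the elapsed runtime.
        if status not in ("Aborted", "Completed"):
            raise ValueError("status must be 'Aborted' or 'Completed'")
        entry = self.entries[(run_id, asset_id)]
        elapsed = datetime.now(timezone.utc) - entry["started_at"]
        entry["runtime_seconds"] = elapsed.total_seconds()
        entry["status"] = status


log = InMemoryLog()
run_id = str(uuid.uuid4())    # one id for the full run
asset_id = str(uuid.uuid4())  # one id per process within the run
log.log_start(run_id, asset_id, "CensusPipeline", "0.0.1", {"extract_source": True})
log.log_args(run_id, asset_id, {"extract_source": True, "create_views": False})
log.log_end(run_id, asset_id, "Completed")
print(log.entries[(run_id, asset_id)]["status"])
```

Keying entries on both identifiers lets several processes share one `run_id` while keeping their own status and runtime.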