Pipelines
Census
CensusPipeline
Bases: BasePipeline
Pipeline class for Census.
__init__(environment, msi=True, pipeline_version='0.0.1')
Census ETL pipeline to SOI Warehouse.
This is the main pipeline object for running ETL operations from Census. It directly accesses the source connection to extract the data and load it into Stage.Census{}. Once all tables have been updated, the tables are inserted from Stage into Census. Backup tables are created as the first step in the process.
Using this pipeline requires the `stout` dependency (also contained in this repo) and expects system credentials to be stored as environment variables.
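Because credentials come from the environment, a quick preflight check can fail fast before any connection is attempted. This is a minimal sketch only; the variable names below are hypothetical, as the actual keys are defined by the `stout` dependency, not documented on this page.

```python
import os

# Hypothetical credential keys: the real names are defined by the
# stout dependency and are not documented on this page.
REQUIRED_VARS = [
    "CENSUS_DB_USER", "CENSUS_DB_PASSWORD",
    "WAREHOUSE_DB_USER", "WAREHOUSE_DB_PASSWORD",
]

missing = [name for name in REQUIRED_VARS if name not in os.environ]
if missing:
    raise RuntimeError(f"Missing required environment variables: {missing}")
```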
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`environment` | str | The warehouse environment to write to, either 'Dev_v01xx' or 'Prod_v01xx'. | required |
`pipeline_version` | str | The pipeline version, currently hard coded but should eventually move to commit hash. | required |
Attributes:
`source_connection` (stout.Connection): The connection engine to the source system, currently Census.
`target_connection` (stout.Connection): The connection engine to the target system, our warehouse, based on the environment argument.
`all_tables` (list): The list of tables provided by Census.
run(spawn_new=False, extract_source=False, remove_views=False, create_views=False, create_verification=False)
Main run method for the Census pipeline.
This method runs serially for each table with the following steps:
- Initialize the extractor to Census.
- Initialize the loader to SOI Warehouse (Dev or Prod).
- For each table, extract from Census, transform and load to DWH.
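A minimal usage sketch. The module path `pipelines.census` is an assumption, not taken from this reference; adjust it to the repo's actual package layout.

```python
# Sketch only: the module path `pipelines.census` is assumed.
from pipelines.census import CensusPipeline

pipeline = CensusPipeline(environment="Dev_v01xx", msi=True)
pipeline.run(
    spawn_new=False,            # reuse existing Stage/Census tables
    extract_source=True,        # pull fresh data from Census into Stage
    remove_views=True,          # drop existing views before recreating them
    create_views=True,          # rebuild views over the new load
    create_verification=False,  # skip the verification tables on this run
)
```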
Common
CommonPipeline
Bases: BasePipeline
Pipeline class for Common data tables.
__init__(environment, msi=True, pipeline_version='0.0.1')
Common ETL pipeline to SOI Warehouse.
This is the main pipeline object for running ETL operations to create the common tables. The data is located in `soload` as CSV files, and this pipeline loads them directly into the data warehouse.
Using this pipeline requires the `stout` dependency (also contained in this repo) and expects system credentials to be stored as environment variables.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`environment` | str | The warehouse environment to write to, either 'Dev_v01xx' or 'Prod_v01xx'. | required |
`pipeline_version` | str | The pipeline version, currently hard coded but should eventually move to commit hash. | required |
Attributes:
`target_connection` (stout.Connection): The connection engine to the target system, our warehouse, based on the environment argument.
`all_tables` (list): The list of tables handled by the Common pipeline.
run(spawn_new=False, create_WorldGames=False, create_RegionalGames=False, create_ProgramsCDC=False, create_CountriesCDC=False, create_MultipleEventOwners=False, create_CountryISOCodes=False)
Main run method for the Common pipeline.
This method runs serially for each table with the following steps:
- Initialize the extractor to get the CSV files.
- Initialize the loader to SOI Warehouse (Dev or Prod).
- For each table, extract from the CSV, transform, and load to DWH.
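A minimal usage sketch. As above, the module path `pipelines.common` is an assumption; the `create_*` flags mirror the run signature.

```python
# Sketch only: the module path `pipelines.common` is assumed.
from pipelines.common import CommonPipeline

pipeline = CommonPipeline(environment="Dev_v01xx")
pipeline.run(
    spawn_new=True,               # create fresh Common tables
    create_WorldGames=True,       # load the WorldGames CSV
    create_CountryISOCodes=True,  # load the CountryISOCodes CSV
)
```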
GMS
GMSPipeline
Bases: BasePipeline
Pipeline class for GMS.
__init__(environment, msi=True, pipeline_version='0.0.1')
GMS ETL pipeline to SOI Warehouse.
This is the main pipeline object for running ETL operations
from GMS. It consists of calls to loaders and extractors
in the correct order so the user need only interact with this
class's run method.
Using this pipeline requires the `stout` dependency (also contained in this repo) and expects system credentials to be stored as environment variables.
TODO: Graceful error handling... how to get SQLAlchemy to skip row errors without rolling back the full session.
Args:
`environment` (str): The warehouse environment to write to, either 'Dev_v01xx' or 'Prod_v01xx'.
`pipeline_version` (str): The pipeline version, currently hard coded but should eventually move to commit hash.
Attributes:
`target_connection` (soload.Connection): The connection engine to the target system, our warehouse, based on the environment argument.
`all_programs` (list): The full set of GMS databases.
run(programs, spawn_new=False, extract_source=False, clear_source=False)
Main run method for the GMS pipeline.
This method runs serially for each discipline with the following steps:
1. Initialize the extractor to GMS.
2. Initialize the loader to SOI Warehouse (Dev or Prod).
3. For each program, extract from GMS, transform and load to DWH.
4. Optionally load information to core.
Args:
`programs` (dict): The programs/sources to run through the pipeline.
`spawn_new` (bool): Whether to spawn a new instance of GMS tables. Default False.
`extract_source` (bool): Whether to load from GMS. Default False.
`clear_source` (bool): Whether to clear source data from GMS tables. Sources to clear are defined in the programs arg. Default False.
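A minimal usage sketch. The module path and the shape of the `programs` mapping are assumptions, not documented here; consult the GMS extractor for the expected structure.

```python
# Sketch only: the module path `pipelines.gms` and the program/source
# mapping below are assumptions.
from pipelines.gms import GMSPipeline

programs = {"athletics": "gms_athletics"}  # hypothetical program -> source mapping

pipeline = GMSPipeline(environment="Dev_v01xx")
pipeline.run(
    programs,
    spawn_new=False,      # reuse existing GMS tables
    extract_source=True,  # pull fresh data from GMS
    clear_source=False,   # keep existing source rows
)
```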
OpenMRS
OpenMRSPipeline
Bases: BasePipeline
Pipeline class for OpenMRS.
__init__(environment, msi=True, pipeline_version='0.0.1')
OpenMRS ETL pipeline to SOI Warehouse.
This is the main pipeline object for running ETL operations
from OpenMRS. It consists of calls to loaders and extractors
in the correct order so the user need only interact with this
class's run method.
Using this pipeline requires the `stout` dependency (also contained in this repo) and expects system credentials to be stored as environment variables.
TODO: Graceful error handling... how to get SQLAlchemy to skip row errors without rolling back the full session.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`environment` | str | The warehouse environment to write to, either 'Dev_v01xx' or 'Prod_v01xx'. | required |
`pipeline_version` | str | The pipeline version, currently hard coded but should eventually move to commit hash. | required |
Attributes:
`source_connection` (soload.Connection): The connection engine to the source system, currently just OpenMRS.
`target_connection` (soload.Connection): The connection engine to the target system, our warehouse, based on the environment argument.
`legacy_connection` (soload.Connection): The connection engine to the Prod_v01xx system in our warehouse, which is where LegacyHAS is located.
`all_disciplines` (list): The full set of HAS disciplines.
run(disciplines, spawn_new=False, extract_legacy=False, extract_source=False, remove_views=False, create_views=False, create_connected_has_table=False, remove_haspublic_views=False, create_haspublic_views=False)
Main run method for the OpenMRS pipeline.
This method runs serially for each discipline with the following steps:
- Initialize the extractor to OpenMRS.
- Initialize the loader to SOI Warehouse (Dev or Prod).
- For each discipline, extract from OpenMRS, transform and load to DWH.
- Optionally load the LegacyHAS data in the same manner as OpenMRS.
- Optionally load information to core.
- Optionally remove views (e.g., FitFeetView).
- Optionally create views.
- Update the log tracker in HAS schema.
The final step is used upstream for CDC.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`disciplines` | list | The disciplines to run through the pipeline. | required |
`spawn_new` | bool | Whether to spawn new HAS tables. Default False. | required |
`extract_legacy` | bool | Whether to load from LegacyHAS. Default False. | required |
`extract_source` | bool | Whether to load from OpenMRS. Default False. | required |
`load_to_core` | bool | Whether to transform-load to core. Default False. | required |
`remove_views` | bool | Whether to remove current HAS views (if views already exist, it should be True, otherwise False). Default False. | required |
`create_views` | bool | Whether to update the HAS views. Default False. | required |
`create_connected_has_table` | bool | Whether to create HASPublic. Default False. | required |
`remove_haspublic_views` | bool | Whether to remove current HASPublic views (if views already exist, it should be True, otherwise False). Default False. | required |
`create_haspublic_views` | bool | Whether to update the HASPublic views. Default False. | required |
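A minimal usage sketch. The module path and the discipline names are assumptions; the view flags follow the guidance in the table above for an environment where views already exist.

```python
# Sketch only: the module path `pipelines.openmrs` and the discipline
# names below are assumptions.
from pipelines.openmrs import OpenMRSPipeline

pipeline = OpenMRSPipeline(environment="Dev_v01xx")
pipeline.run(
    ["FitFeet", "HealthyHearing"],  # hypothetical discipline names
    spawn_new=False,
    extract_legacy=False,            # skip the LegacyHAS backfill
    extract_source=True,             # pull fresh data from OpenMRS
    remove_views=True,               # views already exist, so drop them first
    create_views=True,               # then rebuild them
    create_connected_has_table=True, # build HASPublic
    remove_haspublic_views=True,
    create_haspublic_views=True,
)
```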
SHE
SHEPipeline
Bases: BasePipeline
Pipeline class for SHE data tables.
__init__(environment, msi=True, pipeline_version='0.0.1')
SHE ETL pipeline to SOI Warehouse.
This is the main pipeline object for running ETL operations to create the SHE tables. The data is located in the Azure data lake as xlsx files, and this pipeline loads them directly into the data warehouse. It can take multiple files in the same format for SHE.
Using this pipeline requires the `stout` dependency (also contained in this repo) and expects system credentials to be stored as environment variables.
Args:
`environment` (str): The warehouse environment to write to, either 'Dev_v01xx' or 'Prod_v01xx'.
`pipeline_version` (str): The pipeline version, currently hard coded but should eventually move to commit hash.
Attributes:
`target_connection` (stout.Connection): The connection engine to the target system, our warehouse, based on the environment argument.
`all_tables` (list): The list of tables handled by the SHE pipeline.
run(spawn_new=False, create_summary_table=False, create_long_summary_table=False)
Main run method for the SHE pipeline.
This method runs for each table with the following steps:
1. Initialize the extractor to get the xlsx files.
2. Initialize the loader to SOI Warehouse (Dev or Prod).
3. For each table, extract from the files and append to one dataframe.
4. Transform the raw data.
5. Create a calculated summary table and load to DWH.
6. Create a long version of the summary table and load to DWH.
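A minimal usage sketch, with the module path `pipelines.she` assumed rather than documented:

```python
# Sketch only: the module path `pipelines.she` is assumed.
from pipelines.she import SHEPipeline

pipeline = SHEPipeline(environment="Dev_v01xx")
pipeline.run(
    spawn_new=True,                  # create fresh SHE tables
    create_summary_table=True,       # build the calculated summary table
    create_long_summary_table=True,  # and its long-format version
)
```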
SportPartnership
SportPartnershipPipeline
Pipeline class for Sport Partnership table.
__init__(environment, msi=True, pipeline_version='0.0.1')
This is the main pipeline object for running ETL operations to create the Sport Partnership tables. The data is located in the Azure data lake as xlsx files, and this pipeline loads them directly into the data warehouse.
Using this pipeline requires the `stout` dependency (also contained in this repo) and expects system credentials to be stored as environment variables.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`environment` | str | The warehouse environment to write to, either 'Dev_v01xx' or 'Prod_v01xx'. | required |
`pipeline_version` | str | The pipeline version, currently hard coded but should eventually move to commit hash. | required |
Attributes:
`target_connection` (stout.Connection): The connection engine to the target system, our warehouse, based on the environment argument.
`all_tables` (list): The list of tables handled by the Sport Partnership pipeline.
run(spawn_new=False, create_sport_partnership_table=False)
Main run method for the Sport Partnership pipeline.
This method runs for each table with the following steps:
- Initialize the extractor to get the xlsx files.
- Initialize the loader to SOI Warehouse (Dev or Prod).
- For each table, extract from the files and append to one dataframe.
- Transform the raw data.
- Load to DWH.
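A minimal usage sketch, with the module path `pipelines.sport_partnership` assumed rather than documented:

```python
# Sketch only: the module path `pipelines.sport_partnership` is assumed.
from pipelines.sport_partnership import SportPartnershipPipeline

pipeline = SportPartnershipPipeline(environment="Dev_v01xx")
pipeline.run(
    spawn_new=True,                       # create a fresh table
    create_sport_partnership_table=True,  # build and load the table
)
```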