
SoLoad

This is the main repository for SoLoad -- a package that defines processes for removing, extracting, and loading data to the data warehouse.

SOLOAD ETL Library

SOLOAD is an ETL library, currently supporting pipelines from HAS (OpenMRS), GMS, and Census to our data warehouse. To install:

  1. cd soidatawarehouse/soload
  2. pip install -r requirements.txt
  3. python setup.py install

You can also build a whl artifact for deployment.
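For example, a wheel can be built with the standard setuptools command shown below (a minimal sketch; the exact artifact name in dist/ depends on the package version and Python tag):

python setup.py bdist_wheel
# the wheel is written to dist/, e.g. dist/soload-<version>-py3-none-any.whl
pip install dist/soload-<version>-py3-none-any.whl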

Interacting with and executing the pipelines is fairly straightforward. Pipelines are defined and run in SoFlow. See below for examples of running each pipeline from SoLoad.

CorePipeline()

from soload.pipelines import *
# msi = True if the pipeline is being run on the VM
# msi = False if the pipeline is being run on a local machine

pipe = CorePipeline('Dev_v01xx', msi = True) 

# DEFAULT
pipe.run(sources_to_clear = None, spawn_new=False, clear_core=False, 
                    load_has=False, update_has=False, 
                    load_gms=False, update_gms=False)


# To update HAS data in Core:

pipe.run(sources_to_clear = None, spawn_new=False, clear_core=False, 
                    load_has=False, update_has=True, 
                    load_gms=False, update_gms=False)

CensusPipeline()

from soload.pipelines import *

pipe = CensusPipeline('Dev_v01xx', msi = True)

# DEFAULT
pipe.run(spawn_new=False, extract_source=False, remove_views = False, create_views = False)

# To extract Census data and create views:
pipe.run(spawn_new=False, extract_source=True, remove_views = True, create_views = True)

OpenMRSPipeline()

from soload.pipelines import *

pipe = OpenMRSPipeline('Dev_v01xx', msi = True)

# DEFAULT
pipe.run(disciplines = pipe.all_disciplines, spawn_new = False, extract_legacy=False, 
            extract_source=False, remove_views=False, create_views=False, 
            create_connected_has_table = False)


# To extract OpenMRS data:
pipe.run(disciplines = pipe.all_disciplines, spawn_new = False, extract_legacy=False, 
            extract_source=True, remove_views=False, create_views=False, 
            create_connected_has_table = False)

# To drop HAS tables and reload from legacy & source:
pipe.run(disciplines = pipe.all_disciplines, spawn_new = True, extract_legacy=True, 
            extract_source=True, remove_views=False, create_views=False, 
            create_connected_has_table = False)

GMSPipeline()

from soload.pipelines import *

pipe = GMSPipeline('Dev_v01xx', msi = True)

#DEFAULT
pipe.run(programs = pipe.all_programs, spawn_new = False, extract_source=False, clear_source=False)

# To extract GMS data:
pipe.run(programs = pipe.all_programs, spawn_new = True, extract_source=True, clear_source=False)

CommonPipeline()

To extract common tables from CSV files and load them to the warehouse:

from soload.pipelines import *

pipe = CommonPipeline('Dev_v01xx', msi = True)

# DEFAULT
pipe.run(spawn_new=False, create_WorldGames = False, create_RegionalGames = False, 
            create_ProgramsCDC = False, create_CountriesCDC = False,
            create_MultipleEventOwners = False, create_CountryISOCodes = False)

# Drop ProgramsCDC and reload
pipe.run(spawn_new=True, create_WorldGames = False, create_RegionalGames = False, 
            create_ProgramsCDC = True, create_CountriesCDC = False,
            create_MultipleEventOwners = False, create_CountryISOCodes = False)

SHEPipeline()

To extract SHE tables from CSV files and load them to the warehouse:

from soload.pipelines import *

pipe = SHEPipeline('Dev_v01xx', msi = True)

# DEFAULT
pipe.run(spawn_new = False, create_summary_table=False, create_long_summary_table = False)

SportPartnershipPipeline()

To extract Sport Partnership tables from CSV files and load them to the warehouse:

from soload.pipelines import *

pipe = SportPartnershipPipeline('Dev_v01xx', msi = True)

# DEFAULT
pipe.run(spawn_new = False, create_sport_partnership_table=False)

GovtRelationsSurveyPipeline()

from soload.pipelines import *

pipe = GovtRelationsSurveyPipeline('Dev_v01xx', msi = True)

# DEFAULT
pipe.run(spawn_new = False, survey = None, load_survey = False, drop_survey = False, 
            update_survey = False, update_users = False, load_views = False)

# To spawn new tables and load all survey data, including the UserAccess table:
pipe.run(spawn_new = True, survey = pipe.all_surveys, load_survey = True, 
            drop_survey = False, update_survey = False, update_users = False, 
            load_views = True)

# To update survey data (only GEP.Demo, GEP.ResponsesText, and GEP.ResponsesChoices):
pipe.run(spawn_new = False, survey = ['2023_US'], load_survey = False, 
            drop_survey = False, update_survey = True, update_users = False, 
            load_views = True)

# To drop and reload survey data:
pipe.run(spawn_new = False, survey = ['2023_US'], load_survey = True, 
            drop_survey = True, update_survey = False, update_users = False, 
            load_views = True)

# To update the UserAccess table:
# First make sure the files have been updated in soidatalake01xx/govt-relations-survey/user_access/
# After the pipeline has run, make sure the new users are added to the Azure environment and to the "DS&E Government Engagement & Policy" group.
pipe.run(spawn_new = False, survey = None, load_survey = False, 
            drop_survey = False, update_survey = True, update_users = True, 
            load_views = False)

AccreditationPipeline()

This pipeline runs a few separate processes associated with Accreditation and PQS (PQS is part of the accreditation process). It downloads accreditation files from the ATS accreditation system and uploads them to SharePoint, and it downloads PQS files from ATS and uploads them to blob storage. Separately, it creates the accreditation tables from soiservices and loads them to the data warehouse. Finally, it runs the PQS data cleaning process, taking the PQS files from blob storage and loading the cleaned PQS table to the data warehouse.

from soload.pipelines import *

pipe = AccreditationPipeline('Dev_v01xx', msi = True)

# DEFAULT
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = False,
        spawn_new_pqs_table = False, create_pqs_table = False,
        download_pqs = False, download_pqs_full_refresh = False, 
        download_accred = False, download_missing_accred_files = False,
        download_accred_full_refresh = False)

# For the daily run (accreditation tables, pqs data update,
#                    pqs tables, accreditation data update)
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = True,
        spawn_new_pqs_table = False, create_pqs_table = True,
        download_pqs = True, download_pqs_full_refresh = False, 
        download_accred = True, download_missing_accred_files = False,
        download_accred_full_refresh = False)

# Spawning the Accreditation tables in the data warehouse (initializing the ddl)
pipe.run(spawn_new_accred_tables = True, extract_accred_tables = False,
        spawn_new_pqs_table = False, create_pqs_table = False,
        download_pqs = False, download_pqs_full_refresh = False, 
        download_accred = False, download_missing_accred_files = False,
        download_accred_full_refresh = False)

# Running the Accreditation schema tables in the data warehouse
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = True,
        spawn_new_pqs_table = False, create_pqs_table = False,
        download_pqs = False, download_pqs_full_refresh = False, 
        download_accred = False, download_missing_accred_files = False,
        download_accred_full_refresh = False)

# Spawning the PQS tables in the data warehouse (initializing the ddl)
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = False,
        spawn_new_pqs_table = True, create_pqs_table = False,
        download_pqs = False, download_pqs_full_refresh = False, 
        download_accred = False, download_missing_accred_files = False,
        download_accred_full_refresh = False)

# Create the PQS table in the Accreditation schema
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = False,
        spawn_new_pqs_table = False, create_pqs_table = True,
        download_pqs = False, download_pqs_full_refresh = False, 
        download_accred = False, download_missing_accred_files = False,
        download_accred_full_refresh = False)

# Save the pqs files to blob storage for the data warehouse pipeline
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = False,
        spawn_new_pqs_table = False, create_pqs_table = False,
        download_pqs = True, download_pqs_full_refresh = False, 
        download_accred = False, download_missing_accred_files = False,
        download_accred_full_refresh = False)

# Delete all of the pqs files in blob storage and rerun to save them
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = False,
        spawn_new_pqs_table = False, create_pqs_table = False,
        download_pqs = False, download_pqs_full_refresh = True, 
        download_accred = False, download_missing_accred_files = False,
        download_accred_full_refresh = False)

# Running Legal's accreditation process to save files to SharePoint
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = False,
        spawn_new_pqs_table = False, create_pqs_table = False,
        download_pqs = False, download_pqs_full_refresh = False, 
        download_accred = True, download_missing_accred_files = False,
        download_accred_full_refresh = False)

# Look at the differences in files between SharePoint and the accred system
# and download the missing files. This is meant to be a quicker option
# than the full refresh (download_accred_full_refresh) for picking up
# missing files that are not captured in the daily update (download_accred)
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = False,
        spawn_new_pqs_table = False, create_pqs_table = False,
        download_pqs = False, download_pqs_full_refresh = False, 
        download_accred = False, download_missing_accred_files = True,
        download_accred_full_refresh = False)

# Delete all accreditation SharePoint files and then save all files again.
# This takes a long time (over 10 hours) due to the number of files.
pipe.run(spawn_new_accred_tables = False, extract_accred_tables = False,
        spawn_new_pqs_table = False, create_pqs_table = False,
        download_pqs = False, download_pqs_full_refresh = False, 
        download_accred = False, download_missing_accred_files = False,
        download_accred_full_refresh = True)