SOFLOW

soflow is a pipeline orchestrator. Architect() is the base class for soflow.

Modules

  • soflow

Classes

  • Architect

Methods

  • build(blueprint)

How to install on a local machine

cd ~/../soflow
pip install -r requirements.txt
python setup.py install

Requirements

  • stout and soler are required.
  • blueprint: soflow requires a blueprint to determine which process to run, and in what order. A blueprint may be:
    • dict : if you wish to supply arguments to processes for building
    • set : if you wish to use all default arguments for building

Blueprints

The soflow/soflow/jobs/blueprints/ folder contains json files for pre-set blueprints which can be chained together to create a pipeline run. The available collection of blueprints is outlined below.

Blueprints are intended to be modular; this allows bash scripts to combine them flexibly into different pipeline runs.

Bash Scripts

The soflow/soflow/jobs/bash_scripts folder contains bash scripts that chain together json blueprints to create common pipeline runs.

Bash scripts are a pre-structured way of calling a chain of pipelines in order. Each script takes an environment as input, which it passes to every blueprint listed in the script. Blueprints are called in the script from their json blueprint files, which can be found in soflow/soflow/jobs/blueprints/

For example:

daily_run.sh

This script chains together the blueprints for a daily run, which includes:

  • running the SHE pipeline in the background;
  • starting the Census pipeline while the SHE pipeline is running;
  • running the OpenMRS update after the Census pipeline has completed, regardless of whether the Census pipeline was successful;
  • updating Core with new rows after the OpenMRS update has completed, contingent on a successful OpenMRS run;
  • updating soler after the previous task has completed, contingent on its success; and
  • creating the connected HAS tables from the stored procedures after the previous task has completed, contingent on its success.

See the quick reference below for information on chaining bash commands.
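As general bash behavior (not specific to soflow), the three operators used in daily_run.sh chain commands as follows:

# & : run the left-hand command in the background and move on immediately
sleep 2 & echo "runs while sleep is still going"

# ; : run the next command once the previous one finishes, regardless of its exit status
false ; echo "runs even though the previous command failed"

# && : run the next command only if the previous one succeeded (exit status 0)
true && echo "runs only because the previous command succeeded"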

#!/usr/bin/bash
# This script activates the py38 environment and runs the daily pipeline chain


# source the virtual python environment to load packages
source /home/SODA/virtualenvs/py38/bin/activate 

# call the python run script for each blueprint, chaining the calls together and passing along the environment argument ($1)

python /home/SODA/code/soflow/soflow/jobs/blueprints/run_script.py -e $1 -m True -b /home/SODA/code/soflow/soflow/jobs/blueprints/she_blueprint.json &
python /home/SODA/code/soflow/soflow/jobs/blueprints/run_script.py -e $1 -m True -b /home/SODA/code/soflow/soflow/jobs/blueprints/census_blueprint.json;
python /home/SODA/code/soflow/soflow/jobs/blueprints/run_script.py -e $1 -m True -b /home/SODA/code/soflow/soflow/jobs/blueprints/openmrs_update_blueprint.json &&
python /home/SODA/code/soflow/soflow/jobs/blueprints/run_script.py -e $1 -m True -b /home/SODA/code/soflow/soflow/jobs/blueprints/soler_update_blueprint.json &&
python /home/SODA/code/soflow/soflow/jobs/blueprints/run_script.py -e $1 -m True -b /home/SODA/code/soflow/soflow/jobs/blueprints/openmrs_create_connected_table_blueprint.json

echo "Daily $1 pipeline finished!"

These scripts are meant to be run from a cronjob or called directly in the terminal:

/path/to/script.sh Dev_v01xx
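For example, a crontab entry along these lines could run the daily pipeline each night; the schedule, script path, and log location are illustrative and will differ per installation:

# Hypothetical crontab entry: run the Dev daily pipeline at 02:00 every day and append output to a log
0 2 * * * /home/SODA/code/soflow/soflow/jobs/bash_scripts/daily_run.sh Dev_v01xx >> /home/SODA/logs/daily_run.log 2>&1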

Examples

If you supply a dictionary:

>>> from soflow.soflow import Architect

# Architect().build() will run the pipeline in the order processes are supplied -- Census, then HAS.
# You do not have to provide all arguments for every process. 
# Arguments will default to required_keys (found in soflow.py) if not supplied in the blueprint.
# If a process is supplied that is not contained in required_keys, the entire build will be terminated.

# msi = True if running in the vm
# msi = False if running on a local machine

blueprint = {
    'CensusPipeline': {     # extract_source is not supplied for Census, so it defaults to True
        'remove_views': False, 
        'create_views': False
    },                          
    'OpenMRSPipeline': {}       # Nothing is supplied for HAS, so everything defaults to True 
}

>>> architect = Architect(environment = 'Dev_v01xx', msi = True)
>>> architect.build(blueprint)

If you supply a set:

>>> from soflow.soflow import Architect

# Architect().build() will run the pipeline in the order processes are supplied -- Census, then HAS.
# You do not have to provide any arguments for any process. 
# Arguments will default to required_keys (found in soflow.py).
# If a process is supplied that is not contained in required_keys, the entire build will be terminated.

blueprint = {'CensusPipeline', 'OpenMRSPipeline'} # Both Census and HAS will be run using default arguments

>>> architect = Architect(environment = 'Dev_v01xx', msi = True)
>>> architect.build(blueprint)

You can also run soflow directly from the command line by passing arguments to run_script.py:

  • Required Arguments:

    • -e --environment (str -- no quotes needed): Name of environment (Dev_v01xx or Prod_v01xx)
    • -m --msi (True/False): Whether msi is being used to connect to the environment (True is running in the vm; False if pulling credentials via stout)
    • -b --blueprint (json): Path to the json file containing the blueprint
  • Optional Arguments:

    • -l --create_log (True/False): Whether you wish to remove the current log and start a new one
# Run the Census pipeline in Prod and keep the existing log intact, using an msi connection
python ..path/soflow/soflow/jobs/blueprints/run_script.py -e Prod_v01xx -m True -b ../home/SODA/code/soflow/soflow/jobs/blueprints/census_blueprint.json
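
To discard the current log and start a fresh one, add the optional -l flag; the paths below follow the layout used in daily_run.sh above and may differ on your machine:

# Run the Census pipeline in Prod with a fresh log, using an msi connection
python /home/SODA/code/soflow/soflow/jobs/blueprints/run_script.py -e Prod_v01xx -m True -l True -b /home/SODA/code/soflow/soflow/jobs/blueprints/census_blueprint.json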

All Possible Arguments

Here are all possible arguments in the form that would go into a run script. Run scripts are located here: GitHub/soflow/soflow/jobs/blueprints. Most are set to "False" for this example.

{
    "CorePipeline":{
        "spawn_core": "False",
        "load_has": "False",
        "load_gms": "False",
        "update_has": "False",
        "update_gms": "False"
    },
    "CensusPipeline": {
        "spawn_new": "False",
        "extract_source": "False",
        "remove_views": "False",
        "create_views": "False",
        "create_verification": "False"
    }, 
    "OpenMRSPipeline": {
        "spawn_new": "False",
        "extract_legacy": "False",
        "extract_source": "False",
        "update_core": "False",
        "remove_views": "False",
        "create_views": "False",
        "create_connected_has_table": "False",
        "remove_haspublic_views": "True",
        "create_haspublic_views": "True"
    },
    "GMSPipeline": {
        "spawn_new": "False",
        "extract_source": "False",
        "remove_lookups": "False",
        "create_lookups": "False"
    },
    "SolerPersonPipeline":{
        "spawn_personclean": "False",
        "spawn_personresolved": "False", 
        "load_to_core": "False",
        "load_personclean": "False",
        "update_personresolved": "False",
        "bulk_personresolved": "False"
    },
    "SolerProgramPipeline":{
        "spawn_programresolved": "False",
        "load_to_core": "True",
        "load_soi_programs": "False"
    },
    "CommonPipeline": {
        "spawn_new": "False",
        "create_WorldGames": "False",
        "create_RegionalGames": "False",
        "create_ProgramsCDC": "False",
        "create_CountriesCDC": "False",
        "create_MultipleEventOwners": "False",
        "create_CountryISOCodes": "False"
    },
    "SHEPipeline": {
        "spawn_new": "False",
        "create_summary_table": "False",
        "create_long_summary_table": "False"
    },
    "SportPartnershipPipeline": {
        "spawn_new": "False",
        "create_sport_partnership_table": "False"
    }
}
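
To run a customized set of these arguments, one approach is to save the edited object as its own json file and point run_script.py at it with -b; the file name custom_blueprint.json below is a placeholder:

# Run a custom blueprint in Dev; custom_blueprint.json is a hypothetical file you create yourself
python /home/SODA/code/soflow/soflow/jobs/blueprints/run_script.py -e Dev_v01xx -m True -b /home/SODA/code/soflow/soflow/jobs/blueprints/custom_blueprint.json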