Tasks
The Enrich system can define and run background tasks. Tasks help with miscellaneous short-term activities such as backups.
Task execution resembles pipeline execution. Tasks are designed to run headless, with the Enrich GUI providing the management interface. They also work well with third-party task management frameworks such as Airflow.
The task ecosystem has three components:
- Tasklib - A task library that implements a given task.
- Configuration - The task specification, similar to a pipeline configuration. It has global parameters such as output directories, plus a list of tasks and their corresponding configurations.
- Runner - The executor of the configuration, implemented on the Enrich server. It takes the configuration, loads and configures the appropriate libraries, and triggers the tasks. The Enrich system also takes care of logging and tracking all tasks.
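The interplay of the three components can be sketched in a few lines. This is a simplified illustration, not the Enrich runner: `EchoTask`, `run_tasks`, the `registry` dict, and the list used as `state` are all hypothetical stand-ins, and a real tasklib class would subclass `enrichsdk.tasks.Task`.

```python
# Hypothetical sketch: none of these names come from enrichsdk.

class EchoTask:
    """Stand-in for a tasklib class."""

    NAME = "EchoTask"

    def __init__(self):
        self.args = {}

    def configure(self, spec):
        # Keep the per-task arguments from the configuration
        self.args = spec.get("args", {})

    def run(self, state):
        state.append("{} ran with {}".format(self.NAME, sorted(self.args)))


def run_tasks(config, registry):
    """Load, configure, and trigger every enabled task in the configuration."""
    state = []  # stand-in for the run state that the real system tracks
    for spec in config["tasks"]:
        if not spec.get("enable", True):
            continue  # disabled tasks are skipped
        task = registry[spec["name"]]()  # look up and instantiate the library
        task.configure(spec)
        task.run(state)
    return state


config = {
    "name": "Demo",
    "tasks": [
        {"name": "EchoTask", "enable": True, "args": {"msg": "hello"}},
        {"name": "EchoTask", "enable": False, "args": {}},
    ],
}
log = run_tasks(config, {"EchoTask": EchoTask})
print(log)
```

Only the enabled entry is executed, so the log holds a single line.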
Modules
Task(*args, **kwargs)
Bases: object
This is the task base class. It is 'executed' by a 'runner'.
Source code in enrichsdk/tasks/__init__.py
supported_extra_args = [] (instance attribute)
Extra arguments that can be passed on the command line to the module. For example:
    self.extra_args = [
        {
            "name": "jobid",
            "description": "Export JobID",
            "type": "str",
            "required": True,
            "default": "22832",
        }
    ]
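One plausible way to consume such a specification is to turn each entry into a command-line option with `argparse`. The sketch below is an illustration only: the `TYPES` mapping, the `build_parser` helper, and the `--name` flag style are assumptions, not the documented behavior of the Enrich runner.

```python
import argparse

# Spec copied from the example above
extra_args = [
    {
        "name": "jobid",
        "description": "Export JobID",
        "type": "str",
        "required": True,
        "default": "22832",
    }
]

# Assumed mapping from the spec's type names to Python types
TYPES = {"str": str, "int": int, "float": float}


def build_parser(specs):
    """Build an argparse parser with one option per arg specification."""
    parser = argparse.ArgumentParser(prog="task-demo")
    for spec in specs:
        parser.add_argument(
            "--" + spec["name"],
            help=spec["description"],
            type=TYPES[spec["type"]],
            required=spec.get("required", False),
            default=spec.get("default"),
        )
    return parser


parser = build_parser(extra_args)
parsed = parser.parse_args(["--jobid", "4411"])
print(parsed.jobid)
```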
configure(spec)
Configure this task
Source code in enrichsdk/tasks/__init__.py
get_credentials(name)
Helper function to access siteconf for credentials
Source code in enrichsdk/tasks/__init__.py
get_default_metadata(state)
Get reuse metadata dict with some standard fields.
Source code in enrichsdk/tasks/__init__.py
get_supported_extra_args()
What extra command line arguments are supported
Returns:
Name | Type | Description |
---|---|---|
list | | List of arg specifications |
See supported_extra_args.
initialize()
preload_clean_args(args)
Clean the arguments before using them
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | | | required |
Source code in enrichsdk/tasks/__init__.py
preload_validate_conf(conf)
Check whether this configuration is valid.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
conf | | | required |
Source code in enrichsdk/tasks/__init__.py
preload_validate_self()
Check whether the definition of the task is valid.
Source code in enrichsdk/tasks/__init__.py
run(*args, **kwargs)
validate(what, state)
Validate various aspects of the task state, configuration, and data. Do not override this function; override specific functions such as validate_args instead.
Args: what (str): What should be validated: args, conf, results, etc.
Returns:
Nothing
Raises: Exception ("Validation error")
Source code in enrichsdk/tasks/__init__.py
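The "override the specific function, not validate" advice suggests a dispatch pattern. The following is a minimal sketch of one way such a dispatch could work, assuming hooks are named `validate_<what>` and take `(what, state)` as `validate_args` does elsewhere in this reference. `SketchTask` and `NeedsBucket` are hypothetical classes, not the enrichsdk implementation.

```python
class SketchTask:
    """Hypothetical stand-in; not the enrichsdk Task class."""

    def validate(self, what, state):
        # Look up a hook named validate_<what>, e.g. validate_args
        hook = getattr(self, "validate_" + what, None)
        if hook is None:
            return  # nothing to check for this aspect
        try:
            hook(what, state)
        except Exception as exc:
            # Surface every failure under one generic message
            raise Exception("Validation error") from exc


class NeedsBucket(SketchTask):
    """Subclass that overrides only the specific hook."""

    def __init__(self, args):
        self.args = args

    def validate_args(self, what, state):
        if "bucket" not in self.args:
            raise ValueError("bucket is required")


# Valid args pass silently
NeedsBucket({"bucket": "client-enrich"}).validate("args", state=None)

# Invalid args surface as a generic validation error
try:
    NeedsBucket({}).validate("args", state=None)
    outcome = "ok"
except Exception as exc:
    outcome = str(exc)
print(outcome)
```

Keeping `validate` fixed in the base class means every task reports failures the same way, while subclasses only supply the checks they need.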
Set of predefined tasks
BackupCoreTask(*args, **kwargs)
CloudMixin
Bases: object
get_command(d)
Get the command that must be run
Source code in enrichsdk/tasks/sdk.py
get_env()
Get environment for the cloud provider
Source code in enrichsdk/tasks/sdk.py
run_sync(state)
Run the backup task
Source code in enrichsdk/tasks/sdk.py
validate_args(what, state)
Validate args.
The args are expected to contain 'aws' and 'backupdirs' specifications.
Example:
    "args": {
        "backupdirs": [
            {
                "enable": true,
                "name": "Logs",
                "src": "%(enrich_root)s/logs/",
                "dst": "backup/logs/"
            },
            ...
        ],
        "aws": {
            "credentials": "enrich-acme",
            "bucket": "client-enrich"
        }
    }
Source code in enrichsdk/tasks/sdk.py
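A structural check for args of this shape might look like the sketch below. The helper `check_backup_args` is hypothetical, and the set of required fields is an assumption inferred from the example above; the actual `validate_args` may check more or less.

```python
def check_backup_args(args):
    """Return a list of problems found in a backup 'args' dict (empty if none)."""
    problems = []
    # Both top-level sections are assumed to be mandatory
    for key in ("backupdirs", "aws"):
        if key not in args:
            problems.append("missing: " + key)
    # Each backup directory entry is assumed to need these four fields
    for i, entry in enumerate(args.get("backupdirs", [])):
        for field in ("enable", "name", "src", "dst"):
            if field not in entry:
                problems.append("backupdirs[{}] missing: {}".format(i, field))
    # The cloud section is assumed to need credentials and a bucket
    for field in ("credentials", "bucket"):
        if "aws" in args and field not in args["aws"]:
            problems.append("aws missing: " + field)
    return problems


good = {
    "backupdirs": [
        {
            "enable": True,
            "name": "Logs",
            "src": "%(enrich_root)s/logs/",
            "dst": "backup/logs/",
        }
    ],
    "aws": {"credentials": "enrich-acme", "bucket": "client-enrich"},
}
bad = {"backupdirs": [{"name": "Logs"}]}

print(check_backup_args(good))
print(check_backup_args(bad))
```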
SyncLocalCoreTask(*args, **kwargs)
Example
Imagine a daily task that syncs data from an API before triggering a pipeline on that data. The task requires a combination of a simple library, apisynctask, and a task configuration file, APISync.json.
The directory structure looks like this:
$ cd acme/Marketing
$ find tasks
tasks/
tasks/lib
tasks/lib/apisynctask
tasks/lib/apisynctask/__init__.py
tasks/conf
tasks/conf/APISync.json
The task configuration looks like this:
$ cat tasks/conf/APISync.json
{
    "name": "APISync",
    "description": "Download data from API provider",
    "customer_root": "%(enrich_customers_dir)s/acme/Marketing",
    "data_root": "%(enrich_data_dir)s/acme/Marketing",
    "runid": "dailysync-%Y%m%d-%H%M%S",
    "output": "%(data_root)s/output/tasks/%(name)s",
    "log": "%(output)s/%(runid)s/log.json",
    "paths": {
        "libraries": ["%(customer_root)s/tasks/lib"]
    },
    "notification": {
        "email": ["Team <marketing-tech@acmeinc.com>"]
    },
    "tasks": [
        {
            "name": "APISyncTask",
            "enable": true,
            "args": {
                "site-params": {
                    "1831": {
                        "startdate": "2017-10-20"
                    }
                },
                "sensor": {
                    "credentials": "apisync-creds"
                }
            }
        }
    ]
}
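The configuration mixes two placeholder styles: `%(name)s` references to other values, and `strftime` codes in `runid`. How Enrich expands them is not described here; the sketch below shows one way the expansion could work using Python's built-in `%` formatting. The `resolve` helper and the two directory values in `context` are hypothetical.

```python
from datetime import datetime


def resolve(conf, context, now):
    """Expand %(name)s placeholders in the top-level string values of conf."""
    values = dict(context)
    # runid uses strftime codes (%Y, %m, ...), so expand it from the clock first
    values["runid"] = now.strftime(conf["runid"])
    pending = {k: v for k, v in conf.items()
               if isinstance(v, str) and k != "runid"}
    # Repeat until every value whose dependencies are known has been expanded
    while pending:
        progressed = False
        for key, template in list(pending.items()):
            try:
                values[key] = template % values
            except KeyError:
                continue  # depends on a value that is not resolved yet
            del pending[key]
            progressed = True
        if not progressed:
            raise KeyError("unresolvable: {}".format(sorted(pending)))
    return values


conf = {
    "name": "APISync",
    "customer_root": "%(enrich_customers_dir)s/acme/Marketing",
    "data_root": "%(enrich_data_dir)s/acme/Marketing",
    "runid": "dailysync-%Y%m%d-%H%M%S",
    "output": "%(data_root)s/output/tasks/%(name)s",
    "log": "%(output)s/%(runid)s/log.json",
}
# Hypothetical site-level values; real deployments define their own
context = {
    "enrich_customers_dir": "/home/enrich/customers",
    "enrich_data_dir": "/home/enrich/data",
}
resolved = resolve(conf, context, datetime(2017, 10, 20, 6, 30, 0))
print(resolved["log"])
```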
The library could be:
import json
import logging
from datetime import datetime  # the class, so that datetime.now() works

import requests
from requests.auth import HTTPBasicAuth

from enrichsdk.tasks import Task

logger = logging.getLogger("app")


class APISyncTask(Task):

    NAME = "APISyncTask"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "APISyncTask"
        # URL templates; the site id is filled in per request
        self.urls = {
            "events": "https://api.devicevendor.com/?siteid={}"
        }

    def validate(self):
        """
        Validate the parameters
        """
        if len(self.args) == 0:
            raise Exception("Invalid args. siteid must be specified")

        # Required top-level args, each with a list of required sub-keys
        required = {
            'site-params': [],
        }
        missing = []
        for r in required:
            if r not in self.args:
                missing.append(r)
                continue
            details = required[r]
            if details is None or len(details) == 0:
                continue
            for d in details:
                if d not in self.args[r]:
                    missing.append("{}: {}".format(r, d))

        if len(missing) > 0:
            raise Exception("Missing arguments: {}".format(",".join(missing)))

    def download_files(self, state):
        """
        Download the files from the device vendor's site
        """
        # => Get credentials...
        cred_name = self.args['sensor']['credentials']
        credentials = self.get_credentials(cred_name)
        date = datetime.now().strftime("%Y-%m-%d")

        # => Download data from each site
        for siteid in self.args['site-params']:
            url = self.urls['events'].format(siteid)
            r = requests.get(url,
                             auth=HTTPBasicAuth(credentials['username'],
                                                credentials['password']))
            # Write the file...
            content = r.content
            ...
            # Notes to be shared
            state.make_note("Downloaded: {}".format(siteid))

    def run(self, state):
        """
        Run the task
        """
        self.download_files(state)
        logger.debug("Completed run",
                     extra=self.config.get_extra({
                         'transform': self.name,
                     }))