Skip to content

Tasks

The Enrich system comes with the ability to define and run background tasks. Tasks help with miscellaneous short-term activities such as backups.

Task execution resembles pipeline execution. It is designed to run headless, with the Enrich GUI providing the management interface. It also works well with third-party task management frameworks such as Airflow.

The task ecosystem has three components:

  1. Tasklib - A task library that implements a given task.
  2. Configuration - This is a task specification, similar to a pipeline configuration. It has global parameters such as output directories, and a list of tasks with their corresponding configurations.
  3. Runner - The executor of the configuration. This is implemented on the Enrich server. It takes the configuration, loads and configures the appropriate libraries, and triggers the tasks. The Enrich system also takes care of logging and tracking all tasks.

Modules

Task(*args, **kwargs)

Bases: object

This is the task base class. Tasks are 'executed' by a 'runner'.

Source code in enrichsdk/tasks/__init__.py
def __init__(self, *args, **kwargs):
    """
    Create the task.

    Parameters
    ----------
    config: (keyword, required) Configuration object for the run,
        stored as ``self.config``

    Raises
    ------
    KeyError: if ``config`` is not passed
    """
    self.config = kwargs.pop("config")
    self.name = "BaseTask"
    self.description = "Baseclass of tasks"
    self.version = "1.0"
    self.testdata = {"data": {}}

    self.supported_extra_args = []
    """Extra arguments that can be passed on the commandline to
    the module. Each entry is a dict that must carry the keys
    ``name``, ``description``, ``default`` and ``required``
    (checked by ``preload_validate_self``). For example::

    self.supported_extra_args = [
           {
              "name": "jobid",
              "description": "Export JobID",
              "type": "str",
              "required": True,
              "default": "22832",
           }
    ]
    """

supported_extra_args = [] instance-attribute

Extra arguments that can be passed on the commandline to the module.

Example: self.supported_extra_args = [ { "name": "jobid", "description": "Export JobID", "type": "str", "required": True, "default": "22832", } ]

configure(spec)

Configure this task

Source code in enrichsdk/tasks/__init__.py
def configure(self, spec):
    """
    Configure this task

    Validates the task definition and ``spec``, keeps a deep copy of
    ``spec`` in ``self.spec``, copies selected top-level attributes
    from ``spec`` onto the task (only those the task already has), and
    stores the cleaned ``spec["args"]`` in ``self.args``.
    """
    # Validate the task's own definition first, then the incoming spec
    self.preload_validate_self()
    self.preload_validate_conf(spec)

    # Keep an independent copy of what we were configured with
    self.spec = copy.deepcopy(spec)

    # Only attributes that the task already defines can be overridden
    overridable = ("name", "output_version", "test", "enable")
    for key in overridable:
        if key not in spec:
            continue
        if hasattr(self, key):
            setattr(self, key, spec[key])

    # Resolve args against the command line, config and defaults
    raw_args = spec.get("args", {})
    self.args = self.preload_clean_args(raw_args)

get_credentials(name)

Helper function to access siteconf for credentials

Source code in enrichsdk/tasks/__init__.py
def get_credentials(self, name):
    """
    Helper function to access siteconf for credentials

    Looks up ``siteconf["credentials"][name]`` on the task's config.

    Parameters
    ----------
    name: Key of the credentials entry in siteconf

    Returns
    -------
    The value stored under ``siteconf["credentials"][name]``

    Raises
    ------
    Exception: if siteconf is absent, is not a dict, or has no usable
        credentials entry for ``name``
    """
    if not hasattr(self.config, "siteconf"):
        raise Exception("No siteconf")

    siteconf = self.config.siteconf
    if not isinstance(siteconf, dict):
        raise Exception("Invalid siteconf format")

    try:
        credentials = siteconf["credentials"][name]
    except (KeyError, IndexError, TypeError) as err:
        # Missing "credentials" section, missing entry, or a section that
        # cannot be indexed by ``name``. A narrow clause (instead of a bare
        # ``except:``) lets KeyboardInterrupt/SystemExit propagate.
        logger.exception(
            "Cannot acquire credentials",
            extra=self.config.get_extra({"transform": self.name}),
        )
        raise Exception("Missing/invalid credentials") from err

    return credentials

get_default_metadata(state)

Get a reusable metadata dict populated with some standard fields.

Source code in enrichsdk/tasks/__init__.py
def get_default_metadata(self, state):
    """
    Get a reusable metadata dict with some standard fields.

    Parameters
    ----------
    state: Run state; ``name``, ``runid``, ``pid``, ``start_time``,
        ``end_time`` and ``stats`` (including
        ``stats["platform"]["name"]``) are read from it

    Returns
    -------
    dict: ``schema``, ``version``, ``timestamp`` (current time, second
    resolution) and a ``task`` section describing the run
    """

    def seconds_iso(moment):
        # Drop microseconds so timestamps are second-resolution ISO-8601
        return moment.replace(microsecond=0).isoformat()

    org = self.config.usecase["org"]["name"]
    stats = state.stats

    task_info = {
        "usecase": org,
        # Kept for backward compatibility
        "customer": org,
        "name": state.name,
        "description": self.config.description,
        "host": stats["platform"]["name"],
        "runid": state.runid,
        "pid": state.pid,
        "start_time": seconds_iso(state.start_time),
        "end_time": seconds_iso(state.end_time),
        "versionmap": self.config.get_versionmap(),
        "stats": stats,
        "cmdline": list(sys.argv),
    }

    return {
        "schema": "standalone:task",
        "version": "1.0",
        "timestamp": seconds_iso(datetime.now()),
        "task": task_info,
    }

get_supported_extra_args()

What extra command line arguments are supported

Returns:

Name Type Description
list List of arg specifications

see supported_extra_args

Source code in enrichsdk/tasks/__init__.py
def get_supported_extra_args(self):
    """
    What extra command line arguments are supported

    Returns
    -------
    list: Shallow copy of the task's list of arg specifications; callers
    can add or remove entries without changing the task's own list

    see supported_extra_args
    """
    declared = self.supported_extra_args
    return copy.copy(declared)

initialize()

Initialize the task by connecting to database etc.

Source code in enrichsdk/tasks/__init__.py
def initialize(self):
    """
    Initialize the task by connecting to database etc.

    The base implementation does nothing; derived tasks may override it.
    """
    return None

preload_clean_args(args)

Clean the arguments before using them

Parameters:

args: Args specified in the config file (required)
Source code in enrichsdk/tasks/__init__.py
def preload_clean_args(self, args):
    """
    Clean the arguments before using them

    Each entry of ``self.supported_extra_args`` is resolved from three
    sources, in order of priority:

    1. command line, under the label ``"<task name>:<arg name>"``
    2. configuration (``args``)
    3. the entry's ``default``

    An entry marked ``required`` must be given on the command line
    unless the config is readonly.

    Parameters
    ----------
    args: Args specified in the config file

    Returns
    -------
    dict: a new dict holding ``args`` merged with the resolved extra args;
    the caller's ``args`` is left untouched. When extra args are disabled,
    ``args`` is returned as-is.

    Raises
    ------
    Exception: a required argument is missing from the command line
    """

    if not self.config.enable_extra_args:
        return args

    # Work on a copy: ``args`` is typically the live ``spec["args"]`` dict.
    # ``"args": null`` in the config behaves like no args at all.
    base = dict(args) if args else {}

    readonly = self.config.readonly
    cmdargs = self.config.get_cmdline_args()

    # Collect the final values for all the supported extra args
    final = {}
    for entry in self.supported_extra_args:
        name = entry["name"]
        cmdlabel = "{}:{}".format(self.name, name)
        default = entry["default"]
        if entry["required"] and not readonly:
            # Required: must be supplied on the command line
            if cmdlabel not in cmdargs:
                logger.error(
                    "Invalid configuration",
                    extra=self.config.get_extra(
                        {
                            "transform": self.name,
                            "data": "Variable {} for task {} is required but missing in command line".format(
                                name, self.name
                            ),
                        }
                    ),
                )
                raise Exception("Invalid configuration")
            final[name] = cmdargs[cmdlabel]
        elif cmdlabel in cmdargs:
            # Priority 1: optional but specified on the command line
            final[name] = cmdargs[cmdlabel]
        elif name in base:
            # Priority 2: configuration
            final[name] = base[name]
        else:
            # Priority 3: declared default
            final[name] = default

    # => Log what has happened...
    msg = "".join("{}: {}\n".format(k, v) for k, v in final.items())
    logger.debug(
        "Configuration has been overridden",
        extra=self.config.get_extra({"transform": self.name, "data": msg}),
    )

    base.update(final)

    return base

preload_validate_conf(conf)

Check whether this configuration is even valid?

Parameters:

conf: Module configuration specified in the config file (required)
Source code in enrichsdk/tasks/__init__.py
def preload_validate_conf(self, conf):
    """
    Check whether this configuration is even valid?

    The configuration must be a non-empty dict whose ``version``
    (``"1.0"`` when absent) matches ``self.version``.

    Parameters
    ----------
    conf: Module configuration specified in the config file

    Raises
    ------
    Exception: if ``conf`` is not a non-empty dict, or its version
        differs from the module's
    """

    def fail(log_message, error_message):
        # Log with the task context, then abort
        logger.error(
            log_message,
            extra=self.config.get_extra({"transform": self.name}),
        )
        raise Exception(error_message)

    if not isinstance(conf, dict) or not conf:
        fail("Conf is not a valid dictionary", "Conf is not a valid dictionary")

    declared_version = conf.get("version", "1.0")
    if self.version != declared_version:
        fail("Version mismatch", "Version mismatch between config and module")

preload_validate_self()

Check whether definition of the task is fine

Source code in enrichsdk/tasks/__init__.py
def preload_validate_self(self):
    """
    Check whether definition of the task is fine

    ``supported_extra_args`` must be a list of dicts, each carrying the
    keys ``name``, ``description``, ``default`` and ``required``.

    Raises
    ------
    Exception: if the definition is malformed
    """
    supported_extra_args = self.supported_extra_args
    if not isinstance(supported_extra_args, list):
        # NOTE: the keyword must be lowercase ``extra``; ``Logger._log``
        # rejects any other keyword with a TypeError.
        logger.error(
            "Invalid configuration. Expected {}, actual {}".format(
                "list", str(type(supported_extra_args))
            ),
            extra=self.config.get_extra({"transform": self.name}),
        )
        raise Exception("Supported extra argument should be a list of dicts")

    required = ["name", "description", "default", "required"]
    for i, a in enumerate(supported_extra_args):
        if not isinstance(a, dict):
            logger.error(
                "Invalid configuration. Entry {} of supported_extra_args: Expected {}, actual {}".format(
                    i, "dict", str(type(a))
                ),
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Supported extra argument should be a list of dicts")

        missing = [x for x in required if x not in a]
        if missing:
            logger.error(
                "Invalid configuration. Entry {} of supported_extra_args: Missing attributes: {}".format(
                    i, ", ".join(missing)
                ),
                extra=self.config.get_extra(
                    {"transform": self.name, "data": str(a)}
                ),
            )
            raise Exception("Missing attributes in one of the supported extra args")

run(*args, **kwargs)

Main execution func. Override this

Source code in enrichsdk/tasks/__init__.py
def run(self, *args, **kwargs):
    """
    Main execution func. Override this

    Raises
    ------
    NotImplementedError: always; derived tasks must provide ``run``.
    It is a subclass of ``Exception``, so existing ``except Exception``
    handlers still catch it.
    """
    raise NotImplementedError("Should be implemented by the derived class")

validate(what, state)

Validate various aspects of the task state, configuration, and data. Don't override this function. Override specific functions such as validate_args.

Args: what (str): What should be validated? args, conf, results etc.

Returns:

 Nothing

Raises: Exception ("Validation error")

Source code in enrichsdk/tasks/__init__.py
def validate(self, what, state):
    """
    Validate various aspects of the task state,
    configuration, and data. Don't override this function. Override
    specific functions such as `validate_args`.

    Dispatches to ``self.validate_<what>(what, state)`` and returns its
    result.

    Args:
        what (str): What should be validated? args, conf, results etc.

    Returns:

         Whatever the specific validator returns

    Raises:
         Exception ("Validation error") when no callable
         ``validate_<what>`` exists
    """
    validator = getattr(self, "validate_" + what, None)
    if not hasattr(validator, "__call__"):
        logger.error(
            "Cannot find function to validate {}".format(what),
            extra=self.config.get_extra({"transform": self.name}),
        )
        raise Exception("Validation error")

    return validator(what, state)

Set of predefined tasks

BackupCoreTask(*args, **kwargs)

Bases: CloudMixin, Task

Backs up code and data as required.

Source code in enrichsdk/tasks/sdk.py
def __init__(self, *args, **kwargs):
    """
    Create a task that copies the directories listed under
    ``args["backupdirs"]`` from the local machine to the cloud.
    """
    super().__init__(*args, **kwargs)

    # Copy direction: local -> cloud
    self.direction = "tocloud"
    # Key in ``self.args`` that lists the directories to process
    self.dirspecs = "backupdirs"
    # Assigned after the base initializer, which sets its own default name
    self.name = "BackupCoreTask"

CloudMixin

Bases: object

get_command(d)

Get the command that must be run

Source code in enrichsdk/tasks/sdk.py
def get_command(self, d):
    """
    Get the command that must be run

    Builds an ``aws s3`` or ``gsutil`` command line (as a list) that
    copies ``d["src"]`` to ``d["dst"]`` in ``self.direction``. The cloud
    side of the path is formatted with ``self.get_params()`` and prefixed
    with the bucket URL; the local side is resolved via
    ``self.config.get_file``.

    Raises
    ------
    Exception: if ``self.args`` has neither an "aws" nor a "gcp" section
    """
    direction = self.direction
    params = self.get_params()

    if "aws" in self.args:
        cloud, scheme = "aws", "s3"
    elif "gcp" in self.args:
        cloud, scheme = "gcp", "gs"
    else:
        raise Exception("Unsupported source/destination")
    bucket = self.args[cloud]["bucket"]

    def remote(path):
        # Fill in run parameters, then point at the bucket
        return "{}://{}/{}".format(scheme, bucket, path % params)

    if direction == "tocloud":
        src = self.config.get_file(d["src"])
        dst = remote(d["dst"])
    else:
        dst = os.path.abspath(self.config.get_file(d["dst"]))
        src = remote(d["src"])

    # A single local file is copied; directories are synchronised
    single_file_upload = direction == "tocloud" and os.path.isfile(src)

    if cloud == "aws":
        # aws s3 sync s3://mybucket s3://mybucket2
        verb = "cp" if single_file_upload else "sync"
        cmd = ["aws", "s3", verb, "--quiet"]
        if self.config.dryrun:
            cmd.append("--dryrun")
    else:
        # gsutil rsync gs://mybucket gs://mybucket2
        if single_file_upload:
            cmd = ["gsutil", "cp"]
        else:
            cmd = ["gsutil", "-m", "-q", "rsync", "-r"]
        if self.config.dryrun:
            # NOTE(review): ``-n`` is a dry-run flag for ``rsync`` but means
            # "no-clobber" for ``gsutil cp``, so a dry run of a single-file
            # upload may still copy the file. Confirm and fix.
            cmd.append("-n")

    cmd.extend([src, dst])
    return cmd

get_env()

Get environment for the cloud provider

Source code in enrichsdk/tasks/sdk.py
def get_env(self):
    """
    Get environment for the cloud provider

    Returns a copy of ``os.environ`` extended with provider settings.
    With an "aws" section, string ``access_key``/``secret_key`` values from
    the named credentials become ``AWS_ACCESS_KEY_ID``/
    ``AWS_SECRET_ACCESS_KEY``. Otherwise ``BOTO_CONFIG`` is set from the
    "gcp" section's ``boto`` file. ``TZ`` comes from ``args["timezone"]``,
    defaulting to "Asia/Kolkata".
    """
    settings = self.args
    env = os.environ.copy()

    if "aws" not in settings:
        # No aws section: the gcp section is expected
        env["BOTO_CONFIG"] = self.config.get_file(settings["gcp"]["boto"])
    else:
        cred = self.get_credentials(settings["aws"]["credentials"])
        if cred is not None:
            mapping = (
                ("access_key", "AWS_ACCESS_KEY_ID"),
                ("secret_key", "AWS_SECRET_ACCESS_KEY"),
            )
            for key, var in mapping:
                # Only string values are exported
                if key in cred and isinstance(cred[key], str):
                    env[var] = cred[key]

    env["TZ"] = settings.get("timezone", "Asia/Kolkata")
    return env

run_sync(state)

Run the backup task

Source code in enrichsdk/tasks/sdk.py
    def run_sync(self, state):
        """
        Run the backup task

        For every enabled entry of ``self.args[self.dirspecs]``, builds the
        cloud command with ``self.get_command`` and runs it with the
        environment from ``self.get_env``. Each command's output is logged.
        A non-zero exit status is logged at error level, and the remaining
        entries are still processed.

        Parameters
        ----------
        state: Run state (not used here)
        """

        env = self.get_env()

        for d in self.args[self.dirspecs]:
            if not d.get("enable", True):
                continue

            name = d["name"]

            # => What should I be running?
            cmd = self.get_command(d)

            # Now run the command and collect its output
            p = subprocess.Popen(
                cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            out, err = p.communicate()
            # Tool output is not guaranteed to be valid UTF-8; a decode
            # error must not abort the remaining syncs
            out = str(out, "utf-8", errors="replace")
            err = str(err, "utf-8", errors="replace")

            lines = [
                "Cmd: {}".format(" ".join(cmd)),
                "Return code: {}".format(p.returncode),
                "Error",
                "-----",
                err,
                "",
                "Output:",
                "-------",
                out,
            ]

            # Log cloud-specific information
            if "BOTO_CONFIG" in env:
                lines.append("Boto Config: {}".format(env["BOTO_CONFIG"]))

            msg = "\n".join(lines)
            extra = self.config.get_extra({"transform": self.name, "data": msg})

            if p.returncode != 0:
                # Previously a failed sync was only visible at debug level
                logger.error("Failed to sync {}".format(name), extra=extra)
            else:
                logger.debug("Backing up {}".format(name), extra=extra)

        logger.debug(
            "Sync'd all dirs",
            extra=self.config.get_extra(
                {
                    "transform": self.name,
                }
            ),
        )

validate_args(what, state)

Validate args.

Looks like 'aws' and 'backupdirs' specification

Example::

"args": {
     "backupdirs": [
        {
            "enable": true,
            "name": "Logs",
            "src": "%(enrich_root)s/logs/",
            "dst": "backup/logs/"
         },
                    ...
      ],
      "aws": {
          "credentials": "enrich-acme",
          "bucket": "client-enrich"
      }
}
Source code in enrichsdk/tasks/sdk.py
def validate_args(self, what, state):
    """
    Validate args.

    Looks like 'aws' and 'backupdirs' specification

    Example::

        "args": {
             "backupdirs": [
                {
                    "enable": true,
                    "name": "Logs",
                    "src": "%(enrich_root)s/logs/",
                    "dst": "backup/logs/"
                 },
                            ...
              ],
              "aws": {
                  "credentials": "enrich-acme",
                  "bucket": "client-enrich"
              }
        }

    The checks themselves are delegated to ``self.validate_args_cloud``,
    which returns a ``(fail, msg)`` pair.

    Raises
    ------
    Exception: "Invalid configuration" when the check fails
    """
    fail, msg = self.validate_args_cloud(what, state)

    if fail:
        logger.error(
            "Invalid configuration",
            extra=self.config.get_extra({"transform": self.name, "data": msg}),
        )
        raise Exception("Invalid configuration")

SyncLocalCoreTask(*args, **kwargs)

Bases: CloudMixin, Task

Syncs a local directory with the contents of a cloud (S3 or GCS) bucket.

Source code in enrichsdk/tasks/sdk.py
def __init__(self, *args, **kwargs):
    """
    Create a task that pulls the directories listed under
    ``args["localdirs"]`` from the cloud to the local machine.
    """
    super().__init__(*args, **kwargs)

    # Copy direction: cloud -> local
    self.direction = "fromcloud"
    # Key in ``self.args`` that lists the directories to process
    self.dirspecs = "localdirs"
    # Assigned after the base initializer, which sets its own default name
    self.name = "SyncLocalCoreTask"

Example

Imagine a daily task that syncs data from an API before triggering a pipeline on that data. The task requires a combination of a simple library apisynctask and a task configuration file APISync.json.

The directory structure looks like this:

$ cd acme/Marketing
$ find tasks 
tasks/
tasks/lib
tasks/lib/apisynctask
tasks/lib/apisynctask/__init__.py
tasks/conf
tasks/conf/APISync.json

The task configuration looks like this:

$ cat tasks/conf/APISync.json
{
    "name": "APISync",
    "description": "Download data from API provider", 
    "customer_root": "%(enrich_customers_dir)s/acme/Marketing", 
    "data_root": "%(enrich_data_dir)s/acme/Marketing",
    "runid": "dailysync-%Y%m%d-%H%M%S",
    "output": "%(data_root)s/output/tasks/%(name)s", 
    "log": "%(output)s/%(runid)s/log.json", 
    "paths": { 
         "libraries": ["%(customer_root)s/tasks/lib"]
    },
    "notification": {
        "email": ["Team <marketing-tech@acmeinc.com>"]
    },
    "tasks": [
        {
            "name": "APISyncTask",
            "enable": true,
            "args": { 
            "site-params": { 
                "1831": { 
                "startdate": "2017-10-20"
                }
            },
            "sensor": { 
                "credentials": "apisync-creds"
            }
            }
        }
        ]
}

The library could be:

import json 
import datetime 
import requests 
from requests.auth import HTTPBasicAuth
import logging 
from enrichsdk.tasks import Task

logger = logging.getLogger("app")

class APISyncTask(Task):
    """
    Example task that downloads event data from a device vendor's API,
    one request per site listed in ``args["site-params"]``.
    """

    NAME = "APISyncTask"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = "APISyncTask"
        self.urls = {
            "events": "https://api.devicevendor.com/?siteid={}"
        }

    def validate_args(self, what, state):
        """
        Validate the parameters

        ``Task.validate("args", state)`` dispatches here; the base
        ``validate`` itself should not be overridden.
        """
        if len(self.args) == 0:
            raise Exception("Invalid args. siteid must be specified")

        # Top-level arg -> keys it must contain
        required = {
            'site-params': [],
            'sensor': ['credentials'],
        }

        missing = []
        for r, details in required.items():
            if r not in self.args:
                missing.append(r)
                continue
            for d in details or []:
                if d not in self.args[r]:
                    missing.append("{}: {}".format(r, d))

        if len(missing) > 0:
            raise Exception("Missing arguments: {}".format(",".join(missing)))

    def download_files(self, state):
        """
        Download the files from the device vendor's site
        """

        # => Get credentials...
        cred_name = self.args['sensor']['credentials']
        credentials = self.get_credentials(cred_name)
        # ``datetime`` is imported as a module, hence datetime.datetime
        date = datetime.datetime.now().strftime("%Y-%m-%d")

        # => Download data from each site
        for siteid in self.args['site-params']:
            url = self.urls['events'].format(siteid)
            r = requests.get(url,
                             auth=HTTPBasicAuth(credentials['username'],
                                                credentials['password']),
                             timeout=60)
            # Fail loudly on HTTP errors instead of saving an error page
            r.raise_for_status()

            # Write the file (e.g. named using ``date`` and ``siteid``)...
            content = r.content
            ...

            # Notes to be shared
            state.make_note("Downloaded: {}".format(siteid))

    def run(self, state):
        """
        Run the task
        """
        self.download_files(state)

        logger.debug("Completed run",
                     extra=self.config.get_extra({
                         'transform': self.name,
                     }))