
SDK

This developer kit is meant for advanced users of the Enrich Full-Stack Feature Store. It includes command-line scripts to create and manage Enrich modules, and APIs to write new modules on the platform:

 _____            _      _       ____  ____  _  __
| ____|_ __  _ __(_) ___| |__   / ___||  _ \| |/ /
|  _| | '_ \| '__| |/ __| '_ \  \___ \| | | | ' /
| |___| | | | |  | | (__| | | |  ___) | |_| | . \
|_____|_| |_|_|  |_|\___|_| |_| |____/|____/|_|\_\

get_version()

Revision number of this branch/checkout, if available. None if no revision number can be determined.

Source code in enrichsdk/__init__.py
def get_version():
    """
    Revision number of this branch/checkout, if available. None if
    no revision number can be determined.
    """

    base = VERSION
    if __build__:
        base = "%s (%s)" % (base, __build__)
    return base
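
For example, the version string can be read directly from Python (a minimal usage sketch; the exact output depends on the installed release and whether build metadata is present):

import enrichsdk

# Prints the SDK version, e.g. "X.Y.Z" or "X.Y.Z (<build>)" when a build id is set
print(enrichsdk.get_version())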

Pipelines

The core capability of the Enrich platform is the ability to define and execute complex computational pipelines. These pipelines load and transform data as required. Enrich is non-opinionated about what kinds of transforms should be written and for what purpose. What Enrich provides is a framework to define, compose, and productionize the data transformation code.

DSL

Each pipeline is defined in a DSL. It is designed to run in batch mode. It specifies:

  1. Composition Rules: What and how to compose the plugin components (transforms, models, and renderers)
  2. Plugin configuration: Where to find these components and the parameters for each
  3. Metadata: Locations of input, output, and run metadata including those that are shared across runs and across applications.
  4. Post-processing: Any actions such as notifications

An example:

{
    "version: "1.0",
    "name": "DailySummary",
    "description": "Summarize historical and daily increment of data along various dimensions",
    "customer_root": "%(enrich_customers_dir)s/acme/Marketing",
    "data_root": "%(enrich_data_dir)s/acme/Marketing",
    "runid": "dailysummary-%Y%m%d-%H%M%S",
    "output": "%(data_root)s/output/%(name)s",
    "log": "%(output)s/%(runid)s/log.json",
    "paths": {
         "libraries": ["%(customer_root)s/lib"]
         "packages": ["%(customer_root)s/pkg"]
    },
    "notification": {
         "enable": true,
         "email": ["Some Email <someemail@acme.com>"],
     "errors": "enabled"
    },
    "enable_extra_args": true,
    "transforms": {
        "enabled": [
            {
                "transform": "SensorPreprocess",
                "enable": true,
                "dependencies": {
                   "sales": "AcmeSalesPrep"
                },
                "args": {
                    "cache": "%(data_root)s/shared/1121",
                    "aws": {
                        "credentials": "acme-datalake",
                        "bucket": "acme-store",
                        "path": "rawdata/acme/1121/daily"
                    }
                }
            },
            ...
        ]
    }
}
  • version

    Version of the specification format. Currently it is "1.0".

  • name

    Name of the pipeline. It should be short and without spaces to enable use in the GUI, automatic documentation, etc.

  • description

    Human-readable summary of this pipeline. Ideally it should be less than 80 characters.

  • customer_root

    Where to find the other related modules of this pipeline, such as services, assets, etc.

  • data_root

    Where should the output of pipeline runs go? Right now only local filesystem paths are supported.

  • runid

    Unique name for each run. This is a template that is instantiated for each run; ideally it should have a meaningful prefix followed by a timestamp (see the sketch after this list).

  • output

    Unique path to store the run output

  • log

    Where the log should be stored. In theory each run's log can be anywhere, but we keep all the files of a run in a single directory, %(output)s.

  • paths

    Where the pipeline execution engine should look to find dependent libraries and packages.

  • paths > libraries

    Transforms can be simple libraries or full packages. These paths specify where to find the transforms.

    Note

    Assets within the package directory are loaded automatically

  • paths > packages

    Enrich recursively searches for transforms in the package directories and loads all of them. Scribble's Contrib package, which has a number of prebuilt components, is added by default.

  • notification

    Enrich supports notification generation for pipeline execution. The notification block has a flag specifying whether notifications are enabled and, if so, a list of email addresses to which the notification should be sent. It has one further variable, "errors", that specifies whether messages should be sent if the pipeline observes an exception. This is meant to avoid notifications during development of the pipeline, when many messages are likely to be generated.

  • enable_extra_args

    Transforms may require run-time args, such as the duration of data to look at. This flag specifies whether the pipeline executor should look for and gather all such attributes and pass them to the right transform. The Enrich dashboard takes care of collecting these attributes and passing them over.

  • transforms

    This section specifies a list of transforms. The "enabled" key specifies the list of active transforms; individual transforms in the enabled list can be further enabled/disabled.

    The transform specification has several elements:

    • transform: Name of the transform, as specified in the transform code
    • enable: Whether this transform is enabled
    • dependencies: A dictionary mapping the name of a dataframe to the transform that produces it. The dependencies are analyzed to correctly schedule the transform.
    • args: Arguments to the transform. Variables in the args dictionary can use any of the other variables specified in the pipeline. The transform specifies what the args should be.
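
The %(...)s references in the specification follow Python's %-style dictionary interpolation, and the runid template uses strftime-style directives. The sketch below is illustrative only and is not the actual enrichsdk resolver; it assumes a plain dictionary of pipeline variables and shows how values such as output, log, and runid could be expanded.

from datetime import datetime

# Hypothetical base variables; in a real deployment enrich_customers_dir and
# enrich_data_dir are supplied by the platform.
context = {
    "enrich_customers_dir": "/home/enrich/customers",
    "enrich_data_dir": "/home/enrich/data",
    "name": "DailySummary",
}

context["customer_root"] = "%(enrich_customers_dir)s/acme/Marketing" % context
context["data_root"] = "%(enrich_data_dir)s/acme/Marketing" % context
context["output"] = "%(data_root)s/output/%(name)s" % context

# The runid template is instantiated afresh for every run
context["runid"] = datetime.now().strftime("dailysummary-%Y%m%d-%H%M%S")
context["log"] = "%(output)s/%(runid)s/log.json" % context

print(context["log"])
# e.g. /home/enrich/data/acme/Marketing/output/DailySummary/dailysummary-20240101-103000/log.json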

Execution

The Enrich pipeline execution engine takes the specification and executes it. It does so by loading, instantiating, and configuring transform plugins, and executing them based on their readiness. Throughout the process, it keeps track of state and execution failures, and makes sure that the execution is always tracked and controlled.

The rough execution process includes:

  1. Initialization

    1. Bootstrap the execution state
    2. Clean the configuration
    3. Store metadata
  2. Prepare

    1. Set execution parameters such as directories
    2. Load all available plugins and ready them
  3. Load transforms

    This involves finding and instantiating the plugins:

    1. Initialize the plugin
    2. Configure the plugin:

      1. Validate configuration before loading including args
      2. Update internal parameters based on the configuration
      3. Extract, clean, and load args
      4. Load dependencies

    Note that a single transform could be instantiated multiple times with different roles and parameters.

  4. Validate the configuration

    1. Pipeline
    2. All transforms
  5. Build and execute a DAG based on the dependencies (a simplified sketch follows this list):

    1. Start with transforms marked as 'source'
    2. Check the pre-conditions for every transform and execute when ready
    3. Check post-conditions after execution
    4. End with transforms marked as 'sink'
    5. Dump metadata
  6. Send notifications

    1. Only email is supported for now; Slack will be supported in the future.
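
The skeleton below is a simplified illustration of this flow and not the actual enrichsdk executor. It assumes each transform object exposes its dependencies as a dictionary (as in the DSL above) together with hypothetical precheck/process/postcheck methods, and it runs transforms in dependency order.

def execute_pipeline(transforms):
    """
    transforms: dict mapping transform name -> transform object with
    .dependencies (dict of dataframe name -> producing transform name)
    and hypothetical .precheck(), .process(), .postcheck() methods.
    """
    completed = set()
    pending = dict(transforms)

    while pending:
        progressed = False
        for name, t in list(pending.items()):
            # Run a transform only once everything it depends on has completed
            if not set(t.dependencies.values()).issubset(completed):
                continue
            if not t.precheck():
                raise RuntimeError("Pre-condition failed for %s" % name)
            t.process()
            if not t.postcheck():
                raise RuntimeError("Post-condition failed for %s" % name)
            completed.add(name)
            del pending[name]
            progressed = True
        if not progressed:
            raise RuntimeError("Unresolvable dependencies: %s" % list(pending))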