Skip to content

Non-Core

Modules outside the core that help with transform development and operations.

Dataset Management

Please also see Test Data Setup section for command line interface to use the datasets.

Transforms have numerous data dependencies; as time progresses the dependencies increase and they have to be tracked. Further, datasets are used in multiple places, including the feature marketplace and compliance modules.

Abstract dataset specification. This is useful for:

(a) handling dependent datasets (b) cleaning (c) checking (d) validating & sampling (e) discovery of data

There are two abstractions: Dataset and DatasetRegistry.

Arg(name, description, *args, **kwargs)

Dataset command argument

Source code in enrichsdk/datasets/discover.py
def __init__(self, name, description, *args, **kwargs):
    """
    Record the name and description of a dataset command argument.

    Args:
       name: Name of the argument
       description: Description of the argument

    Any extra positional or keyword arguments are accepted and ignored.
    """
    self.name, self.description = name, description

find(datasource)

Find value values

Source code in enrichsdk/datasets/discover.py
def find(self, datasource):
    """
    Find the values for this argument in the given datasource.

    This base implementation is a placeholder and always raises.

    Args:
       datasource: Datasource to search (unused here)

    Raises:
       NotImplementedError: always. It is a subclass of Exception,
           so existing ``except Exception`` handlers keep working.
    """
    raise NotImplementedError("Not implemented")

DataSource(params, *args, **kwargs)

Bases: object

Class for specifying a dataset. This is a base class meant to be derived and implemented.

Source code in enrichsdk/datasets/discover.py
def __init__(self, params, *args, **kwargs):

    self._type = params.get("type", "unknown")
    """
    Type of the dataset as declared in the specification.
    """

    self.name = params["name"]
    """
    Unique name of the dataset, e.g., Events
    """

    self.description = params.get("description", "")
    """
    str: Description
    Example: "Daily events for Alpha Sensor"
    """

    # A single alternative name or a list is accepted; entries are
    # always stored as strings
    aliases = params.get("alt_names", [])
    if not isinstance(aliases, list):
        aliases = [aliases]
    self.alt_names = [str(alias) for alias in aliases]
    """
    list: Alternative names for the dataset
    Example: "['txn_charges']"
    """

    self.version = params.get("version", "v1")
    """
    str: Version
    Example: "v1"
    """

    # A keyword argument takes precedence over the params entry
    self.resolve = kwargs.get("resolve", params.get("resolve", {}))
    """
    Parameters to resolve paths
    """

    # Backward compatibility: older specifications carry top-level
    # local/test/backup keys instead of a "paths" list. Note that the
    # caller's params dictionary is updated in place.
    if "paths" not in params:
        params["paths"] = [
            {
                "name": legacy,
                "nature": "s3" if legacy == "backup" else "local",
                "path": params[legacy],
            }
            for legacy in ("local", "test", "backup")
            if legacy in params
        ]

    self.validate(params)

    self._type = "File"

    self.isfile = params.get("isfile", False)
    """
    Specifies whether the dataset is simple (a file for each
    run) or a more complex directory hierarchy.

    Example::

        # A complex dataset
        Dataset(params={
            "name": 'athena-availability',
            "isfile": False,
            ...

    """
    self.issortable = params.get("issortable", True)

    self.subsets = params.get("subsets", [])
    """
    A dataset oftentimes has multiple components. Define
    them here. We can give a name to each.

    Example::

      {
         ...
         "subsets": [
             {
                 "name": "v1culltable",
                 "filename": "v1culltable.csv"
             },...
          ]
      }
    """

    self.paths = params.get("paths", {})
    """
    list: Paths to be resolved
    Example::

        [
          {
            "name": "test",
            "nature": "local",
            "path": "%(data_root)s/shared/datasets/athena/v2/availability"
          }
       ]
     """

    self.registry = None
    """
    object: Dataset manager object
    """

alt_names = alt_names instance-attribute

list: Alternative names for the dataset Example: "['txn_charges']"

description = params.get('description', '') instance-attribute

str: Description Example: "Daily events for Alpha Sensor"

isfile = params.get('isfile', False) instance-attribute

Specifies whether the dataset is simple (a file for each run) or more complex directory hierarchy.

Example::

# A complex dataset
Dataset(params={
    "name": 'athena-availability',
    "isfile": False,
    ...

name = params['name'] instance-attribute

Unique name of the dataset, e.g., Events

paths = params.get('paths', {}) instance-attribute

dict: Paths to be resolved Example::

[
  {
    "name": "test",
    "nature": "local",
    "path": "%(data_root)s/shared/datasets/athena/v2/availability"
  }

]

registry = None instance-attribute

object: Dataset manager object

resolve = kwargs.get('resolve', params.get('resolve', {})) instance-attribute

Parameters to resolve paths

subsets = params.get('subsets', []) instance-attribute

A dataset oftentimes has multiple components. Define them here. We can give a name to each.

Example::

{ ... "subsets": [ { "name": "v1culltable", "filename": "v1culltable.csv" },... ] }

version = params.get('version', 'v1') instance-attribute

str: Version Example: "v1"

get_doodle_source(filename)

Name as it may be in doodle

Source code in enrichsdk/datasets/discover.py
def get_doodle_source(self, filename):
    """
    Name as it may be in doodle

    Args:
       filename: File name used to decide which subset applies

    Returns:
       dict: ``name`` and ``version`` of the source. When the dataset
       has subsets, the name is ``<dataset>-<resolved subset name>``
       for the first subset that the file belongs to.

    Raises:
       Exception: if the dataset has subsets and none of them matches
    """
    if not self.has_subsets():
        return {
            'name': self.name,
            'version': self.version,
        }

    params = {
        'filename': filename
    }

    for s in self.subsets:
        try:
            if self.in_subset(s['name'], params):
                subsetname = self.resolve_subset_name(s['name'], params)
                return {
                    "name": f"{self.name}-{subsetname}",
                    "version": self.version
                }
        except Exception:
            # Best effort: a subset that cannot be checked is skipped.
            # Only Exception is caught so that KeyboardInterrupt and
            # SystemExit still propagate.
            continue
    raise Exception("Could not construct doodle source name")

get_match_spec()

Get a serializable specification of the matching function.

Source code in enrichsdk/datasets/discover.py
def get_match_spec(self):
    """
    Return a JSON-serializable copy of the matching specification.

    The match dictionary is dumped with SafeEncoder and loaded back,
    so the result contains only plain JSON types.
    """
    encoded = json.dumps(self.match, cls=SafeEncoder)
    return json.loads(encoded)

get_path_by_name(name, full=False, resolve=None)

Return the path definition for a path with a given name

Source code in enrichsdk/datasets/discover.py
def get_path_by_name(self, name, full=False, resolve=None):
    """
    Return the path definition for a path with a given name

    Args:
       name: Name of the path entry to look up in ``self.paths``
       full (bool): If true, return the whole path dictionary
           instead of the path string
       resolve (dict): Values substituted into the path template.
           If None, ``self.resolve`` is used

    Returns:
       The path dictionary when ``full`` is true. Otherwise the path
       string, with ``%``-substitution applied when ``resolve`` is a
       non-empty dictionary.

    Raises:
       Exception: if no path has that name, or if the substitution
           fails (the underlying error is chained as the cause)
    """

    if resolve is None:
        resolve = self.resolve

    selected = next((p for p in self.paths if p["name"] == name), None)
    if selected is None:
        raise Exception("Path with name {} not present".format(name))

    if full:
        return selected

    path = selected["path"]

    # Substitute only when there is something to substitute with
    if isinstance(resolve, dict) and len(resolve) > 0:
        try:
            path = path % resolve
        except Exception as exc:
            raise Exception("Path could not be resolved") from exc

    return path

get_paths()

Return the available paths (full list of dictionaries, not just the names)

Source code in enrichsdk/datasets/discover.py
def get_paths(self):
    """
    Return every path definition of this dataset (the full list of
    dictionaries, not just the names)
    """
    available = self.paths
    return available

get_subset_description(name)

Get the description for a subset

Source code in enrichsdk/datasets/discover.py
def get_subset_description(self, name):
    """
    Look up the description of the subset called ``name``.

    A subset without its own description falls back to the
    description of the dataset. An unknown name raises an Exception.
    """
    found = next((entry for entry in self.subsets if entry["name"] == name), None)
    if found is None:
        raise Exception("Unknown subset: {}".format(name))

    return found.get("description", self.description)

get_subset_detail(name)

Return the full definition (dictionary) of the subset with the given name

Source code in enrichsdk/datasets/discover.py
def get_subset_detail(self, name):
    """
    Return the full definition (dictionary) of the subset called
    ``name``. An unknown name raises an Exception.
    """
    detail = next((entry for entry in self.subsets if entry['name'] == name), None)
    if detail is None:
        raise Exception("Unknown subset: {}".format(name))

    return detail

get_subset_match_spec(name, params={})

Get a serializable specification of the matching function for the subset of this dataset

Source code in enrichsdk/datasets/discover.py
def get_subset_match_spec(self, name, params={}):
    """
    Get a serializable specification of the matching
    function for the subset of this dataset
    """
    spec = self.get_match_spec()

    subset = None
    for s in self.subsets:
        subsetname = self.resolve_subset_name(name, params)
        if name == subsetname:
            subset = s
            break

    if subset is None:
        raise Exception(f"Cannot find subset {name} in {self.name}")

    if 'filename' in subset:
        spec['filename'] = subset['filename']
    elif 'pattern' in subset:
        spec['filename'] = subset['pattern']

    return spec

get_subsets()

Return the names of subsets available

Source code in enrichsdk/datasets/discover.py
def get_subsets(self):
    """
    List the name of every subset defined for this dataset,
    in definition order
    """
    return [
        entry["name"]
        for entry in self.subsets
    ]

get_subsets_detail()

Return the full definitions of all subsets available

Source code in enrichsdk/datasets/discover.py
def get_subsets_detail(self):
    """
    Return the full subset definitions (the list of dictionaries,
    not just the names)
    """
    definitions = self.subsets
    return definitions

has_subsets()

Does this dataset have components?

Source code in enrichsdk/datasets/discover.py
def has_subsets(self):
    """
    Report whether at least one subset (component) is defined
    """
    return len(self.subsets) >= 1

in_subset(name, spec)

Check whether a file (given as a spec with a filename) belongs to the named subset

Source code in enrichsdk/datasets/discover.py
def in_subset(self, name, spec):
    """
    Check whether the file described by ``spec`` belongs to the
    subset called ``name``.

    Args:
       name: Name of the subset to test against
       spec (dict): Must contain a ``filename`` entry

    Returns:
       bool: True when the filename matches the subset's ``filename``
       (anchored at the end) or, failing that, its ``pattern``. False
       when the subset is unknown or declares neither.

    Raises:
       Exception: if ``spec`` is not a dict with a ``filename``
    """
    usable = isinstance(spec, dict) and (len(spec) != 0) and ("filename" in spec)
    if not usable:
        raise Exception("invalid subset check input. Expecting dict with filename")

    # Locate the subset definition by name
    definition = next((s for s in self.subsets if name == s["name"]), None)
    if definition is None:
        return False

    # A declared filename wins over a pattern; both are used as regexes
    if "filename" in definition:
        regex = definition['filename'] + "$"
    elif "pattern" in definition:
        regex = definition["pattern"]
    else:
        return False

    return re.search(regex, spec["filename"]) is not None

matches(names)

Check if the dataset has a particular name

Source code in enrichsdk/datasets/discover.py
def matches(self, names):
    """
    Check whether any of the given names refers to this dataset.

    A name matches if it equals the dataset name, one of the
    alternative names, or ``<dataset name>-<subset name>``.

    Args:
       names: A single name or a list of names
    """
    wanted = names if isinstance(names, list) else [names]

    known = [self.name] + self.alt_names
    for subset in self.get_subsets():
        known.append(self.name + "-" + subset)

    return any(candidate in known for candidate in wanted)

set_registry(registry)

Set the registry object

Source code in enrichsdk/datasets/discover.py
def set_registry(self, registry):
    """
    Attach the registry (dataset manager) that this dataset
    belongs to
    """
    setattr(self, "registry", registry)

set_resolve(resolve)

Allow the user to specify a resolving dict

Source code in enrichsdk/datasets/discover.py
def set_resolve(self, resolve):
    """
    Merge user-supplied resolution values into ``self.resolve``.

    Args:
       resolve (dict): Non-empty dictionary of values. Existing keys
           are overwritten. Both conditions are enforced with asserts.
    """
    is_mapping = isinstance(resolve, dict)
    assert is_mapping
    assert len(resolve) > 0
    self.resolve.update(resolve)

Dataset(params, *args, **kwargs)

Bases: DataSource

Class for specifying a dataset. This is typically an input to the transforms.

Usage::

# A dataset that has one directory for each day. Within that
# there are atleast two sub-datasets
Dataset(params={
    "name": 'athena-availability',
    "type": "file",
    "paths": [
        {
            "name": "test",
            "nature": "local",
            "path": "%(data_root)s/shared/datasets/athena/v2/availability",

        },
        {
            "name": "local",
            "nature": "local",
            "path": "%(enrich_data_dir)s/acme/Marketing/shared/datasets/athena/v2/availability",
        },
        {
            "name": "s3",
            "nature": "s3",
            "path": "%(backup_root)s/%(node)s/data/acme/Marketing/shared/datasets/athena/v2/availability",
        },
    ],
    "match": {
        "generate": "generate_datetime_daily",
        "compare": "compare_datetime_pattern_match_range",
        "pattern": "%Y-%m-%d",
    },
    "subsets": [
        {
            "name": "CatalogScore",
            "filename": "catalogscore.csv",
            "description": "Score of all products in the catalog"
        },
        {
            "name": "ProductWeight",
            "filename": "productweight.csv",
            "description": "Assortment weight for each product"
        }
    ]
})
Source code in enrichsdk/datasets/discover.py
def __init__(self, params, *args, **kwargs):

    super().__init__(params, *args, **kwargs)

    self.match = params.get("match", {})
    """
    dict: Rules for generating and matching run names

    Example::

        {
            "generate": "generate_datetime_daily",
            "params": {},
            "match": "match_datetime_pattern_range",
            "pattern": "plpevents-%Y%m%d-%H%M%S",
        }
    """

    # Give every subset a short id: the first 16 hex characters of
    # the SHA-256 of "<dataset name>:<subset name>"
    dataset_name = self.name
    for subset in self.subsets:
        key = f"{dataset_name}:{subset['name']}"
        digest = sha256(key.encode("utf-8")).hexdigest()
        subset['instanceid'] = digest[:16]

backup property

Read the path with the name 'backup'. Available for backward compatibility

local property

Read the path with the name 'local'. Available for backward compatibility

match = params.get('match', {}) instance-attribute

dict: Generating and matching rules

Example::

{
    "generate": "generate_datetime_daily",
    "params": {},
    "match": "match_datetime_pattern_range",
    "pattern": "plpevents-%Y%m%d-%H%M%S",
}

test property

Read the path with the name 'test'. Available for backward compatibility

compare(name, start, end=None)

Given a start and an end, check whether the named file/directory falls within that time range, using the dataset's configured compare function

Args: name (str): Directory name start (datetime): Start of the time range end (datetime): End of range. If None, default = start

Source code in enrichsdk/datasets/discover.py
def compare(self, name, start, end=None):
    """
    Given a start and an end, check whether a name falls in
    that range using the dataset's compare function

    Args:
       name (str): Directory name
       start (datetime): Start of the time range
       end (datetime): End of range. If None, default = start

    Returns:
       Whatever the compare function returns for (name, start, end)

    Raises:
       Exception: if start or end is not a datetime
    """
    if (not isinstance(start, datetime)) or (
        (end is not None) and (not isinstance(end, datetime))
    ):
        raise Exception("Start or end is not a datetime")

    # One day
    if end is None:
        end = start

    # Handle ordering...
    if end < start:
        start, end = end, start

    # The compare rule lives in the dataset's match specification;
    # there is no ``params`` in this scope. A string is looked up as
    # a method on this object.
    func = self.match.get("compare", "compare_datetime_pattern_match_range")
    if isinstance(func, str):
        func = getattr(self, func)

    return func(name, start, end)

compare_datetime_pattern_match_range(name, start, end)

Given a start and an end, generate the datetime objects corresponding to each run for each day, and check whether a named file/directory exists in that list.

Args: start (datetime): Start of the time range end (datetime): End of range. If None, default = start

Source code in enrichsdk/datasets/discover.py
def compare_datetime_pattern_match_range(self, name, start, end):
    """
    Given a start and an end, generate the names corresponding
    to each run in that range, and check whether a named
    file/directory is in that list or between its extremes.

    Args:
       name (str): Name of the file/directory to check
       start (datetime): Start of the time range
       end (datetime): End of range

    Returns:
       bool: False if ``name`` is not a string or no names are
       generated for the range. Otherwise True when ``name`` is one
       of the generated names or sorts between the smallest and
       largest of them.
    """

    if not isinstance(name, str):
        return False

    # generate() yields dicts such as
    # {'timestamp': '2021-02-02T13:24:11', 'name': '2021-02-02'}
    names = [n["name"] for n in self.generate(start, end)]

    # Nothing generated means nothing can match, and min()/max()
    # would raise on an empty list
    if not names:
        return False

    # Check if the name is in the names or in the range. The
    # latter is required if there are multiple runs in a given day
    return (name in names) or (min(names) <= name <= max(names))

generate(start, end=None, full=False, name='default', resolve=None)

Given a start and an end, generate the datetime objects corresponding to each run

Args: start (datetime): Start of the time range end (datetime): End of range. If None, default = start full (bool): Whether full path is required or only the suffix. Default is false name (str): Name of the path specification. Optional is full path is required resolve (dict): Additional path resolution parameters

Returns: list (dict): List of dictionaries (name, timestamp)

Source code in enrichsdk/datasets/discover.py
def generate(self, start, end=None, full=False, name="default", resolve=None):
    """
    Given a start and an end, generate the names
    corresponding to each run

    Args:
       start (datetime): Start of the time range
       end (datetime): End of range. If None, default = start
       full (bool): Whether full path is required or only the suffix. Default is false
       name (str): Name of the path specification. Used only when full is true
       resolve (dict): Additional path resolution parameters

    Returns:
       list (dict): List of dictionaries (name, timestamp), with a
       ``path`` entry added when ``full`` is true and the path could
       be resolved

    Raises:
       Exception: if start/end is not a datetime/date, or the named
           generator cannot be found
    """
    if (not isinstance(start, (datetime, date))) or (
        (end is not None) and (not isinstance(end, (datetime, date)))
    ):
        raise Exception("Start or end is not a datetime/date")

    # One day
    if end is None:
        end = start

    # Handle ordering...
    if end < start:
        start, end = end, start

    # validate() treats match.generate as optional and defaults it to
    # generate_datetime_daily; use the same default here instead of
    # failing with a KeyError.
    func = self.match.get("generate", "generate_datetime_daily")
    if isinstance(func, str):
        if func in shared_generators:
            func = shared_generators[func]
        elif hasattr(self, func):
            func = getattr(self, func)
        else:
            raise Exception(f"Unable to find generator: {func}")

    # Generate all possible names
    names = func(self.match, start, end)

    # Now insert the full path. This is best effort: a failure is
    # logged and the names are returned without paths.
    if full:
        try:
            path_root = self.get_path_by_name(name, resolve=resolve)
            for n in names:
                n['path'] = os.path.join(path_root, n['name'])
        except Exception:
            logger.exception("Unable to resolve names")

    return names

listdir(name, fshandle=None, resolve=None, detail=False)

List the dataset directory

Source code in enrichsdk/datasets/discover.py
def listdir(self, name, fshandle=None, resolve=None, detail=False):
    """
    List the dataset directory

    Args:
       name: Name of the path entry (in ``self.paths``) to list
       fshandle (object): Filesystem handle. Required, and checked
           to be an S3FileSystem, when the path nature is s3 or gcs
       resolve (dict): Values substituted into the path template.
           If None, ``self.resolve`` is used
       detail (bool): Passed to ``fshandle.listdir``. When false,
           only base names are returned

    Returns:
       list: Directory entries, from ``os.listdir`` when no handle
       is given and from ``fshandle.listdir`` otherwise

    Raises:
       Exception: on an unknown path name, a missing or invalid
           handle, a non-boolean ``detail``, or a failed resolution
    """

    selected = next((p for p in self.paths if p["name"] == name), None)
    if selected is None:
        raise Exception("Unknown name of path: {}".format(name))

    if (selected["nature"] in ["s3", "gcs"]) and (
        (fshandle is None) or (not isinstance(fshandle, S3FileSystem))
    ):
        raise Exception(
            "Filesystem handle should be provided when nature is s3|gcs"
        )

    if not isinstance(detail, bool):
        raise Exception("detail is a boolean field")

    # self.paths is a list of path dictionaries, so it cannot be
    # indexed by name; use the entry selected above
    path = selected["path"]

    # Same fallback as get_path_by_name
    if resolve is None:
        resolve = self.resolve

    if isinstance(resolve, dict) and len(resolve) > 0:
        try:
            path = path % resolve
        except Exception as exc:
            raise Exception("Path could not be resolved") from exc

    if fshandle is None:
        return os.listdir(path)

    results = fshandle.listdir(path, detail=detail)
    if not detail:
        results = [os.path.basename(r) for r in results]

    return results

match_content(fs, localdir, backupdir, dirname)

Check whether local path is replicated in the s3/blob store

Args: fs (object): s3fs handle localdir (str): Root local dir to check backupdir (str): Root remote dir in block store to check dirname (str): path within the localdir

Source code in enrichsdk/datasets/discover.py
def match_content(self, fs, localdir, backupdir, dirname):
    """
    Check whether local path is replicated in the s3/blob store

    A single file is compared by size. A directory is walked and
    every file in it is compared by size with its backup counterpart.

    Args:
       fs (object): s3fs handle
       localdir (str): Root local dir to check
       backupdir (str): Root remote dir in block store to check
       dirname (str): path within the localdir

    Returns:
       tuple: (bool, str). True with a summary when everything
       matched; False with a reason on the first missing file, size
       mismatch or lookup error.
    """

    localpath = os.path.join(localdir, dirname)
    if os.path.isfile(localpath):
        filesize = getsize(localpath)
        backupfile = os.path.join(backupdir, dirname)
        try:
            detail = fs.info(backupfile)
        except FileNotFoundError:
            return False, "Missing file in backup ({})".format(dirname)
        except Exception as e:
            return False, str(e)

        if filesize != detail["size"]:
            return False, "Filesize mismatch: {}".format(dirname)

        return True, "Matched {} (size: {}M)".format(
            dirname, round(filesize / 1000000, 1)
        )

    matched = 0
    totalsize = 0
    for root, dirs, files in os.walk(localpath):
        for name in files:
            filename = os.path.join(root, name)
            relpath = os.path.relpath(filename, start=localpath)
            backupfile = os.path.join(backupdir, dirname, relpath)
            filesize = getsize(filename)
            try:
                # Use the handle passed in, as the single-file branch
                # does, rather than an attribute on self
                detail = fs.info(backupfile)
            except FileNotFoundError:
                return False, "Missing file in backup ({})".format(relpath)
            except Exception as e:
                return False, str(e)

            if filesize != detail["size"]:
                return False, "Filesize mismatch: {}".format(relpath)

            matched += 1
            totalsize += filesize

    return True, "Matched {} (size: {}M)".format(
        matched, round(totalsize / 1000000, 1)
    )

read_data(start_date, end_date, filename, readfunc, errors=True, name='default', params={}, resolve={})

Read a single dataset

Args: start_date (str): Starting date to scan end_date (str): Ending date of the scan filename (str): Filename within each day's directory. If None, readfunc will be called with the directory name readfunc (method): Callback errors (bool): What to do on failure. True = bailout name (str): Which subset to look at? params (dict): Params to the readfunc resolve (dict): Parameters for resolution of path

Returns: dataframe: Data read dict: Optional metadata on the read

Source code in enrichsdk/datasets/discover.py
def read_data(
    self,
    start_date,
    end_date,
    filename,
    readfunc,
    errors=True,
    name="default",
    params=None,
    resolve=None,
):
    """
    Read a single dataset

    Args:
         start_date: Starting date to scan (passed to ``generate``)
         end_date: Ending date of the scan (passed to ``generate``)
         filename (str): Filename within each day's directory. If None, readfunc will be called with the directory name
         readfunc (method): Callback, called as ``readfunc(path, params, date=<run name>)``
         errors (bool): What to do on failure. True = bailout, False = skip that run
         name (str): Name of the path specification to read from
         params (dict): Params to the readfunc. Defaults to an empty dict
         resolve (dict): Parameters for resolution of path. If empty or None, the dataset's own resolve parameters are used

    Returns:
         dataframe: Data read
         dict: Optional metadata on the read

    Raises:
         Exception: if no dates are generated for the range
         NoDataFound: if no run produced any data
    """
    if params is None:
        params = {}

    dates = self.generate(start_date, end_date)
    if len(dates) == 0:
        raise Exception("No dates found")

    # Get the path template. resolve must go by keyword: the second
    # positional parameter of get_path_by_name is ``full``. An empty
    # resolve is passed as None so the dataset's own values apply.
    path = self.get_path_by_name(name, resolve=resolve or None)

    metadata = {"start": datetime.now(), "files": []}
    # Now compose all the dataframes...
    _dfs = []
    for dt in dates:

        if dt['name'] not in ['', '.']:
            fullpath = os.path.join(path, dt["name"])
        else:
            fullpath = path

        if filename is not None:
            fullpath = os.path.join(fullpath, filename)

        try:
            metadata["files"].append(fullpath)
            df = readfunc(fullpath, params, date=dt["name"])
            if (df is None) or (len(df) == 0):
                continue
        except Exception:
            if errors:
                raise
            # Skip this run. Falling through would append a stale or
            # unbound dataframe.
            continue
        _dfs.append(df)

    if len(_dfs) == 0:
        raise NoDataFound()

    df = pd.concat(_dfs, ignore_index=True)
    del _dfs

    metadata["end"] = datetime.now()

    # Note the access metadata
    if ((self.registry is not None) and
        (hasattr(self.registry, "access")) and
        (callable(self.registry.access))):
        self.registry.access(self, metadata, nature="read")

    return df, metadata

read_subset(start_date, end_date, subset, readfunc, errors=True, name='default', params={}, resolve={})

Read a particular subset of the dataset

Source code in enrichsdk/datasets/discover.py
def read_subset(
        self,
        start_date,
        end_date,
        subset,
        readfunc,
        errors=True,
        name="default",
        params=None,
        resolve=None,
):
    """
    Read a particular subset of the dataset

    The subset's ``filename`` is looked up and the read is delegated
    to ``read_data``.

    Args:
         start_date: Starting date to scan
         end_date: Ending date of the scan
         subset (str): Name of the subset to read
         readfunc (method): Callback handed to ``read_data``
         errors (bool): What to do on failure. True = bailout
         name (str): Name of the path specification to read from
         params (dict): Params to the readfunc. Defaults to an empty dict
         resolve (dict): Parameters for resolution of path. Defaults to an empty dict

    Returns:
         Whatever ``read_data`` returns

    Raises:
         Exception: if the subset does not exist or has no filename
    """

    # The subset must both carry the requested name and define a
    # filename. With ``or``, any subset with a filename was accepted
    # and the last one won.
    filename = None
    for s in self.subsets:
        if (s['name'] == subset) and ('filename' in s):
            filename = s['filename']
            break

    if filename is None:
        raise Exception(f"Either subset doesnt exist or define a filename: {subset}")

    return self.read_data(
        start_date,
        end_date,
        filename,
        readfunc,
        errors=errors,
        name=name,
        params={} if params is None else params,
        resolve={} if resolve is None else resolve)

sample(filename, safe=True, fd=None, nrows=10, encoding='utf-8')

Sample a file belonging to this dataset. Subclass and overload this function if the data is sensitive.

Args: filename (str): A file that belongs to this dataset safe (bool): Whether file is trusted fd (object): File descriptor for s3/gcs/other non-filesystems nrows (int): Number of rows to sample

Source code in enrichsdk/datasets/discover.py
def sample(self, filename, safe=True, fd=None, nrows=10, encoding='utf-8'):
    """
    Sample a file belonging to this dataset. Subclass and
    overload this function if the data is sensitive.

    Args:
        filename (str): A file that belongs to this dataset
        safe (bool): Whether file is trusted (not used by this implementation)
        fd (object): File descriptor for s3/gcs/other non-filesystems.
            If None, the file is opened here and closed before returning
        nrows (int): Number of rows to sample
        encoding (str): Text encoding, used when opening csv/tsv files
            and passed to pandas for tsv and gzipped files

    Returns:
        dataframe: The sampled rows

    Raises:
        Exception: if the file extension is not a supported format
    """

    skiprows = None
    filesize = 0 # unknown
    opened = None
    if fd is None:
        # Text formats..
        if filename.split(".")[-1].lower() in ["csv", "tsv"]:
            opened = open(filename, encoding=encoding)
        else:
            opened = open(filename, "rb")
        fd = opened

    try:
        if opened is not None:
            filesize = os.path.getsize(filename)

        lowered = filename.lower()

        # For large files keep only every 100th line
        # Read the file
        if lowered.endswith(".csv"):
            if filesize > 10**9 : # 1GB
                skiprows=lambda i: i % 100 != 0
            df = pd.read_csv(fd, nrows=nrows, skiprows=skiprows)
        elif lowered.endswith(".csv.gz"):
            if filesize > 10**8 : # 100MB
                skiprows=lambda i: i % 100 != 0
            # NOTE(review): error_bad_lines was deprecated in pandas 1.3
            # and removed in 2.0 (replaced by on_bad_lines="skip").
            # Confirm the minimum supported pandas before switching.
            df = pd.read_csv(fd, nrows=nrows,
                             compression="gzip",
                             error_bad_lines=False,
                             encoding=encoding,
                             skiprows=skiprows)
        elif lowered.endswith(".tsv.gz"):
            if filesize > 10**8 : # 100MB
                skiprows=lambda i: i % 100 != 0
            df = pd.read_csv(
                fd, nrows=nrows, sep="\t",
                compression="gzip",
                error_bad_lines=False,
                encoding=encoding,
                skiprows=skiprows
            )
        elif lowered.endswith(".tsv"):
            if filesize > 10**9 : # 1GB
                skiprows=lambda i: i % 100 != 0
            df = pd.read_csv(fd,
                             nrows=nrows,
                             sep="\t",
                             encoding=encoding,
                             skiprows=skiprows)
        elif lowered.endswith(".pq") or lowered.endswith(".pq.sample"):
            df = pd.read_parquet(fd)
            if nrows is not None:
                df = df.head(nrows)
        else:
            raise Exception(f"Unknown file format: {filename}")
    finally:
        # Close only what was opened here; a caller-supplied fd is
        # left for the caller to manage
        if opened is not None:
            opened.close()

    return df

validate(params)

Validate dataset arguments. It should be a dictionary with at least three elements: name, match, and paths.

Args: params (dict): Parameters for dataset

Params dict should have the following:

  • name: string
  • match: dictionary with generate function (name or lambda), compare function (name or lambda), and pattern (string).
Source code in enrichsdk/datasets/discover.py
def validate(self, params):
    """
    Check the dataset parameters after running the base-class
    validation.

    Args:
      params (dict): Parameters for dataset

    The ``match`` entry must be a dictionary with:

      * pattern: a string or a callable (required)
      * generate: a callable, or the name of a method on this object
        or of a shared generator (default: generate_datetime_daily)
      * compare: a callable, or the name of a method on this object
        (default: compare_datetime_pattern_match_range)

    Raises:
      Exception: on the first requirement that is not met
    """

    super().validate(params)

    # The match specification must be present and be a dictionary
    if not (("match" in params) and isinstance(params["match"], dict)):
        raise Exception("Match should be a valid dictionary")
    matchspec = params["match"]

    if "pattern" not in matchspec:
        raise Exception("Matching requirement match.pattern missing")

    pattern = matchspec["pattern"]
    if not (isinstance(pattern, str) or callable(pattern)):
        raise Exception("Invalid pattern specification")

    # A generator given by name must exist on this object or among
    # the shared generators
    generate = matchspec.get("generate", "generate_datetime_daily")
    if isinstance(generate, str):
        known = hasattr(self, generate) or (generate in shared_generators)
        if not known:
            raise Exception("Matching requirement {} missing/invalid".format(generate))

    # A compare function given by name must exist on this object
    compare = matchspec.get("compare", "compare_datetime_pattern_match_range")
    if isinstance(compare, str) and (not hasattr(self, compare)) and (not callable(compare)):
        raise Exception("Matching requirement {} missing/invalid".format(compare))

DatasetRegistry(*args, **kwargs)

Bases: object

Registry for datasets.

This provides a search and resolution interface.

There is a notion of a command - a scp/aws command line template. The registry allows enumeration of the commands and enables scripting.

Args: commands (list): List of command templates. Optional. If not specified, the system will use defaults. resolve (dict): Path resolution dictionary

Source code in enrichsdk/datasets/discover.py
def __init__(self, *args, **kwargs):
    """
    Set up an empty registry.

    Args:
        commands (list): List of command templates. Optional. If not specified,
                         the result of get_commands() is used.
        resolve (dict): Path resolution dictionary
        state: Optional state object. If omitted, the ``state``
               attribute of ``transform`` is used when it has one
        transform: Optional transform object
    """
    self.commands = kwargs.get("commands", get_commands())
    self.resolve = kwargs.get("resolve", {})

    # Prefer an explicitly supplied state; otherwise borrow the
    # transform's state when it has one
    transform = kwargs.get('transform', None)
    state = kwargs.get('state', None)
    if (state is None) and hasattr(transform, 'state'):
        state = getattr(transform, 'state')
    self.state = state
    self.transform = transform

    self.datasets = []
    self.validate()

add_datasets(items)

Add to the registry. Each element in the item list should be a Dataset (or subclass).

Args: items (list): A list of datasets

Source code in enrichsdk/datasets/discover.py
def add_datasets(self, items):
    """
    Add to the registry. Each element in the item list
    should be a DataSource (or subclass), e.g. a Dataset.

    Every dataset gets the registry's resolve parameters and a
    reference back to this registry before being added.

    Args:
       items (list): A list of datasets, or a single dataset
    """
    # Treat a single dataset as a one-element list so both cases
    # share one code path
    batch = items if isinstance(items, list) else [items]

    for item in batch:
        assert isinstance(item, DataSource)
        item.set_resolve(self.resolve)
        item.set_registry(self)

    self.datasets.extend(batch)

find(names)

Find a dataset in the registry.

Args: names (str): Name of the dataset

Source code in enrichsdk/datasets/discover.py
def find(self, names):
    """
    Look up a dataset in the registry.

    Args:
        names (str or list): Name(s) to match against each dataset

    Returns:
        The first registered dataset whose matches(names) is true,
        or None when there is no such dataset.

    Raises:
        Exception: If names is neither a str nor a list
    """
    if not isinstance(names, (list, str)):
        raise Exception(f"Unexpected dataset name(s): {names}")

    # First match wins; registration order is preserved
    return next(
        (dataset for dataset in self.datasets if dataset.matches(names)),
        None,
    )

get_command(name, source_type='File')

Get the command specification by specifying its name

Args: name (str): Name of the command template

Source code in enrichsdk/datasets/discover.py
def get_command(self, name, source_type="File"):
    """
    Look up a command template by name.

    Args:
        name (str): Name of the command template
        source_type (str): Only templates whose "source_type" entry
                           contains this value are considered

    Returns:
        dict: The first matching command template

    Raises:
        Exception: If no template matches
    """
    candidates = (
        spec for spec in self.commands
        if source_type in spec["source_type"] and spec["name"] == name
    )
    for spec in candidates:
        return spec

    raise Exception("Missing command: {}".format(name))

get_commands(source_type='File')

Get the command specifications as a list of dicts

Source code in enrichsdk/datasets/discover.py
def get_commands(self, source_type="File"):
    """
    Return the command specifications as a list of dicts.

    Args:
        source_type (str): Keep only the commands whose "source_type"
                           entry contains this value. Pass None to get
                           the complete list without filtering.
    """
    commands = self.commands
    if source_type is not None:
        commands = [
            spec for spec in commands
            if source_type in spec["source_type"]
        ]
    return commands

list()

List all datasets in the registry

Source code in enrichsdk/datasets/discover.py
def list(self):
    """
    Return the string form of every dataset in the registry,
    in registration order.
    """
    names = []
    for dataset in self.datasets:
        names.append(str(dataset))
    return names

set_params(params)

Deprecated. Use set_resolve

Source code in enrichsdk/datasets/discover.py
def set_params(self, params):
    """
    Deprecated alias of set_resolve; kept for older callers.

    Args:
       params (dict): Passed unchanged to set_resolve
    """
    outcome = self.set_resolve(params)
    return outcome

set_resolve(resolve)

Set the resolution parameters

Args: resolve (dict): Resolution parameters

Source code in enrichsdk/datasets/discover.py
def set_resolve(self, resolve):
    """
    Replace the resolution parameters of the registry.

    Args:
       resolve (dict): Resolution parameters. Must be a non-empty dict.

    Raises:
       AssertionError: If resolve is not a dict or is empty
    """
    assert isinstance(resolve, dict) and len(resolve) > 0
    self.resolve = resolve

DateArg(name, *args, **kwargs)

Bases: Arg

Datetime

Source code in enrichsdk/datasets/discover.py
def __init__(self, name, *args, **kwargs):
    """
    Create a date argument. The description is fixed; everything else
    is passed to the parent constructor.
    """
    description = "Date within past 1 year"
    super().__init__(name, description, *args, **kwargs)

DynamicCustomDataset

Bases: Dataset

Class to handle dynamic subset resolution based on context

get_subset_description(name)

Return a description of the custom dataset with the given name.

Source code in enrichsdk/datasets/discover.py
def get_subset_description(self, name):
    """
    Return a description of the custom dataset with the given name.

    Args:
        name (str): Name of the subset
    """
    description = f"Custom dataset named {name}"
    return description

in_subset(name, params)

name is 'possibly' resolved dynamically and may not match the 'name' in the subset field

Source code in enrichsdk/datasets/discover.py
def in_subset(self, name, params):
    """
    Always report the name as part of the subset.

    The name may have been resolved dynamically, so it is not compared
    with the 'name' in the subset field.
    """
    return True

Doodle(cred, *args, **kwargs)

Bases: object

default

Source code in enrichsdk/datasets/doodle.py
def __init__(self, cred, *args, **kwargs):
    """
    Set up the client.

    Args:
        cred (str or dict): Server details; stored as-is
        generators (dict): Optional keyword argument, removed from kwargs
                           before the parent constructor is called.
                           Defaults to {}.
    """
    generators = kwargs.pop('generators', {})

    self.cred = cred
    self.generators = generators
    self.purpose = None

    super().__init__(*args, **kwargs)

access_server(path, params={}, data={}, method='get')

Get/Post from doodle server

Source code in enrichsdk/datasets/doodle.py
def access_server(self, path, params={}, data={}, method='get'):
    """
    Get/Post/Patch from doodle server

    The server and credentials come from self.cred, which is either a
    plain url string or a dict with a 'url' (or 'uri') entry. A dict may
    also have an API key ('key', 'api_key' or 'apikey') and a 'basicauth'
    dict with 'username' and 'password'.

    Args:
        path (str): Endpoint path relative to the server url
        params (dict): Query parameters
        data (dict): JSON body for 'post' and 'patch'
        method (str): 'get', 'post' or 'patch'

    Returns:
        tuple: (status code, result). The result is the decoded JSON
        body, or {'error': <body text>} when the body is not JSON.
        When the request cannot be made (including an unknown method),
        (500, <error message>) is returned.

    Raises:
        Exception: If self.cred has no usable server details
    """
    # The default dicts above are never modified in this method.

    cred = self.cred
    if isinstance(cred, str):
        server = cred
        # A plain url has no key or basic-auth settings. Testing
        # "'key' in cred" on a string is a substring check, so a url
        # containing "key" used to fail on cred['key'].
        options = {}
    elif isinstance(cred, dict) and ('url' in cred):
        server = cred['url']
        options = cred
    elif isinstance(cred, dict) and ('uri' in cred):
        server = cred['uri']
        options = cred
    else:
        raise Exception("Invalid server details")

    # Join server and path with exactly one slash
    if server.endswith("/"):
        server = server[:-1]

    if path.startswith("/"):
        path = path[1:]

    headers = {
        'Accept': 'application/json'
    }

    # Try all possible keys
    for k in ['key', 'api_key', 'apikey']:
        if k in options:
            headers['X-API-Key'] = options[k]
            break

    # NOTE(review): certificate verification is turned off for every
    # request. Left unchanged for compatibility; confirm it is intended.
    extra = {"verify": False}
    if 'basicauth' in options:
        basicauth = options['basicauth']
        extra['auth'] = (basicauth['username'], basicauth['password'])

    url = f"{server}/{path}"

    try:
        if method == 'post':
            response = requests.post(url, params=params, headers=headers, json=data, **extra)
        elif method == 'patch':
            response = requests.patch(url, params=params, headers=headers, json=data, **extra)
        elif method == 'get':
            response = requests.get(url, params=params, headers=headers, **extra)
        else:
            raise Exception(f"Unknown method: {method}")
    except Exception as e:
        logger.exception("Unable to access server")
        return 500, str(e)

    try:
        result = response.json()
    except ValueError:
        # Body is not valid JSON; return the raw text instead
        result = {
            'error': response.content.decode('utf-8')
        }

    return response.status_code, result

add_feature(catalog_id, source_id, details)

Update the source with latest feature information

Source code in enrichsdk/datasets/doodle.py
def add_feature(self, catalog_id, source_id, details):
    """
    Add a feature to a source by posting to /api/v1/features.

    Args:
        catalog_id: Id of the catalog; written into details
        source_id: Id of the source; written into details
        details (dict): Feature description. Note that the dict passed
                        by the caller is modified.

    Returns:
        The decoded server response

    Raises:
        Exception: With the JSON-encoded error if the server responds
                   with a status of 300 or above
    """

    details['catalog_id'] = catalog_id
    details['source_id'] = source_id

    status, result = self.access_server("/api/v1/features",
                                        data=details,
                                        method="post")

    if status >= 300:
        # The result has a 'detail' entry only for some failures. It is a
        # plain string when the request itself failed, and may be a dict
        # without 'detail'; report whatever is available.
        detail = result.get('detail', result) if isinstance(result, dict) else result
        raise Exception(json.dumps(detail))

    return result

add_source(catalog_id, details)

Update the catalog with latest dataset information

Source code in enrichsdk/datasets/doodle.py
def add_source(self, catalog_id, details):
    """
    Add a source to a catalog by posting to /api/v1/sources.

    The catalog id is written into details, so the dict passed by the
    caller is modified.

    Returns:
        The decoded server response

    Raises:
        Exception: If the server responds with a status of 300 or above
    """
    details['catalog_id'] = catalog_id

    status, result = self.access_server("/api/v1/sources", data=details, method="post")
    if status < 300:
        return result

    logger.error("Failed to update source", extra={'data': str(result)})
    raise Exception(f"Could not add source to {catalog_id}")

compute_source_paths(source, start, end)

Read the source data from start to end dates...

Source code in enrichsdk/datasets/doodle.py
def compute_source_paths(self, source, start, end):
    """
    Compute the paths of the source data from start to end dates.

    The 'match' section of the source details names a generator. The
    generator is called with (match, start, end) and returns the
    subdirectories; each subdirectory name is joined with the first
    search path and the filename. The special generator 'static_path'
    returns the 'matchingpath' as the only path.

    Sample matching specification::

        {
            "generate": "generate_datetime_daily",
            "compare": "compare_datetime_pattern_match_range",
            "pattern": "%Y-%m-%d",
            "filename": "data.csv",
            "searchpaths": [
                "enrich-terrapay/backup/tip.terrapay.com/rawdata/queries/txn_value"
            ],
            "matchingpath": "enrich-terrapay/backup/tip.terrapay.com/rawdata/queries/txn_value/2022-11-07/data.csv"
        }

    Args:
        source (dict): Source with a details -> match section
        start: Start of the range; passed to the generator as-is
        end: End of the range; passed to the generator as-is

    Returns:
        list: Paths

    Raises:
        Exception: If the match section, the generator or (for
                   'static_path') the matchingpath is missing, or the
                   generator is unknown
    """

    try:
        match = source['details']['match']
    except (KeyError, TypeError) as e:
        # Only lookup failures mean "missing"; anything else (e.g. a
        # keyboard interrupt) should not be reported as such.
        raise Exception("Match section is missing") from e

    generator = match.get('generate', None)
    if generator is None:
        raise Exception("No generator specified")
    elif ((generator not in self.generators) and
          (generator not in shared_generators)):
        raise Exception(f"Unknown generator: {generator}")

    # Special case of a fixed path
    if generator == 'static_path':
        if 'matchingpath' not in match:
            raise Exception("matchingpath is missing")
        return [match['matchingpath']]

    # Rest of the cases. Generators given to this instance take
    # precedence over the shared ones.
    if generator in self.generators:
        generate = self.generators[generator]
    else:
        generate = shared_generators[generator]

    # Get all the subdirs
    subdirs = generate(match, start, end)
    root = match['searchpaths'][0]
    filename = match['filename']

    return [os.path.join(root, subdir['name'], filename) for subdir in subdirs]

find_catalog(name, version)

Find one catalog with a precise name and version

Source code in enrichsdk/datasets/doodle.py
def find_catalog(self, name, version):
    """
    Return the single catalog with the given name and version.

    Raises:
        Exception: If the search returns no catalog, or more than one
    """
    matches = self.search_catalogs(name=name, version=version)
    count = len(matches)

    if count > 1:
        raise Exception(f"Multiple {count} catalogs. Expecting one")
    if count == 0:
        raise Exception(f"No catalog found {name}:{version}")

    return matches[0]

get_catalog(catalog_id)

Get the details of one catalog

Source code in enrichsdk/datasets/doodle.py
def get_catalog(self, catalog_id):
    """
    Get the details of one catalog

    Args:
        catalog_id: Id of the catalog

    Returns:
        The decoded server response

    Raises:
        Exception: With the JSON-encoded error if the server responds
                   with a status of 300 or above
    """
    status, result = self.access_server(f"/api/v1/catalogs/{catalog_id}")

    if status >= 300:
        # The result is a plain string when the request itself failed and
        # may be a dict without 'detail'; report whatever is available.
        detail = result.get('detail', result) if isinstance(result, dict) else result
        raise Exception(json.dumps(detail))
    return result

get_feature(feature_id)

Get the details of one feature

Source code in enrichsdk/datasets/doodle.py
def get_feature(self, feature_id):
    """
    Get the details of one feature

    Args:
        feature_id: Id of the feature

    Returns:
        The decoded server response

    Raises:
        Exception: With the JSON-encoded error if the server responds
                   with a status of 300 or above
    """
    status, result = self.access_server(f"/api/v1/features/{feature_id}")

    if status >= 300:
        # The result is a plain string when the request itself failed and
        # may be a dict without 'detail'; report whatever is available.
        detail = result.get('detail', result) if isinstance(result, dict) else result
        raise Exception(json.dumps(detail))

    return result

get_source(source_id)

Get the details of one source

Source code in enrichsdk/datasets/doodle.py
def get_source(self, source_id):
    """
    Get the details of one source

    Args:
        source_id: Id of the source

    Returns:
        The decoded server response

    Raises:
        Exception: With the JSON-encoded error if the server responds
                   with a status of 300 or above
    """
    status, result = self.access_server(f"/api/v1/sources/{source_id}")

    if status >= 300:
        # The result is a plain string when the request itself failed and
        # may be a dict without 'detail'; report whatever is available.
        detail = result.get('detail', result) if isinstance(result, dict) else result
        raise Exception(json.dumps(detail))
    return result

get_source_paths(start, end, name=None, version='v1', source_id=None)

Find all the source paths

Source code in enrichsdk/datasets/doodle.py
def get_source_paths(self,
                     start, end,
                     name=None,
                     version="v1",
                     source_id=None):
    """
    Find all the source paths

    The source is searched by name and version first. If that finds
    nothing, or no name is given, the source is fetched by source_id.
    An inactive source is logged as an error but still processed.

    Args:
        start: Start of the range; passed to compute_source_paths
        end: End of the range; passed to compute_source_paths
        name (str): Name of the source
        version (str): Version of the source, used with name
        source_id: Id of the source

    Returns:
        tuple: (source, paths)

    Raises:
        Exception: If neither name nor source_id is specified, or if the
                   name is not found and there is no source_id to fall
                   back on
    """
    if ((name is None) and (source_id is None)):
        raise Exception("name or source_id must be specified")

    source = None
    if name is not None:
        sources = self.search_sources(name=name, version=version)
        if len(sources) > 0:
            source = sources[0]

    if source is None:
        if source_id is None:
            # Without this check the server would be asked for the
            # source with id None
            raise Exception(f"No source found {name}:{version}")
        source = self.get_source(source_id)

    if not source['active']:
        logger.error(f"Inactive source: {source.get('id')}")

    paths = self.compute_source_paths(source, start, end)

    return source, paths

get_url()

Get base url

Source code in enrichsdk/datasets/doodle.py
def get_url(self):
    """
    Get the url of the server's docs page (the base url followed by /docs)

    Returns:
        str: url

    Raises:
        Exception: If self.cred is neither a string nor a dict with a
                   'url' or 'uri' entry
    """
    cred = self.cred
    if isinstance(cred, str):
        server = cred
    elif isinstance(cred, dict) and ('url' in cred):
        server = cred['url']
    elif isinstance(cred, dict) and ('uri' in cred):
        server = cred['uri']
    else:
        raise Exception("Unable to compute the url")

    # Drop a trailing slash so that the result does not contain '//docs'
    return server.rstrip("/") + "/docs"

list_catalogs(only_active=True, offset=0, limit=10, order_by=None)

List the catalogs

Source code in enrichsdk/datasets/doodle.py
def list_catalogs(self,
                  only_active=True,
                  offset: int = 0,
                  limit: int = 10,
                  order_by: str = None):
    """
    List the catalogs.

    All arguments are sent to /api/v1/catalogs as query parameters.

    Returns:
        The decoded server response

    Raises:
        Exception: If the server responds with a status of 300 or above
    """
    query = dict(
        only_active=only_active,
        offset=offset,
        limit=limit,
        order_by=order_by,
    )

    status, result = self.access_server("/api/v1/catalogs", params=query)
    if status < 300:
        return result

    raise Exception(f"Could not list catalogs")

list_features(source_id=None, catalog_id=None, only_active=True, offset=0, limit=5000, order_by=None)

List available features for a source

Source code in enrichsdk/datasets/doodle.py
def list_features(self,
                  source_id=None,
                  catalog_id=None,
                  only_active=True,
                  offset: int = 0,
                  limit: int = 5000,
                  order_by: str = None):

    """
    List available features for a source

    All arguments are sent to /api/v1/features as query parameters.

    Returns:
        The decoded server response

    Raises:
        Exception: With the JSON-encoded error if the server responds
                   with a status of 300 or above
    """

    params = {
        'source_id': source_id,
        'catalog_id': catalog_id,
        'only_active': only_active,
        'offset': offset,
        'limit': limit,
        'order_by': order_by
    }
    status, result = self.access_server("/api/v1/features",
                                        params=params)

    if status >= 300:
        # The result is a plain string when the request itself failed and
        # may be a dict without 'detail'; report whatever is available.
        detail = result.get('detail', result) if isinstance(result, dict) else result
        raise Exception(json.dumps(detail))

    return result

list_sources(catalog_id=None, only_active=True, offset=0, limit=1000, order_by=None)

List available sources for a catalog

Source code in enrichsdk/datasets/doodle.py
def list_sources(self,
                 catalog_id=None,
                 only_active=True,
                 offset: int = 0,
                 limit: int = 1000,
                 order_by: str = None):
    """
    List the sources of a catalog.

    All arguments are sent to /api/v1/sources as query parameters.

    Returns:
        The decoded server response

    Raises:
        Exception: If the server responds with a status of 300 or above
    """
    query = dict(
        catalog_id=catalog_id,
        only_active=only_active,
        offset=offset,
        limit=limit,
        order_by=order_by,
    )

    status, result = self.access_server("/api/v1/sources", params=query)
    if status < 300:
        return result

    logger.error("Failed to list sources", extra={'data': str(result)})
    raise Exception(f"Could not find sources for catalog {catalog_id}")

search_catalogs(only_active=True, name=None, version=None, offset=0, limit=10, query=None, modified_since=None, modified_before=None, order_by=None)

Search the catalogs

Source code in enrichsdk/datasets/doodle.py
def search_catalogs(self,
                  only_active=True,
                  name: str = None,
                  version: str = None,
                  offset: int = 0,
                  limit: int = 10,
                  query: str = None,
                  modified_since: datetime = None,
                  modified_before: datetime = None,
                  order_by: str = None):
    """
    Search the catalogs

    All arguments are sent to /api/v1/catalogs/search as query parameters.

    Returns:
        The decoded server response

    Raises:
        Exception: With the JSON-encoded error if the server responds
                   with a status of 300 or above
    """

    params = {
        'only_active': only_active,
        'name': name,
        'version': version,
        'offset': offset,
        'limit': limit,
        'query': query,
        'modified_since': modified_since,
        'modified_before': modified_before,
        'order_by': order_by
    }

    status, result = self.access_server("/api/v1/catalogs/search",
                                        params=params)

    if status >= 300:
        # The result is a plain string when the request itself failed and
        # may be a dict without 'detail'; report whatever is available.
        detail = result.get('detail', result) if isinstance(result, dict) else result
        raise Exception(json.dumps(detail))

    return result

search_features(only_active=True, name=None, version=None, catalog_id=None, source_id=None, offset=0, limit=10, query=None, modified_since=None, modified_before=None, order_by=None)

Search the features

Source code in enrichsdk/datasets/doodle.py
def search_features(self,
                    only_active=True,
                    name: str = None,
                    version: str = None,
                    catalog_id: str = None,
                    source_id: str = None,
                    offset: int = 0,
                    limit: int = 10,
                    query: str = None,
                    modified_since: datetime = None,
                    modified_before: datetime = None,
                    order_by: str = None):
    """
    Search the features

    All arguments are sent to /api/v1/features/search as query parameters.

    Returns:
        The decoded server response

    Raises:
        Exception: With the JSON-encoded error if the server responds
                   with a status of 300 or above
    """

    params = {
        'only_active': only_active,
        'name': name,
        'version': version,
        'catalog_id': catalog_id,
        'source_id': source_id,
        'offset': offset,
        'limit': limit,
        'query': query,
        'modified_since': modified_since,
        'modified_before': modified_before,
        'order_by': order_by
    }

    status, result = self.access_server("/api/v1/features/search",
                                        params=params)

    if status >= 300:
        # The result is a plain string when the request itself failed and
        # may be a dict without 'detail'; report whatever is available.
        detail = result.get('detail', result) if isinstance(result, dict) else result
        raise Exception(json.dumps(detail))

    return result

search_sources(only_active=True, name=None, version=None, catalog_id=None, offset=0, limit=10, query=None, modified_since=None, modified_before=None, order_by=None)

Search the sources

Source code in enrichsdk/datasets/doodle.py
def search_sources(self,
                   only_active=True,
                   name: str = None,
                   version: str = None,
                   catalog_id: str = None,
                   offset: int = 0,
                   limit: int = 10,
                   query: str = None,
                   modified_since: datetime = None,
                   modified_before: datetime = None,
                   order_by: str = None):
    """
    Search the sources

    All arguments are sent to /api/v1/sources/search as query parameters.

    Returns:
        The decoded server response

    Raises:
        Exception: With the JSON-encoded error if the server responds
                   with a status of 300 or above
    """

    params = {
        'only_active': only_active,
        'name': name,
        'version': version,
        'catalog_id': catalog_id,
        'offset': offset,
        'limit': limit,
        'query': query,
        'modified_since': modified_since,
        'modified_before': modified_before,
        'order_by': order_by
    }

    status, result = self.access_server("/api/v1/sources/search",
                                        params=params)

    if status >= 300:
        # The result is a plain string when the request itself failed and
        # may be a dict without 'detail'; report whatever is available.
        detail = result.get('detail', result) if isinstance(result, dict) else result
        raise Exception(json.dumps(detail))

    return result

update_feature(feature_id, details)

Update the feature with latest dataset information

Source code in enrichsdk/datasets/doodle.py
def update_feature(self, feature_id, details):
    """
    Update the feature with latest dataset information

    Args:
        feature_id: Id of the feature
        details (dict): Posted to /api/v1/features/<feature_id>

    Returns:
        The decoded server response

    Raises:
        Exception: With the JSON-encoded error if the server responds
                   with a status of 300 or above
    """
    status, result = self.access_server(f"/api/v1/features/{feature_id}",
                                        data=details,
                                        method="post")

    if status >= 300:
        # The result is a plain string when the request itself failed and
        # may be a dict without 'detail'; report whatever is available.
        detail = result.get('detail', result) if isinstance(result, dict) else result
        raise Exception(json.dumps(detail))
    return result

update_source(source_id, details)

Update the source with latest dataset information

Source code in enrichsdk/datasets/doodle.py
def update_source(self, source_id, details):
    """
    Update a source by posting the details to /api/v1/sources/<source_id>.

    Returns:
        The decoded server response

    Raises:
        Exception: If the server responds with a status of 300 or above
    """
    endpoint = f"/api/v1/sources/{source_id}"

    status, result = self.access_server(endpoint, data=details, method="post")
    if status < 300:
        return result

    logger.error("Failed to update source", extra={'data': str(result)})
    raise Exception(f"Could not update source {source_id}")

TransformDoodle(transform, state, *args, **kwargs)

Bases: Doodle

Extension to Doodle to allow passing of transform information to the doodle server

Source code in enrichsdk/datasets/doodle.py
def __init__(self, transform, state, *args, **kwargs):
    """
    Store the transform context and initialize the parent class.

    Args:
        transform: Stored as self.transform
        state: Stored as self.state
        action (str): Optional keyword argument, removed from kwargs
                      before the parent constructor is called.
                      Defaults to 'read'.
    """
    action = kwargs.pop('action', 'read')

    self.transform = transform
    self.state = state
    self.action = action

    super().__init__(*args, **kwargs)

add_source(catalog_id, details)

Insert access metadata before posting

Source code in enrichsdk/datasets/doodle.py
def add_source(self, catalog_id, details):
    """
    Add a source after passing the details through
    insert_access_metadata. The rest is done by the parent class.
    """
    self.insert_access_metadata(details)
    parent = super()
    return parent.add_source(catalog_id, details)

update_source(source_id, details)

Insert access metadata before posting

Source code in enrichsdk/datasets/doodle.py
def update_source(self, source_id, details):
    """
    Update a source after passing the details through
    insert_access_metadata. The rest is done by the parent class.
    """
    self.insert_access_metadata(details)
    parent = super()
    return parent.update_source(source_id, details)

Feature Engineering

Modules to support feature engineering of objects (dictionaries).

There are two base classes, one at the individual feature level and one at the featureset level. A compute function iterates through these for all the objects.

FeatureExtractorBase(*args, **kwargs)

Extract a single feature

Example::

class SimpleFeatureExtractor(FeatureExtractorBase):

    def __init__(self, *args, **kwargs):
        self.name = "simple"

    def extract(self, name, data, key=None):
        if key is None:
            key = name
        value = jmespath.search(key, data)
        return [{
            'key': name,
            'value': value
        }]
Source code in enrichsdk/feature_compute/__init__.py
def __init__(self, *args, **kwargs):
    """
    Set the name of the extractor to "base". The positional and keyword
    arguments are accepted but not used here.
    """
    self.name = "base"

extract(name, data, key=None)

Given data and a name, generate some attributes. The return value should be a list of dictionaries

Args: name (str): name of the feature data (dict): A dictionary key (str): Dictionary key potentially if name is not key

Returns: list: List of dictionaries. Each dict has a "key" and a "value"

Source code in enrichsdk/feature_compute/__init__.py
def extract(self, name, data, key=None):
    """
    Generate attributes for one feature from the data. This base
    implementation only raises; subclasses provide the logic.

    Args:
       name (str): name of the feature
       data (dict): A dictionary
       key (str): Dictionary key to use when it differs from name

    Returns:
       list: List of dictionaries. Each dict has a "key" and a "value"

    Raises:
       Exception: Always, in this base class
    """
    message = "Not implemented"
    raise Exception(message)

FeaturesetExtractorBase

Compute a featureset - collection of features. To be used in conjunction with the FeatureCompute(outer) and FeatureExtractor (inner). We define and use multiple extractors::

class CustomerFeaturesetExtractor(FeaturesetExtractorBase):
    '''
    Customer  timeseries featureset extractor
    '''
    def get_extractors(self):
        return {
            "simple": SimpleFeatureExtractor(),
        }

    def get_specs(self):
        specs = [
            {
                "keys": ['days'],
                "extractor": "simple"
            },
        ]

        return specs

    def one_record(self, data):

        allfeatures = super().one_record(data)

        return allfeatures

    def clean(self, df):
         df = df.fillna("")
         return df

clean(df)

Clean the collated dataframe/list/other.

Args: df (object): output of collate

Returns: object: A cleaned collated object

Source code in enrichsdk/feature_compute/__init__.py
def clean(self, df):
    """
    Clean the collated object. The default does nothing and returns
    the input unchanged; override in the subclass.

    Args:
       df (object): output of collate

    Returns:
       object: The cleaned object
    """
    return df

collate(features)

Combine the outputs of the extractors (each of which is a dictionary) into an object. It could be anything that the cleaner can handle.

Args: features (list): List of features extracted by one_record

Returns: object: Could be a combined dictionary/dataframe/other

Source code in enrichsdk/feature_compute/__init__.py
def collate(self, features):
    """
    Combine the outputs of the extractors (each of which is a
    dictionary) into one object. The default builds a pandas DataFrame.
    A subclass may return anything that its clean method can handle.

    Args:
       features (list): List of features extracted by one_record

    Returns:
       object: DataFrame built from the features

    """
    frame = pd.DataFrame(features)
    return frame

document(name, df)

Document the dataframe generated. The default is to capture schema, size etc. Over-ride to extend this documentation.

Args: df (object): output of collate name (str): name of the featureset extractor specification

Source code in enrichsdk/feature_compute/__init__.py
def document(self, name, df):
    """
    Document the generated dataframe by passing it to note(). Override
    to extend this documentation.

    The label given to note() is self.name, or the class name when the
    instance has no name attribute. The name argument is not used here.

    Args:
       df (object): output of collate
       name (str): name of the featureset extractor specification

    Returns:
       The output of note() for a DataFrame. For any other input an
       error is logged and None is returned.

    """
    if isinstance(df, pd.DataFrame):
        label = getattr(self, "name", self.__class__.__name__)
        return note(df, label)

    logger.error(
        "Unable to document. Unsupported data format. Override method in subclass"
    )

finalize(df, computed)

Take cleaned data and generate a final object such as a dataframe

Args: df (object): output of collate computed (dict): featureset extractor name -> collated/cleaned object

Returns: object: final data object

Source code in enrichsdk/feature_compute/__init__.py
def finalize(self, df, computed):
    """
    Turn the cleaned data into the final object. The default returns
    df unchanged and does not use computed; override in the subclass.

    Args:
       df (object): output of collate
       computed (dict): featureset extractor name -> collated/cleaned object

    Returns:
       object: final data object

    """
    return df

get_extractors()

Returns a dictionary of extractors. This is over-ridden in the subclass. Sample::

return {
     "simple": SimpleFeatureExtractor(),
}

Returns: dict: Dictionary of name -> extractor class instance

Source code in enrichsdk/feature_compute/__init__.py
def get_extractors(self):
    """
    Returns the extractors available to this featureset. The default
    is an empty dictionary; this is over-ridden in the subclass.
    Sample::

        return {
             "simple": SimpleFeatureExtractor(),
        }

    Returns:
       dict: Dictionary of name -> extractor class instance

    """
    extractors = {}
    return extractors

get_specs()

Returns a list of specifications. Each specification applies to one or more features. We specify a combination of keys in the input dictionary and a corresponding extractor. The keys could be a list or a dictionary.

For example::

[
    {
        "keys": ['age', 'sex'],
        "extractor": "simple",
    },
    {
        "keys": {
             'gender':  'sex',
             'old': 'age'
         },
        "extractor": "simple",
    }
]
Source code in enrichsdk/feature_compute/__init__.py
def get_specs(self):
    """
    Returns the list of specifications. The default is an empty list;
    this is over-ridden in the subclass.

    Each specification applies to one or more features. It combines
    keys of the input dictionary with the name of an extractor. The
    keys could be a list or a dictionary.

    For example::

        [
            {
                "keys": ['age', 'sex'],
                "extractor": "simple",
            },
            {
                "keys": {
                     'gender':  'sex',
                     'old': 'age'
                 },
                "extractor": "simple",
            }
        ]
    """
    specs = []
    return specs

one_record(data)

Process one record at a time. Pass it through the extractors, collect the outputs and return

Args: data (dict): One record to process

Returns: list: A list of dictionaries with features from this record

Rough logic::

 get specs
 for each spec:
         find extractor
         find name and corresponding keys
         newfeatures = call extractor(name, keys) for one row in data
         collect new features

 collapse
 return one or more 'feature row(s)'
Source code in enrichsdk/feature_compute/__init__.py
def one_record(self, data):
    """
    Run one record through the extractors named in the specs and
    return all the features collected.

    Args:
        data (dict): One record to process

    Returns:
        list: A list of dictionaries with features from this record

    Rough logic::

         get extractors and specs
         for each spec:
                 find extractor ("default" if the spec names none)
                 keys is a list: call extract(key, data) for each
                                 key that is present in data
                 keys is a dict: call extract(name, data, key) for
                                 each name -> key pair
                 collect new features

         return the collected features
    """
    collected = []

    available = self.get_extractors()
    specs = self.get_specs()

    for spec in specs:
        worker = available[spec.get("extractor", "default")]
        keys = spec["keys"]

        if isinstance(keys, list):
            # Keys that the record does not have are skipped
            for key in keys:
                if key in data:
                    collected.extend(worker.extract(key, data))
        elif isinstance(keys, dict):
            for name, key in keys.items():
                collected.extend(worker.extract(name, data, key))

    return collected

compute_features(objects, extractors, read_object=None)

Compute the features

Args: objects (list): List of objects to process. Could be names extractors (dict): Name to extractor mapping read_object (method): Turn each object into a dict

Returns: dict: name to dataframe mapping

Source code in enrichsdk/feature_compute/__init__.py
def compute_features(objects, extractors, read_object=None):
    """
    Compute the features

    Every object is turned into one or more records, each record is
    passed through every extractor, and the per-extractor features are
    then collated, cleaned and finalized into one result per extractor.

    Args:
          objects (list): List of objects to process. Could be names.
              A single dict is treated as a one-element list.
          extractors (list): List of dicts, each with a "name" and an
              "extractor" entry. The extractor object is called through
              its one_record, collate, clean, finalize and document
              methods.
          read_object (method): Turn each object into a dict (or a list
              of dicts). When it is not callable the object itself is
              used as the data.

    Returns:
        dict: name to dataframe mapping

    """

    featuresets = defaultdict(list)
    counts = defaultdict(int)
    invalid_objects = []

    # First turn it into a list...
    if isinstance(objects, dict):
        objects = [objects]

    for obj in objects:
        try:
            counts["obj_total"] += 1
            try:
                if callable(read_object):
                    data = read_object(obj)
                else:
                    # Without a reader the object itself is the data
                    data = obj
                if not isinstance(data, (dict, list)):
                    counts["object_read_invalid"] += 1
                    # Stored as text because the list is joined into a
                    # log message further below
                    invalid_objects.append(str(obj))
                    continue
                if not isinstance(data, list):
                    data = [data]
            except Exception as e:
                invalid_objects.append(str(obj) + ": " + str(e))
                counts["objects_error"] += 1
                continue

            for index, d in enumerate(data):
                try:
                    counts["records_total"] += 1
                    if (not isinstance(d, dict)) or (len(d) == 0):
                        logger.error(
                            "Empty or invalid data",
                            extra={"data": str(obj) + "\n" + str(d)[:100]},
                        )
                        counts["records_error_invalid"] += 1
                        continue

                    # Compute various feature sets for each record
                    for detail in extractors:
                        # Bound up front so the handler below can always
                        # report a name, even when the lookup itself fails
                        name = "unknown"
                        try:
                            name = detail["name"]
                            extractor = detail["extractor"]

                            # Process one record...
                            features = extractor.one_record(d)

                            # Skip if no features are being generated
                            if features is None:
                                continue

                            if isinstance(features, dict):
                                features = [features]
                            featuresets[name].extend(features)
                        except Exception:
                            counts[f"extractor_{name}_exception"] += 1
                            # Log only the first failure per extractor to
                            # avoid flooding the log
                            if counts[f"extractor_{name}_exception"] == 1:
                                logger.exception(
                                    f"Unable to process:{name}",
                                    extra={"data": str(d)[:200]},
                                )
                except Exception:
                    # Handle exceptions in individual records
                    counts["records_error_exception"] += 1
                    logger.exception(
                        f"Error in processing {index}",
                        extra={"data": str(obj) + "\n" + str(d)[:200]},
                    )

                # NOTE(review): incremented once per record that was not
                # rejected as empty/invalid, despite the "objects" name.
                counts["objects_valid"] += 1

            # Release the (possibly large) records before the next object
            del data

        except Exception:
            # Handle exceptions in individual objects
            counts["objects_error_exception"] += 1
            logger.exception(
                "Error in processing object",
                extra={"data": f"{obj}\n" + json.dumps(counts, indent=4)},
            )

    logger.debug(
        "Completed reading objects",
        extra={
            "data": json.dumps(counts, indent=4)
            + "\nInvalid Objects:\n"
            + "\n".join(invalid_objects)
        },
    )

    # Now collect all features gathered for each extractor
    counts = defaultdict(int)
    computed = {}
    for detail in extractors:

        t0 = datetime.now()
        name = detail["name"]
        extractor = detail["extractor"]

        if (name not in featuresets) or (featuresets[name] is None):
            logger.warning(f"Features missing: {name}", extra={})
            continue

        data = featuresets[name]
        msg = f"Data: {type(data)} {len(data)}\n"

        # Collect all the features into a dataframe..
        df = extractor.collate(data)
        t1 = datetime.now()

        # Clean the dataframe generated.
        df = extractor.clean(df)
        t2 = datetime.now()

        computed[name] = df
        counts[name] = df.shape[0]

        msg += f"Shape: {df.shape}\n"
        msg += f"Collation time  {round((t1-t0).total_seconds(),1)}\n"
        msg += f"Cleaning time  {round((t2-t1).total_seconds(),1)}\n"

        logger.debug(f"Completed collating {name}", extra={"data": msg})

    msg = "records: " + json.dumps(counts, indent=4) + "\n"
    logger.debug("Completed collating all", extra={"data": msg})

    # Now we have individual dataframes. May be the extractor
    # wants to compute some more.
    final = {}
    counts = defaultdict(int)
    for detail in extractors:
        name = detail["name"]
        extractor = detail["extractor"]
        # df is None when nothing was collated for this extractor;
        # finalize still gets a chance to build a result from `computed`
        df = computed.get(name, None)
        df = extractor.finalize(df, computed)
        if df is None:
            logger.error(f"{name}: Invalid result", extra={})
            continue
        final[name] = df
        counts[name] = df.shape[0]

    logger.debug(
        "Completed finalization",
        extra={"data": "records: " + json.dumps(counts, indent=4)},
    )

    # Now document the outputs generated. Each featureset is documented
    # by the extractor that produced it.
    extractor_by_name = {
        detail["name"]: detail["extractor"] for detail in extractors
    }
    for name, df in final.items():
        logger.debug(
            f"Featureset: {name}_features",
            extra={"data": extractor_by_name[name].document(name, df)},
        )

    return final

note(df, title)

Quick summary of a dataframe including shape, column, sample etc.

Args: df (dataframe): Input dataframe title (str): Title

Returns: str: A formatted text to be used for logging

Source code in enrichsdk/feature_compute/__init__.py
def note(df, title):
    """
    Build a short, loggable description of a dataframe: a timestamp,
    the shape, the column names, up to two sampled rows (transposed)
    and the column dtypes.

    Args:
       df (dataframe): Input dataframe
       title (str): Title

    Returns:
       str: A formatted text to be used for logging

    """
    sections = [
        title + "\n",
        "--------\n",
        "Timestamp: " + str(datetime.now()) + "\n",
        "\nShape: " + str(df.shape) + "\n",
        "\nColumns: " + ", ".join(df.columns) + "\n",
    ]

    # Sampling an empty frame is skipped; at most two rows are shown
    if len(df) > 0:
        sections.append("\nSample:\n")
        sections.append(df.sample(min(2, len(df))).T.to_string() + "\n\n")

    sections.append("\nDtypes\n")
    sections.append(df.dtypes.to_string() + "\n")
    sections.append("------\n")
    return "".join(sections)

Feature Store

download(backend, service=None, featuregroup_id=None, run_id=None, data=None, debug=False)

Download a featuregroup or run from the featurestore server

Args: backend (object): Backend class enrichsdk.api.Backend service (object): Dictionary with name, path featuregroup_id (int): Id of the featuregroup to download run_id (int): Id of the run to download data (dict): Dictionary to be posted (if filename not specified)

Returns dict: Response dictionary from the server

Source code in enrichsdk/featurestore/__init__.py
def download(
    backend, service=None, featuregroup_id=None, run_id=None, data=None, debug=False
):
    """
    Download a featurestore entry from the server

    Issues a GET against "<service path>/download/<id>/?format=json".
    featuregroup_id takes precedence when both ids are given.

    Args:
      backend (object): Backend class enrichsdk.api.Backend
      service (object): Dictionary with name, path
      featuregroup_id (int): Id of the featuregroup to download
      run_id (int): Id of the run to download
      data (dict): Not used by this function
      debug (bool): Print the URL being called

    Returns
      dict: Response dictionary from the server

    """

    supported = (
        isinstance(service, dict)
        and ("name" in service)
        and (service["name"] in ["run", "spec"])
    )
    if not supported:
        raise Exception("Unsupported service")

    if featuregroup_id is not None:
        target = featuregroup_id
    elif run_id is not None:
        target = run_id
    else:
        raise Exception("One of featuregroup_id or run_id should be specified")

    url = service["path"] + "/download/{}/?format=json".format(target)

    # Relative paths are resolved by the backend into a full URL
    if not url.startswith("http"):
        url = backend.get_api_url(url, use_prefix=False)

    if debug:
        print("Backend URL: {}".format(url))

    return backend.call(url, method="get")

generate(backend, service=None, debug=False)

Generate sample specification files

Source code in enrichsdk/featurestore/__init__.py
def generate(backend, service=None, debug=False):
    """
    Generate sample specification files

    Builds an empty FeatureGroupRun or FeatureGroupSpec, depending on
    the service name, and returns its export with dummy=True.

    Args:
      backend (object): Not used by this function
      service (object): Dictionary with a name of "run" or "spec"
      debug (bool): Not used by this function

    Returns
      Whatever the object's export(dummy=True) returns
    """

    supported = (
        isinstance(service, dict)
        and ("name" in service)
        and (service["name"] in ["run", "spec"])
    )
    if not supported:
        raise Exception("Unsupported service")

    kind = service["name"]
    if kind == "run":
        template = FeatureGroupRun()
    elif kind == "spec":
        template = FeatureGroupSpec()
    else:
        raise Exception("Unknown service name: {}".format(kind))

    return template.export(dummy=True)

post(backend, service=None, filename=None, data=None, debug=False)

Post featurestore to server

Args: backend (object): Backend class enrichsdk.api.Backend service (object): Dictionary with name, path filename (str): Path to the input file to the posted data (dict): Dictionary to be posted (if filename not specified)

Returns dict: Response dictionary from the server

Source code in enrichsdk/featurestore/__init__.py
def post(backend, service=None, filename=None, data=None, debug=False):
    """
    Post featurestore to server

    The payload is taken from `data` when given, otherwise it is read
    from the JSON file `filename`. It is loaded into a FeatureGroupRun
    or FeatureGroupSpec (depending on the service name), validated,
    exported and posted to "<service path>/add/?format=json".

    Args:
      backend (object): Backend class enrichsdk.api.Backend
      service (object): Dictionary with name, path
      filename (str): Path to the input JSON file to be posted
      data (dict): Dictionary to be posted (if filename not specified)
      debug (bool): Print the URL and the exported payload

    Returns
      dict: Response dictionary from the server

    Raises
      Exception: Unsupported service, neither filename nor data given,
                 or the file could not be read as JSON

    """

    if (
        (service is None)
        or (not isinstance(service, dict))
        or ("name" not in service)
        or (service["name"] not in ["run", "spec"])
    ):
        raise Exception("Unsupported service")

    url = service["path"] + "/add/?format=json"
    if not url.startswith("http"):
        url = backend.get_api_url(url, use_prefix=False)

    if debug:
        print("Backend URL: {}".format(url))

    if filename is None and data is None:
        raise Exception("Post requires either filename or data")

    if data is None:
        try:
            # Context manager so the handle is closed even if parsing fails
            with open(filename) as fd:
                data = json.load(fd)
        except Exception as e:
            raise Exception("Require a valid json file to post to server") from e

    name = service["name"]
    if name == "run":
        obj = FeatureGroupRun()
    elif name == "spec":
        obj = FeatureGroupSpec()
    else:
        raise Exception("Unknown service name: {}".format(name))

    # Add attributes
    for k, v in data.items():
        obj.add(k, v)

    # Check if the schema is valid
    obj.validate()

    data = obj.export()
    if debug:
        print("Data")
        print(json.dumps(data, indent=4))

    response = backend.call(url, method="post", data=data)
    return response

search(backend, service=None, debug=False, params={})

Search featurestore for specific featuregroups

Args: backend (object): Backend class enrichsdk.api.Backend service (object): Dictionary with name, path params (dict): Search criteria as key, value pairs debug (bool): Debug run or not

Returns dict: Response dictionary from the server

Source code in enrichsdk/featurestore/__init__.py
def search(backend, service=None, debug=False, params={}):
    """
    Search featurestore for specific featuregroups

    Issues a GET against "<service path>/search/?format=json", passing
    the search criteria through as request parameters.

    Args:
      backend (object): Backend class enrichsdk.api.Backend
      service (object): Dictionary with name, path
      debug (bool): Debug run or not
      params (dict): Search criteria as key, value pairs

    Returns
      dict: Response dictionary from the server

    """

    supported = (
        isinstance(service, dict)
        and ("name" in service)
        and (service["name"] in ["run", "spec"])
    )
    if not supported:
        raise Exception("Unsupported service")

    url = service["path"] + "/search/?format=json"

    # Relative paths are resolved by the backend into a full URL
    if not url.startswith("http"):
        url = backend.get_api_url(url, use_prefix=False)

    if debug:
        print("Backend URL: {}".format(url))

    return backend.call(url, method="get", params=params)

Data Quality

Module to set and implement expectations on dataframes. Note that this is in development mode and meant to be a preview.

As systems are getting automated, and processing more data everyday, it is hard to keep track of correctness of the entire system. We use expectations to make sure that decision modules such as ML/statistics code is being correctly fed.

An expectation is a set of structural rules that datasets should satisfy. It could be superficial such as names or order of columns. It could go deeper and specify the statistical attributes of the input.

This module helps implement a specification. An example is shown below::

    expectations = [
        {
            'expectation': 'table_columns_exist',
            'params': {
                'columns': ['Make', 'YEAR', 'Model', 'hello']
            }
        },
        {
            'expectation': 'table_columns_exist_with_position',
            'params': {
                'columns': {
                    'Make': 1
                }
            }
        }
    ]

Each expectation has a name, and arbitrary parameters. The first expectation shown above will for example, check if the specified columns exist in the dataframe being evaluated. The result will be as shown below::

    {
        "expectation": "table_columns_exist_with_position",
        "passed": true,
        "description": "Check whether columns exist (in particular order)",
        "version": "v1",
        "timestamp": "2020-01-04T11:43:03+05:30",
        "frame": "v2scoring",
        "meta": {
            "node": "data.acmeinc.com",
            "pipeline": "AcmeLending",
            "transform": "ScoringModule",
            "runid": "scoring-20200104-114137",
            "start_time": "2020-01-04T11:41:37+05:30"
        }
    }

This is stored as part of the state and dumped in the metadata for each run.

The expectations are used as follows::

    expectationsfile=os.path.join(thisdir, "expectations.py")
    checker = TransformExpectation(self,
                                   mode='validation',
                                   filename=expectationsfile)

    result = checker.validate(df,selector=selector)
    state.add_expectations(self, name, result)

Expectations

Base classes for expectation and manager, and builtin expectations.

ExpectationBase(*args, **kwargs)

Bases: object

Base class for expectations

Initialize the base class for expectations

Source code in enrichsdk/quality/base.py
def __init__(self, *args, **kwargs):
    """
    Set placeholder identity for an expectation.

    Positional and keyword arguments are accepted but ignored. The
    name is what match() compares against a specification's
    "expectation" entry.
    """
    self.name, self.description = "unknown_table_check", "Unknown description"

match(config)

Check if the expectations matches

Args: config: Expectation specification

Returns: True if this class can handle expectation

Source code in enrichsdk/quality/base.py
def match(self, config):
    """
    Tell whether a specification is addressed to this expectation

    A specification matches when it is a dict whose "expectation"
    entry equals this object's name.

    Args:
       config: Expectation specification

    Returns:
       True if this class can handle expectation

    """
    return isinstance(config, dict) and (
        config.get("expectation", None) == self.name
    )

validate_args(config)

Validate arguments passed to this expectation

Check if the expectation is correctly specified beyond the name

Args: config: Expectation specification

Returns: True if this class can handle this specification

Source code in enrichsdk/quality/base.py
def validate_args(self, config):
    """
    Hook for checking an expectation specification beyond its name

    The base implementation performs no checks and returns None.

    Args:
       config: Expectation specification

    Returns:
       None in this base implementation

    """
    return None

ExpectationResultBase(*args, **kwargs)

Bases: object

Class to return one or more results from an expectation validation step.

Source code in enrichsdk/quality/base.py
def __init__(self, *args, **kwargs):
    """
    Start with an empty list of result entries.

    Positional and keyword arguments are accepted but ignored.
    """
    self.results = list()

add_result(expectation, description, passed, extra={})

Add an entry to the validation result object

Args: expectation: Name of the expectation description: One line summary of the check passed: True if the check has passed extra: Dictionary with extra context including reason

Source code in enrichsdk/quality/base.py
def add_result(self, expectation, description, passed, extra=None):
    """
    Add an entry to the validation result object

    Args:
       expectation: Name of the expectation
       description: One line summary of the check
       passed: True if the check has passed
       extra: Dictionary with extra context including reason.
              Defaults to a new empty dictionary for every call.

    Raises:
       IncorrectImplementationExpectation: An argument has the wrong type

    """
    # A fresh dict per call: the value is stored in the entry, so a
    # shared default would leak context between unrelated results.
    if extra is None:
        extra = {}

    if not isinstance(passed, bool):
        raise IncorrectImplementationExpectation(
            "Expectation requires 'passed' to be boolean"
        )

    if not isinstance(expectation, str):
        raise IncorrectImplementationExpectation(
            "Expectation require name to be string"
        )

    if not isinstance(description, str):
        raise IncorrectImplementationExpectation(
            "Expectation require description to be string"
        )

    if not isinstance(extra, dict):
        raise IncorrectImplementationExpectation(
            "Expectation require 'extra' to be dict"
        )

    entry = {
        "expectation": expectation,
        "description": description,
        "passed": passed,
        "extra": extra,
    }

    self.results.append(entry)

TableColumnsExistExpectations(*args, **kwargs)

Bases: ExpectationBase

Check whether table has required columns

Configuration has to specify a list of columns::

   {
     'expectation': 'table_columns_exist',
     'params': {
         'columns': ['alpha', 'beta']
         }
     }
   }
Source code in enrichsdk/quality/expectations.py
def __init__(self, *args, **kwargs):
    """
    Initialize the base class, then identify this expectation as
    "table_columns_exist".
    """
    super().__init__(*args, **kwargs)
    self.name, self.description = (
        "table_columns_exist",
        "Check whether columns exist (in any order)",
    )

generate(df)

Generate an expectation specification listing the dataframe's columns

Source code in enrichsdk/quality/expectations.py
def generate(self, df):
    """
    Build an expectation specification from a dataframe

    Args:
       df: Dataframe whose columns should be expected

    Returns:
       dict: Specification with this expectation's name, description
       and the dataframe's column names as the 'columns' parameter
    """
    spec = {"expectation": self.name, "description": self.description}
    spec["params"] = {"columns": [*df.columns]}
    return spec

validate(df, config)

Check if the specified columns are present in the dataframe

Source code in enrichsdk/quality/expectations.py
def validate(self, df, config):
    """
    Check if the specified columns are present in the dataframe

    Args:
       df: Object to check; anything other than a pandas dataframe fails
       config: Specification; config['params']['columns'] is a column
               name or a list of column names

    Returns:
       ExpectationResultBase holding a single pass/fail entry
    """

    outcome = ExpectationResultBase()
    # Every entry carries this expectation's name and description
    report = partial(
        outcome.add_result, expectation=self.name, description=self.description
    )

    if not isinstance(df, pd.DataFrame):
        report(passed=False, extra={"reason": "Not a dataframe"})
        return outcome

    wanted = config["params"]["columns"]
    if isinstance(wanted, str):
        # A lone column name is shorthand for a one-element list
        wanted = [wanted]

    absent = [col for col in wanted if col not in df.columns]
    if absent:
        report(
            passed=False,
            extra={"reason": "Missing columns: {}".format(",".join(absent))},
        )
    else:
        report(passed=True)

    return outcome

TableColumnsPositionExpectations(*args, **kwargs)

Bases: ExpectationBase

Check whether table has the right columns in right positions.

Configuration has to specify the columns and their corresponding positions in the test dataframe::

   {
     'expectation': 'table_columns_exist_with_position',
     'params': {
         'columns': {
            'alpha': 0,
            'beta': 2
         }
     }
   }
Source code in enrichsdk/quality/expectations.py
def __init__(self, *args, **kwargs):
    """
    Initialize the base class, then identify this expectation as
    "table_columns_exist_with_position".
    """
    super().__init__(*args, **kwargs)
    self.name, self.description = (
        "table_columns_exist_with_position",
        "Check whether columns exist (in particular order)",
    )

generate(df)

Generate an expectation specification from the dataframe's column positions.

Source code in enrichsdk/quality/expectations.py
def generate(self, df):
    """
    Build an expectation specification from a dataframe

    Args:
       df: Dataframe whose column layout should be expected

    Returns:
       dict: Specification with this expectation's name, description
       and a 'columns' parameter mapping each column name to its
       zero-based position, which is the shape validate() checks.
    """
    # enumerate yields (position, column); the specification is keyed
    # by column name.
    columns = {col: position for position, col in enumerate(df.columns)}
    return {
        "expectation": self.name,
        "description": self.description,
        "params": {"columns": columns},
    }

validate(df, config)

Validate the names and positions of columns

Source code in enrichsdk/quality/expectations.py
def validate(self, df, config):
    """
    Validate the names and positions of columns

    Args:
       df: Object to check; anything other than a pandas dataframe fails
       config: Specification; config['params']['columns'] maps each
               column name to its expected position

    Returns:
       ExpectationResultBase holding a single pass/fail entry

    Raises:
       InvalidConfigurationExpectation: 'columns' is not a dict
    """

    outcome = ExpectationResultBase()
    # Every entry carries this expectation's name and description
    report = partial(
        outcome.add_result, expectation=self.name, description=self.description
    )

    if not isinstance(df, pd.DataFrame):
        report(passed=False, extra={"reason": "Not a dataframe"})
        return outcome

    expected = config["params"]["columns"]
    if not isinstance(expected, dict):
        raise InvalidConfigurationExpectation("Columns should be a dict")

    actual = list(df.columns)
    # A column is rejected when its position is not an integer, lies
    # past the last column, or holds a different column name.
    misplaced = [
        col
        for col, position in expected.items()
        if (not isinstance(position, int))
        or (position >= len(actual))
        or (actual[position] != col)
    ]

    if misplaced:
        report(
            passed=False,
            extra={
                "reason": "Missing/Wrong position columns: {}".format(
                    ",".join(misplaced)
                )
            },
        )
    else:
        report(passed=True)

    return outcome

Transforms

This class bridges expectations library and the Enrich pipelines.

TransformExpectation(transform, mode=None, filename=None, expectations=None)

Bases: object

Class to provide a bridging interface between light weight expectations implemented here and the pipeline

Initialize the class

Args: transform: Transform using this expectation mode: Mode of operation (validation|generation) filename: Path to expectations file expectations: Explicitly provide expectations

Returns: Instantiated class

Source code in enrichsdk/quality/transforms.py
def __init__(self, transform, mode=None, filename=None, expectations=None):
    """
    Set up the bridge between a transform and its expectations

    Exactly one of filename and expectations must be given. The
    filename is resolved through transform.config.get_file(). In
    validation mode the file must exist and is loaded right away; an
    explicit expectations value must be a list.

    Args:
       transform: Transform using this expectation
       mode: Mode of operation (validation|generation)
       filename: Path to expectations file
       expectations: Explicitly provide expectations

    Raises:
       InvalidExpectation: Inconsistent arguments, missing file, or a
                           file that could not be loaded

    """
    self.transform = transform
    assert mode in ["validation", "generation", None]

    if filename is None:
        if expectations is None:
            raise InvalidExpectation("Missing expectations and filename")
    elif expectations is not None:
        raise InvalidExpectation("Cant specify both expectations and filename")

    if filename is not None:
        filename = transform.config.get_file(filename)

    self.mode = mode
    validating = mode == "validation"

    if validating:
        if filename is not None:
            if not os.path.exists(filename):
                raise InvalidExpectation("Missing expectations file")
        elif (expectations is not None) and (not isinstance(expectations, list)):
            raise InvalidExpectation("Expectations should be a list")

    # Explicit expectations are kept as given; otherwise start empty
    self.expectations = [] if expectations is None else expectations

    try:
        # Only validation mode reads the file during construction
        if validating and (filename is not None):
            self.expectations = self.load_expectations_file(filename)
    except:
        raise InvalidExpectation("Invalid expectation file")

generate(df)

Render a specified template using the context

Args: df: Dataframe to be profiled

Returns: Rendered html template that can be embedded

Source code in enrichsdk/quality/transforms.py
def generate(self, df):
    """
    Placeholder for generating expectations from a dataframe

    Only allowed in "generation" mode, which is the mode name the
    constructor accepts. Nothing is generated yet; the method returns
    None.

    Args:
       df: Dataframe to be profiled

    Returns:
        None

    Raises:
        InvalidExpectation: The object is not in generation mode

    """
    if self.mode != "generation":
        raise InvalidExpectation("Mode is not generation")

    return None

load_expectations_file(filename)

Load expectations, whether specified as a json, pickle, or py file, into an instance variable.

Args: filename: Name of the expectations file

Returns: None:

Source code in enrichsdk/quality/transforms.py
def load_expectations_file(self, filename):
    """Load expectations from a json, pickle, or py file.

    The filename is remembered in self.filename. A .py file is
    executed as a module and must define a module-level
    `expectations` variable.

    Args:
        filename: Name of the expectations file

    Returns:
        The expectations read from the file

    Raises:
        InvalidExpectation: The .py file could not be loaded
        UnsupportedExpectation: The file extension is not supported

    """
    self.filename = filename
    if filename.endswith(".json"):
        with open(filename) as fd:
            return json.load(fd)

    if filename.endswith(".pkl"):
        # NOTE(security): unpickling executes code embedded in the
        # file; only load expectation files from trusted locations.
        with open(filename, "rb") as fd:
            return pickle.load(fd)

    if filename.endswith(".py"):
        # Local import: the `imp` module used previously was removed
        # in Python 3.12.
        import importlib.util

        modname = os.path.splitext(os.path.basename(filename))[0]
        try:
            # Load the module straight from its path. Unlike
            # imp.load_module, this does not register it in sys.modules.
            spec = importlib.util.spec_from_file_location(modname, filename)
            package = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(package)
            return package.expectations
        except Exception:
            logger.exception(
                "Skipping. Expectations file could not be read",
                extra=self.get_extra(
                    {
                        "transform": self.transform,
                        "data": "Filename: {}".format(filename),
                    }
                ),
            )
            raise InvalidExpectation(
                "Invalid specifications file: {}".format(modname)
            )

    raise UnsupportedExpectation("Unsupported expectations specification file")

validate(df, selector=None)

Run the loaded expectations

Args: df: Dataframe to be evaluated selector: Function to select the expectations to run

Returns: result: A list of dictionaries, each has evaluation result

Source code in enrichsdk/quality/transforms.py
def validate(self, df, selector=None):
    """
    Evaluate the loaded expectations against a dataframe

    Args:
       df: Dataframe to be evaluated
       selector: Optional function that receives the loaded
                 expectations and returns the ones to evaluate

    Returns:
       result: A list of dictionaries, each has evaluation result.
               An empty list when the result cannot be serialized
               as JSON.

    Raises:
       InvalidExpectation: The object is not in validation mode
    """

    if self.mode != "validation":
        raise InvalidExpectation("Dataframe validation not enabled")

    manager = ExpectationManagerBase()
    manager.initialize()

    selected = selector(self.expectations) if callable(selector) else self.expectations
    outcome = manager.validate(df, selected)

    # The result must be JSON-serializable; probe it here and fall
    # back to an empty result when it is not.
    try:
        json.dumps(outcome)
    except:
        logger.exception(
            "Error while processing expectations",
            extra={"transform": self.transform.name},
        )
        return []

    return outcome

Exceptions

Exceptions used by the quality module.

IncorrectImplementationExpectation

Bases: Exception

Expectation implementation class didn't return the right result

InvalidConfigurationExpectation

Bases: Exception

Expectation implementation class didn't return the right result

InvalidExpectation

Bases: Exception

Invalid specification of expectations

NoExpectations

Bases: Exception

Empty list of expectations to evaluate

NotInitialized

Bases: Exception

Initialize to discover all available expectation implementations. It has not happened.

UnsupportedExpectation

Bases: Exception

Expectation name or content not implemented/supported.

Notebook

This is intended to be used in the notebook context. It provides a number of capabilities including:

  1. Obfuscated credentials
  2. Resource management
  3. Searching of notebooks

Services to allow enrichsdk to be used in notebooks. It provides:

  1. Security - access to obfuscated credentials
  2. Resource - limit resource usage
  3. Indexing - search notebooks
  4. Metadata - Generate metadata to be included in the output files

Notebook(data_root=None)

Bases: object

This class allows one to read and search in notebooks

Source code in enrichsdk/notebook/__init__.py
def __init__(self, data_root=None):
    """
    Work out the data root and the output directory

    Parameters
    ----------
    data_root: Existing path to use as the data root. When omitted,
               "$JUPYTERLAB_HOME/data" is used instead.

    The output directory is the "output" folder under the data root.
    An exception is raised when data_root does not exist, or when it is
    omitted and JUPYTERLAB_HOME is not defined.
    """
    if data_root is None:
        # Fall back to the Jupyter environment
        if "JUPYTERLAB_HOME" not in os.environ:
            warnings.warn(
                "Notebook using JUPYTERLAB_HOME when being executed outside jupyter environment. Either use data_root, define the variable or dont use this class"
            )
            raise Exception("JUPYTERLAB_HOME not defined")

        self.data_root = os.path.join(os.environ["JUPYTERLAB_HOME"], "data")
        self.output_dir = os.path.join(
            os.environ["JUPYTERLAB_HOME"], "data", "output"
        )
        return

    if not os.path.exists(data_root):
        raise Exception("Invalid or missing path or file: {}".format(data_root))

    self.data_root = data_root
    self.output_dir = os.path.join(data_root, "output")

get_file(path, create_dir=False)

Parameters:

Name Type Description Default
path required
create_dir False
Example

%(data_root)s/acme/Projects/commands is resolved into /home/ubuntu/enrich/data/notebooks/acme/Projects/commands

Returns:

Type Description
Full path from abstract specification
Source code in enrichsdk/notebook/__init__.py
def get_file(self, path, create_dir=False):
    """
    Parameters
    ----------
    path: Path specification
    create_dir: Should the parent director for the file be created

    Example
    ----------
    %(data_root)/acme/Projects/commands
    This is resolved into `/home/ubuntu/enrich/data/notebooks/acme/Projects/commands`

    Returns
    -------
    Full path from abstract specification
    """
    try:
        path = path.replace(
            "%(", "%%("
        )  # This will make the strftime handling safe
        path = datetime.now().strftime(path)
        path = path % {
            "data_root": self.data_root,
            "dt": datetime.now().date().isoformat(),
            "datetime": datetime.now().replace(microsecond=0).isoformat(),
        }

        if create_dir:
            try:
                os.makedirs(os.path.dirname(path))
            except:
                pass

        return path
    except Exception as e:
        print("Exception in get_file() - {}".format(e))

get_metadata(filename=None, file_type=None)

Get reusable metadata dict with some standard fields.

Parameters:

Name Type Description Default
filename None

Returns:

Name Type Description
metadata Dictionary with a number of fields
Source code in enrichsdk/notebook/__init__.py
def get_metadata(self, filename=None, file_type=None):
    """
    Describe a file, and the machine it was produced on, as a
    reusable metadata dict.

    Parameters
    ---------
    filename: Name of the file covered by this metadata
    file_type: "csv" or "tsv" adds row/column counts and headers by
               reading the file; any other value adds nothing

    Returns
    -------
    metadata: Dictionary with a number of fields
    """
    stamp = datetime.now().replace(microsecond=0).isoformat()
    host = {
        "node": platform.node(),
        "os": platform.system(),
        "release": platform.release(),
        "processor": platform.processor(),
        "python": platform.python_version(),
        "distribution": distro.linux_distribution(),
    }

    metadata = OrderedDict(
        [
            ("schema", "standalone:notebook"),
            ("version", "1.0"),
            ("timestamp", stamp),
            ("platform", host),
            ("filename", filename),
            ("filetype", mimetypes.guess_type(filename)[0]),
            ("filesize", os.stat(filename).st_size),
        ]
    )

    # Tabular files additionally report their dimensions and headers
    if file_type == "csv":
        table = pd.read_csv(filename)
    elif file_type == "tsv":
        table = pd.read_csv(filename, sep="\t")
    else:
        table = None

    if table is not None:
        metadata["rows"] = table.shape[0]
        metadata["columns"] = table.shape[1]
        metadata["headers"] = list(table.columns)

    return metadata

read_notebook(notebook_path, as_version=nbformat.NO_CONVERT)

Parameters:

Name Type Description Default
notebook_path required
as_version nbformat.NO_CONVERT

Returns:

Name Type Description
NotebookNode Dictionary representation of file
Source code in enrichsdk/notebook/__init__.py
def read_notebook(self, notebook_path, as_version=nbformat.NO_CONVERT):
    """
    Load an ipynb file through nbformat.

    Parameters
    ----------
    notebook_path: Absolute path of ipynb file
    as_version: Version of Notebook. Default = nbformat.NO_CONVERT

    Returns
    -------
    NotebookNode: Dictionary representation of file. If reading
                  fails, the error is printed and None is returned.
    """
    try:
        return nbformat.read(notebook_path, as_version=as_version)
    except Exception as err:
        print("Exception reading Notebook - {}".format(err))
    return None

save(file_name, metadata_path=None)

Parameters:

Name Type Description Default
file_name required

Returns:

Type Description
None. Saves the data file specified and metadata about the file into Enrich's data dir
Source code in enrichsdk/notebook/__init__.py
def save(self, file_name, metadata_path=None):
    """
    Write a JSON metadata file describing an existing data file.

    Parameters
    ----------
    file_name: Relative path of file including filename with extension
    metadata_path: Where the metadata is written.
        None (default): "metadata.json" inside self.output_dir.
        str: path resolved through self.get_file(); its parent
             directory is created first.
        callable: called without arguments; the result is used as
                  the directory that receives "metadata.json".

    Returns
    -------
    None. Writes metadata about the file as JSON. Nothing is written
    when the file does not exist or its extension is not listed in
    accepted_types(). Errors are printed, not raised.
    """
    try:
        abs_file_name = self.get_file(file_name)
        file_type = abs_file_name.split(".")[-1]

        # Validate that file exists and has an accepted type ("csv",
        # "tsv", ...). More file types can be added to the list.
        if not os.path.exists(abs_file_name) or file_type not in accepted_types():
            return

        metadata_info = self.get_metadata(abs_file_name, file_type)

        if metadata_path is None:
            metadata_file = os.path.join(self.output_dir, "metadata.json")
        elif isinstance(metadata_path, str):
            metadata_file = self.get_file(metadata_path)
            create_dir(os.path.dirname(metadata_file))
        elif callable(metadata_path):
            # Previously the callable's result was stored and never
            # used, so open() was handed None and always failed.
            # NOTE(review): assumes the callable returns a directory
            # (the original local was named data_dump_dir) - confirm.
            data_dump_dir = metadata_path()
            create_dir(data_dump_dir)
            metadata_file = os.path.join(data_dump_dir, "metadata.json")
        else:
            raise TypeError(
                "metadata_path must be None, a str or a callable, got {}".format(
                    type(metadata_path).__name__
                )
            )

        with open(metadata_file, "w") as f:
            json.dump(metadata_info, f, indent=4)
    except Exception as e:
        print("Exception in save() - {}".format(e))

search_notebooks(user_notebook_dir, keyword)

Parameters:

Name Type Description Default
user_notebook_dir required
keyword required

Returns:

Type Description
List of path strings to Notebooks that satisfy the search
Source code in enrichsdk/notebook/__init__.py
def search_notebooks(self, user_notebook_dir, keyword):
    """
    Find every notebook below a directory that contains a keyword.

    Parameters
    ----------
    user_notebook_dir: Directory searched recursively for ipynb files
    keyword: String to search for within notebook

    Returns
    -------
    List of path strings to Notebooks that satisfy the search.
    If the search itself fails, the error is printed and None is
    returned.
    """
    try:
        # Join the pattern pieces without a leading "/": an absolute
        # component makes os.path.join() drop the directory before it.
        pattern = os.path.join(user_notebook_dir, "**", "*.ipynb")
        ipynb_files = glob.glob(pattern, recursive=True)
        return [
            ipynb
            for ipynb in ipynb_files
            if self.search_within_notebook(ipynb, keyword)
        ]
    except Exception as e:
        print("Exception searching in notebook directory - {}".format(e))

search_within_notebook(notebook_path, keyword)

Parameters:

Name Type Description Default
notebook_path required
keyword required

Returns:

Type Description
Boolean, true if keyword is found in ipynb file; False if not.
Source code in enrichsdk/notebook/__init__.py
def search_within_notebook(self, notebook_path, keyword):
    """
    Check whether a keyword occurs anywhere in a notebook's text.

    Parameters
    ----------
    notebook_path: Absolute path of ipynb file
    keyword: String to search for within notebook

    Returns
    -------
    Boolean, true if keyword is found in ipynb file; False if not.
    False is also returned when the notebook cannot be read.
    """

    def _contains(node):
        # Walk the nested notebook structure and test every string
        # value; a plain `keyword in notebook` would only compare
        # against the top-level dictionary keys.
        if isinstance(node, str):
            return keyword in node
        if isinstance(node, dict):
            return any(_contains(value) for value in node.values())
        if isinstance(node, (list, tuple)):
            return any(_contains(item) for item in node)
        return False

    try:
        notebook = self.read_notebook(notebook_path)
        # read_notebook() yields None when the file could not be read
        if notebook is None:
            return False
        return _contains(notebook)
    except Exception as e:
        print("Exception searching in notebook - {}".format(e))
        return False

set_resource_limits(params={})

Set resource limits for the notebook. This applies only to this notebook.

Parameters:

Name Type Description Default
params

Only one member (memory) is accessed. It is the fraction of available memory to be used.

{}
Source code in enrichsdk/notebook/__init__.py
def set_resource_limits(self, params={}):
    """
    Set resource limits for the notebook. This applies
    only to this notebook.

    Parameters
    ----------
    params: Only one member (memory) is accessed. It is the fraction
            of available memory to be used (default 0.8). None is
            treated like an empty dict.

    Returns
    -------
    String reporting the soft and hard limits now in force, in MB.

    Raises
    ------
    AttributeError: the resource module exposes neither RLIMIT_VMEM
                    nor RLIMIT_AS, so no limit could be applied.
    """

    # Default fraction of max memory to be used. params is only read,
    # never mutated, so the shared default dict is harmless here.
    fraction = (params or {}).get("memory", 0.8)

    # Get available virtual memory
    vmem = psutil.virtual_memory()

    max_vmem = int(fraction * vmem.available)

    # use AS as alternative to VMEM if the attribute isn't defined.
    # http://stackoverflow.com/a/30269998/5731870
    limit = getattr(resource, "RLIMIT_VMEM", None)
    if limit is None:
        limit = getattr(resource, "RLIMIT_AS", None)
    if limit is None:
        raise AttributeError(
            "resource module defines neither RLIMIT_VMEM nor RLIMIT_AS"
        )

    resource.setrlimit(limit, (max_vmem, max_vmem))

    # Read back the same limit that was just set, not always RLIMIT_AS
    soft, hard = resource.getrlimit(limit)
    soft = round(soft / (1.0 * 1024 * 1024))
    hard = round(hard / (1.0 * 1024 * 1024))

    return "Memory: Soft ({}MB) Hard ({}MB)".format(soft, hard)