Transforms

enrichsdk.contrib.transforms

Standard transforms that can be directly included in any pipeline.

FileOperations(*args, **kwargs)

Bases: FileOperationsBase

FileOperations performs a number of operations on files generated by pipelines.

The transform takes a list of actions. The only action type supported for now is copy. Each copy action requires a source, a destination, and instructions on what to do with an existing destination file; a minimal sketch of the copy semantics follows the example below.

Example::

{
        "transform": "FileOperations",
        "enable": true,
        "dependencies": {
           ....
        },
        "args": {
            "actions": [
                  {
                    "action": "copy",
                    "src": "%(output)s/%(runid)s/profile.sqlite",
                    "dst": "%(data_root)s/shared/campaigns/profile_daily/profile.sqlite",
                    "backupsuffix": ".backup"
                  }
             ]
        }
}
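
The actual behaviour lives in the transform (via FileOperationsBase); the sketch below only illustrates the intended copy-with-backup semantics, under the assumption that backupsuffix renames an existing destination before the copy. Paths are hypothetical stand-ins for the resolved templates::

import os
import shutil

def copy_with_backup(src, dst, backupsuffix=".backup"):
    # Illustrative only: back up an existing destination, then copy the source over.
    if os.path.exists(dst):
        os.replace(dst, dst + backupsuffix)
    os.makedirs(os.path.dirname(dst), exist_ok=True)
    shutil.copy2(src, dst)

copy_with_backup("output/run-001/profile.sqlite",
                 "data/shared/campaigns/profile_daily/profile.sqlite")
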
Source code in enrichsdk/contrib/transforms/fileops/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "FileOperations"
    self.description = "File operations such as move/copy etc."

    test_root = os.environ["ENRICH_TEST"]
    self.testdata = {
        "data_root": os.path.join(test_root, self.name),
        "outputdir": os.path.join(test_root, self.name, "output"),
        "inputdir": test_root,
        "statedir": os.path.join(test_root, self.name, "state"),
        "global": {"args": {"rundate": "2020-01-10"}},
        "conf": {
            "args": {
                "actions": [
                    {
                        "action": "copy",
                        "src": "%(output)s/%(runid)s/outputs/cars.csv",
                        "dst": "%(data_root)s/shared/%(rundate)s/hello.csv",
                    }
                ]
            }
        },
        "data": {},
    }

instantiable() classmethod

Make this class instantiable

Source code in enrichsdk/contrib/transforms/fileops/__init__.py
@classmethod
def instantiable(cls):
    """
    Make this class instantiable
    """
    return True

JSONSink(*args, **kwargs)

Bases: Sink

Store a 'dict' frame that is present in the state into a file.

Params are meant to be passed as parameters to update_frame.

Example configuration::

 "args": {
     "sink": {
         'test': {
             'frametype': 'dict',
             'filename': '%(output)s/%(runid)s/mytestoutput.json',
             'params': {}
         }
     }
 }
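
The %(output)s and %(runid)s placeholders use Python's printf-style mapping syntax; the SDK resolves them (via get_file in the source below). A minimal sketch of the substitution itself, with made-up values::

template = "%(output)s/%(runid)s/mytestoutput.json"

# Hypothetical values; in a real run these come from the pipeline configuration.
resolved = template % {"output": "/data/enrich/output", "runid": "run-2020-01-10"}
print(resolved)  # /data/enrich/output/run-2020-01-10/mytestoutput.json
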
Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "JSONSink"

    self.testdata = {
        "conf": {
            "args": {
                "sink": {
                    "frame1": {
                        "frametype": "dict",
                        "filename": "%(output)s/%(runid)s/mytestoutput.json",
                        "params": {},
                    }
                }
            }
        },
        "data": {
            "frame1": {
                "filename": "outputjson.json",
                "frametype": "dict",
                "transform": "TestJSON",
                "params": {},
            }
        },
    }

preload_clean_args(args)

Clean when the spec is loaded...

Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def preload_clean_args(self, args):
    """
    Clean when the spec is loaded...
    """

    if "sink" not in args:
        args = {"sink": args}

    args = super().preload_clean_args(args)

    assert "sink" in args
    assert isinstance(args["sink"], dict)
    assert len(args["sink"]) > 0

    sink = args["sink"]
    for name, detail in sink.items():

        if ("frametype" not in detail) or (detail["frametype"] != "dict"):
            logger.error(
                "Invalid configuration. Only JSON/Dictionaries are supported by this sink transform",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        if (
            ("filename" not in detail)
            or (not isinstance(detail["filename"], str))
            or ("params" not in detail)
            or (not isinstance(detail["params"], dict))
        ):
            logger.error(
                "Invalid args. Filename (string) and params (dict) are required",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        filename = detail["filename"]
        if not filename.lower().endswith(".json"):
            logger.error(
                "Input file must a .json file",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        detail["root"] = self.config.enrich_data_dir

        tags = detail.get("tags", [])
        if isinstance(tags, str):
            tags = [tags]
        detail["tags"] = tags

        # => Materialize the path...
        detail["filename"] = self.config.get_file(
            detail["filename"], extra={"frame_name": name}
        )

    return args

process(state)

Store the dictionary 'frames' in the state in files.

Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def process(self, state):
    """
    Store the dictionary 'frames' in the state in files.
    """
    logger.debug(
        "{} - process".format(self.name),
        extra=self.config.get_extra({"transform": self.name}),
    )

    available_frames = state.get_frame_list()

    # => First construct input for the pandasframe
    extra = {}
    args_input = {}
    write_input = {}
    framecls = self.config.get_dataframe("dict")

    sink = self.args["sink"]
    for pattern in sink:
        # The pattern could be precise dataframe name or could be
        # regular expression.
        regex = re.compile("^{}$".format(pattern))
        frames = [
            m.group(0) for f in available_frames for m in [regex.search(f)] if m
        ]
        if len(frames) == 0:
            logger.warning("Pattern has not matched any frames: {}".format(pattern))
            continue

        for f in frames:

            # Get the details of this frame
            detail = state.get_frame(f)

            # Handle frametype
            frametype = detail["frametype"]
            if frametype != "dict":
                logger.warning(
                    "Pattern has matched non-dict frame: {}".format(f),
                    extra=self.config.get_extra({"transform": self.name}),
                )
                continue

            # Now construct the output file name
            filename = sink[pattern]["filename"]
            filename = self.config.get_file(
                filename, create_dir=True, extra={"frame_name": f}
            )

            extra[f] = {
                "notes": self.collapse_notes(detail),
                "descriptions": self.collapse_descriptions(detail),
            }

            params = sink[pattern].get("params", {})
            write_input[f] = {
                "frametype": detail["frametype"],
                "filename": filename,
                "pattern": pattern,
                "df": detail["df"],
                "params": params,
            }

            args_input[f] = copy.copy(sink[pattern])
            args_input[f]["filename"] = filename

    framecls.write(args_input, write_input)

    for name in write_input:

        detail = write_input[name]

        # => Insert columns and tags
        pattern = detail["pattern"]
        detail["params"]["tags"] = sink[pattern]["tags"]

        # Incorporate columns, notes and description
        detail["params"].update(extra[name])

        detail["params"] = [
            detail["params"],
            {
                "type": "lineage",
                "transform": self.name,
                "dependencies": [
                    {"type": "dataframe", "nature": "input", "objects": [name]},
                    {
                        "type": "file",
                        "nature": "output",
                        "objects": [detail["params"]["filename"]],
                    },
                ],
            },
        ]

        # Insert additional detail
        detail["transform"] = self.name
        detail["history"] = [{"transform": self.name, "log": "Wrote output"}]

        state.update_frame(name, detail)

    logger.debug(
        "Finished writing data",
        extra=self.config.get_extra({"transform": self.name}),
    )

    ###########################################
    # => Return
    ###########################################
    return state

validate_args(what, state)

An extra check on the arguments to make sure they are consistent with the specification

Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def validate_args(self, what, state):
    """
    An extra check on the arguments to make sure
    they are consistent with the specification
    """
    args = self.args
    assert "sink" in args
    assert isinstance(args["sink"], dict)
    assert len(args["sink"]) > 0

    sink = args["sink"]
    for name, detail in sink.items():
        assert ("frametype" in detail) and (detail["frametype"] == "dict")
        assert "filename" in detail
        assert "params" in detail

validate_results(what, state)

Check to make sure that the execution completed correctly

Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def validate_results(self, what, state):
    """
    Check to make sure that the execution completed correctly
    """
    pass

JSONSource(*args, **kwargs)

Bases: Source

Load a file into a 'dict' frame in the state.

Params are meant to be passed as parameters to update_frame.

Example configuration::

 ...

 "args": {
     "source": {
         'hello': {
             'frametype': 'dict',
             'filename': '%(data_root)s/shared/hello.json',
             'params': {}
         }
     }
 }
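
As process() below shows, each configured file is read with json.load and stored in the state as a 'dict' frame. A minimal standalone sketch, using a hypothetical hello.json::

import json

# Hypothetical file standing in for %(data_root)s/shared/hello.json
with open("hello.json", "w") as fd:
    json.dump({"greeting": "hello", "count": 1}, fd)

with open("hello.json") as fd:
    data = json.load(fd)

# This dict is what would be registered in the state as the 'hello' frame.
assert isinstance(data, dict)
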
Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "JSONSource"

    self.testdata = {
        "conf": {
            "args": {
                "source": {
                    "hello": {
                        "filename": "%(data_root)s/shared/hello.json",
                        "frametype": "dict",
                        "params": {},
                    }
                }
            }
        },
        "data": {},
    }

preload_clean_args(args)

Check if the args are consistent with the specification.

Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def preload_clean_args(self, args):
    """
    Check if the args are consistent with the
    specification.
    """

    # Backward compatibility
    if "source" not in args:
        args = {"source": args}

    args = super().preload_clean_args(args)

    # Sanity check...
    assert isinstance(args, dict)
    assert "source" in args
    assert isinstance(args["source"], dict)

    source = args["source"]
    for name, detail in source.items():

        if ("frametype" not in detail) or (detail["frametype"] != "dict"):
            logger.error(
                "Invalid configuration. Only JSON source supported by this source transform",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        if (
            ("filename" not in detail)
            or (not isinstance(detail["filename"], str))
            or ("params" not in detail)
            or (not isinstance(detail["params"], dict))
        ):
            logger.error(
                "Invalid args. Filename (string) and params (dict) are required",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        filename = detail["filename"]
        if not filename.lower().endswith(".json"):
            logger.error(
                "Input file must a .json file",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        # => Materialize the path...
        detail["filename"] = self.config.get_file(detail["filename"])

    return args

process(state)

Load the json files into 'dict' frames and store them in the state.

Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def process(self, state):
    """
    Load the json files into 'dict' frames and store them in the state.
    """
    logger.debug(
        "{} - process".format(self.name),
        extra=self.config.get_extra({"transform": self.name}),
    )

    source = self.args["source"]
    for name, detail in source.items():

        filename = detail["filename"]
        data = json.load(open(filename))

        updated_detail = {
            "df": data,
            "transform": self.name,
            "frametype": "dict",
            "params": [
                {
                    "type": "compute",
                },
                {
                    "type": "lineage",
                    "transform": self.name,
                    "dependencies": [
                        {"type": "file", "nature": "input", "objects": [filename]}
                    ],
                },
            ],
            "history": [
                # Add a log entry describing the change
                {"transform": self.name, "log": "Loaded json file"}
            ],
        }

        # Update the state.
        state.update_frame(name, updated_detail, create=True)

    ###########################################
    # => Return
    ###########################################
    return state

validate_args(what, state)

Double check the arguments

Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def validate_args(self, what, state):
    """
    Double check the arguments
    """
    assert isinstance(self.args, dict)
    assert "source" in self.args
    assert isinstance(self.args["source"], dict)

    source = self.args["source"]
    for name, detail in source.items():
        assert ("frametype" in detail) and (detail["frametype"] == "dict")
        assert "filename" in detail
        assert "params" in detail

validate_results(what, state)

Check to make sure that the execution completed correctly

Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def validate_results(self, what, state):
    """
    Check to make sure that the execution completed correctly
    """

    source = self.args["source"]
    for name, detail in source.items():
        if not state.reached_stage(name, self.name):
            raise Exception("Could not find new frame created for {}".format(name))

        detail = state.get_frame(name)
        df = detail["df"]

        # Check if it is a valid dictionary...
        assert isinstance(df, dict)

PQExport(*args, **kwargs)

Bases: Sink

Parquet export for dataframes.

The configuration requires a list of exports, each of which specifies a pattern for the frame name::

 'conf': {
    'args': {
        "exports": [
          {
              "name": "%(frame)s_pq",
              "type": "pq", # optional. Default is pq
              "frames": ["cars"],
              "filename": "%(output)s/%(runid)s/%(frame)s.pq",
              "params": {
                  # parquet parameters.
                  # "compression": 'gzip'
                  # "engine": 'auto'
                  # "index" :None,
                  # "partition_cols": None
              }
           }
        ]
    }
}
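
The params block is passed through to pandas.DataFrame.to_parquet (see process() below). A minimal standalone sketch of the equivalent calls, assuming a parquet engine such as pyarrow is installed and using a made-up frame::

import pandas as pd

df = pd.DataFrame({"model": ["X1", "X3"], "sales": [120, 95]})

# Roughly what PQExport does for each matched frame, forwarding params as kwargs.
df.to_parquet("cars.pq", engine="auto", compression="gzip")

# When "sample" is enabled, a sampled copy (up to 1000 rows) is written alongside.
df.sample(min(1000, df.shape[0])).to_parquet("cars.pq.sample", compression="gzip")
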
Source code in enrichsdk/contrib/transforms/pqexport/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "PQExport"
    self.roles_supported = ["Export"]
    self.roles_current = "Export"

    data_root = os.path.join(os.environ["ENRICH_TEST"], self.name)
    self.testdata = {
        "data_root": data_root,
        "outputdir": os.path.join(data_root, "runs"),
        "inputdir": os.path.join(os.environ["ENRICH_TEST"]),
        "statedir": os.path.join(os.environ["ENRICH_TEST"], self.name, "state"),
        "conf": {
            "args": {
                "exports": [
                    {
                        "name": "%(frame)s_pq",
                        # "type": "pq",
                        "frames": ["cars"],
                        "filename": "%(output)s/%(runid)s/%(frame)s.pq",
                        "sample": True,
                        "params": {
                            # "compression": 'gzip'
                            # "engine": 'auto'
                            # "index" :None,
                            # "partition_cols": None
                        },
                    }
                ]
            }
        },
        "data": {
            "cars": {
                "transform": "CarSales",
                "filename": "sales.csv",
                "params": {"sep": ","},
            }
        },
    }

process(state)

Export frames as parquet files as shown in the example.

Source code in enrichsdk/contrib/transforms/pqexport/__init__.py
def process(self, state):
    """
    Export frames as parquet files as shown in the example.
    """

    # Sanity check...
    for e in self.args["exports"]:

        namebase = e["name"]
        params = e.get("params", {})
        sample = e.get("sample", True)

        # Collect all the frames data
        for f in e["frames"]:

            detail = state.get_frame(f)
            if detail is None:
                raise Exception("Frame not present in state: {}".format(f))

            if detail["frametype"] != "pandas":
                raise Exception("Frame not a pandas dataframe: {}".format(f))

            df = detail["df"]

            # Resolve the locations
            filename = os.path.abspath(
                self.config.get_file(e["filename"], extra={"frame": f})
            )
            relpath = self.config.get_relative_path(
                filename, what="enrich_data_dir"
            )

            # Check over-rides
            overrides = self.frame_get_overrides(detail)
            override_present = len(overrides) > 0
            save = overrides.get("save", True)
            if save:
                try:
                    os.makedirs(os.path.dirname(filename))
                except:
                    pass
                df.to_parquet(filename, **params)

                if sample:
                    size = min(1000, df.shape[0])
                    df.sample(size).to_parquet(filename + ".sample", **params)

            else:
                logger.warn(
                    "Did not save {} due to overrides".format(f),
                    extra=self.config.get_extra(
                        {
                            "transform": self.name,
                            "data": "Overrides: {}".format(overrides),
                        }
                    ),
                )

            if not os.path.exists(filename):
                logger.error(
                    "PQ file not created or missing",
                    extra=self.config.get_extra(
                        {
                            "transform": self.name,
                            "data": "Filename: {}\nOverride Present: {}".format(
                                filename, override_present
                            ),
                        }
                    ),
                )
                raise Exception("PQ file missing")

            # => Create state detail
            state_detail = {
                "df": None,
                "frametype": "db",
                "transform": self.name,
                "params": [
                    {
                        "filename": filename,
                        "action": "output",
                        "frametype": "binary",
                        "columns": self.collapse_columns(detail),
                        "descriptions": ["Parquet export of {} frame".format(f)],
                        "components": [
                            {
                                "filename": relpath,
                                "type": "pq",
                                "rows": df.shape[0],
                                "columns": df.shape[1],
                                "sha256sum": get_checksum(filename),
                                "filesize": "{0:0.1f} MB".format(
                                    get_file_size(filename) / (1024 * 1024)
                                ),
                                "modified_time": str(
                                    time.ctime(os.path.getmtime(filename))
                                ),
                                "create_time": str(
                                    time.ctime(os.path.getctime(filename))
                                ),
                            }
                        ],
                    },
                    {
                        "type": "lineage",
                        "transform": self.name,
                        "dependencies": [
                            {
                                "type": "dataframe",
                                "nature": "input",
                                "objects": [f],
                            },
                            {
                                "type": "file",
                                "nature": "output",
                                "objects": [filename],
                            },
                        ],
                    },
                ],
                "history": [{"transform": self.name, "log": "Write PQite export"}],
            }
            try:
                name = namebase % {"frame": f}
                state.update_frame(name, state_detail, create=True)
                state.make_note("Generated PQ export for {}".format(f))
            except:
                logger.exception(
                    "Unable to store state",
                    extra=self.config.get_extra({"transform": self.name}),
                )
                raise Exception("Error while storing")

SQLExport(*args, **kwargs)

Bases: Sink

Export dataframes into a SQL database. The args specify what should be exported and how.

The transform args provide the specification:

  * exports: A list of files that must be exported. Each is a
    dictionary with the following elements:

      * name: Name of this export. Used for internal tracking and notifications.
      * filename: Output filename. Can refer to other global attributes such as `data_root`, `enrich_root_dir`, etc.
      * type: Type of the export. Only `sqlite` is supported for now.
      * frames: List of frames of the type `pandas` that should be
        exported as part of this file.
      * indexes: Columns on which indexes should be created. Note that these are common across the frames. We check if the column is present in the frame and create the index.

Example::

....
"transforms": {
    "enabled": [
       ...
       {
         "transform": "SQLExport",
          "args": {
              "exports": [
                {
                   "type": "sqlite",
                   "filename": "%(output)s/cars.sqlite",
                   "frames": ["cars", "alpha"]
                },
               ...
              ]
            },
           ...
       }
    ...
   }
 }
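
The export produces a standard SQLite file, so the result can be inspected with the sqlite3 module from the standard library. A minimal sketch of such a check; the filename is a hypothetical resolved %(output)s/cars.sqlite::

import sqlite3

conn = sqlite3.connect("cars.sqlite")
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
print(tables)  # expected to contain the exported frames, e.g. ['cars', 'alpha']

for table in tables:
    count = conn.execute("SELECT COUNT(*) FROM {}".format(table)).fetchone()[0]
    print(table, count)

conn.close()
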
Source code in enrichsdk/contrib/transforms/sqlexport/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "SQLExport"
    self.description = "Export frames into SQLite/Other formats"
    self.roles_supported = ["Export"]
    self.roles_current = "Export"

    self.testdata = {
        "conf": {
            "args": {
                "exports": [
                    {
                        "name": "customerinfo",
                        "filename": "%(output)s/%(runid)s/customerinfo.sqlite",
                        "type": "sqlite",
                        "frames": ["customerinfo"],
                    }
                ]
            }
        },
        "data": {
            "customerinfo": {
                "transform": "MemberLoyaltyMetadata",
                "filename": "customerinfo.csv",
                "params": {"sep": ",", "dtype": {"MEMBERSHIP_ID_SUFFIX": "str"}},
            }
        },
    }

preload_clean_args(args)

Enforce the args specification given in the example above

Source code in enrichsdk/contrib/transforms/sqlexport/__init__.py
def preload_clean_args(self, args):
    """
    Enforce the args specification given in the
    example above
    """
    args = super().preload_clean_args(args)

    if len(args) == 0:
        raise Exception("Empty args provided")

    if ("exports" not in args) or (not isinstance(args["exports"], list)):
        raise Exception("SQLExport requires a series of exports (a list)")

    for e in args["exports"]:

        if (
            (not isinstance(e, dict))
            or ("filename" not in e)
            or ("name" not in e)
            or ("type" not in e)
            or ("frames" not in e)
        ):
            raise Exception(
                "Each element of the export should be a dictionary with filename, type, and frames"
            )

        if ("replace" in e) and (not isinstance(e["replace"], bool)):
            raise Exception("replace is a boolean variable")

        if e["type"] != "sqlite":
            raise Exception("Only sqlite exports are supported in current version")

        e["filename"] = os.path.abspath(self.config.get_file(e["filename"]))
        e["relpath"] = os.path.relpath(e["filename"], self.config.data_root)

    return args

process(state)

Execute the export specification.

Source code in enrichsdk/contrib/transforms/sqlexport/__init__.py
def process(self, state):
    """
    Execute the export specification.
    """

    exports =  self.args["exports"]


    # Look at messages sent from transforms and collect the
    # frames.
    extra = defaultdict(list)
    while True:
        msg = state.msgpop(self.name)
        if msg is None:
            break
        if not isinstance(msg, dict):
            continue
        data = msg.get('data', None)
        if ((isinstance(data, dict)) and
            ('frames' in data)):
            name = data.get('name', 'default')
            frames = data['frames']
            extra[name].extend(frames)

    # Sanity check...
    for e in exports:

        name = e.get('name', 'default')

        # Collect all the frames data
        missing = []
        invalid = []
        frames = {}

        allframes = e["frames"] + extra.get(name, [])
        for f in allframes:
            detail = state.get_frame(f)
            if detail is None:
                missing.append(f)
                continue

            if detail["frametype"] != "pandas":
                invalid.append(f)
                continue

            frames[f] = detail

        if len(missing) > 0 or len(invalid) > 0:
            logger.error(
                "Unable to export frames",
                extra=self.config.get_extra(
                    {
                        "transform": self.name,
                        "data": "Invalid: {}\nMissing: {}".format(invalid, missing),
                    }
                ),
            )
            raise Exception("Error while exporting")

        filename = e["filename"]
        filename = os.path.abspath(self.config.get_file(filename))
        relpath = self.config.get_relative_path(filename, what="enrich_data_dir")

        name = e.get("name", os.path.basename(filename))
        replace = e.get("replace", False)
        indexes = e.get("indexes", [])

        try:
            os.makedirs(os.path.dirname(filename))
        except:
            pass

        # Creating a database file
        conn = sqlite3.connect(filename)
        c = conn.cursor()

        for f in frames:

            # => Write the frames
            overrides = self.frame_get_overrides(frames[f])
            override_present = len(overrides) > 0
            save = overrides.get("save", True)
            if save:
                try:

                    # Status flag
                    failed = False

                    df = frames[f]["df"]

                    # Drop existing table if necessary...
                    if replace:
                        c.execute(f"DROP TABLE IF EXISTS {f}")

                    # => First create the table schema
                    ddl = pd.io.sql.get_schema(df, f)
                    c.execute(ddl)  # CREATE table

                    # => Index created.
                    indexmsg = "No indexes" if len(indexes) == 0 else ""
                    for col in indexes:
                        if col in df.columns:
                            try:
                                c.execute(f"CREATE INDEX {col}_{f}_index ON {f} ({col})")
                                indexmsg += f"[{col}] Index created\n"
                            except Exception as exc:
                                indexmsg += f"[{col}] Exception {exc}\n"
                        else:
                            indexmsg += f"[{col}] Index skipped. Column not present\n"

                    # => Dump the dataframe to a csv
                    fd, tmpfile = tempfile.mkstemp(prefix="sqlexport")
                    df.to_csv(tmpfile, header=False, index=False)

                    # => Load it into sqlite
                    cmd = [
                        "/usr/bin/sqlite3",
                        filename,
                        "-cmd",
                        ".separator ,",
                        ".import {} {}".format(tmpfile, f),
                    ]

                    process = subprocess.Popen(
                        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                    )
                    out, err = process.communicate()
                    err = err.decode("utf-8")
                    out = out.decode("utf-8")

                    # Dump what we have seen
                    if len(err) > 0:
                        failed = True
                        logfunc = partial(
                            logger.error, "Unable to export {}".format(f)
                        )
                    else:
                        logfunc = partial(logger.debug, "Exported {}".format(f))
                    logfunc(
                        extra=self.config.get_extra(
                            {
                                "transform": self.name,
                                "data": "Filename:{}\nIndex:{}\nOutput\n-----\n{}\n\nErr\n----\n{}".format(
                                    filename, indexmsg, out, err
                                ),
                            }
                        )
                    )

                    # => Update the state for this transform..
                    if not failed:
                        state.update_frame(
                            f,
                            {
                                "df": frames[f]["df"],
                                "frametype": frames[f]["frametype"],
                                "transform": self.name,
                                "history": [{"log": "Exported to SQLite"}],
                                "params": {
                                    "type": "lineage",
                                    "transform": self.name,
                                    "dependencies": [
                                        {
                                            "type": "dataframe",
                                            "nature": "input",
                                            "objects": [f],
                                        },
                                        {
                                            "type": "file",
                                            "nature": "output",
                                            "objects": [filename],
                                        },
                                    ],
                                },
                            },
                        )

                except:
                    logger.exception(
                        "Unable to export dataframe {}".format(f),
                        extra=self.config.get_extra(
                            {
                                "transform": self.name,
                            }
                        ),
                    )

                # Cleanup...
                try:
                    if os.path.exists(tmpfile):
                        os.remove(tmpfile)
                except:
                    pass

                # Don't proceed
                if failed:
                    raise Exception("Error while exporting {}".format(f))

            else:
                logger.warn(
                    "Did not save {} due to overrides".format(f),
                    extra=self.config.get_extra(
                        {
                            "transform": self.name,
                            "data": "Overrides: {}".format(overrides),
                        }
                    ),
                )

        conn.close()
        if not os.path.exists(filename):
            logger.error(
                "SQLite file not created or missing",
                extra=self.config.get_extra(
                    {
                        "transform": self.name,
                        "data": "Filename: {}\nOverride Present: {}".format(
                            filename, override_present
                        ),
                    }
                ),
            )
            raise Exception("SQLite file missing")

        # Now create the state entry
        detail = {
            "df": None,
            "frametype": "db",
            "transform": self.name,
            "params": {
                "filename": filename,
                "action": "output",
                "frametype": "db",
                "descriptions": [
                    "SQLite export of {} frames ({})".format(
                        len(frames), ",".join(frames)
                    )
                ],
                "notes": ["Frames included: {}".format(",".join(frames))],
                "components": [
                    {
                        "filename": relpath,
                        "type": "sqlite",
                        "sha256sum": get_checksum(filename),
                        "filesize": "{0:0.3f} MB".format(
                            get_file_size(filename) / (1024 * 1024)
                        ),
                        "modified_time": str(
                            time.ctime(os.path.getmtime(filename))
                        ),
                        "create_time": str(time.ctime(os.path.getctime(filename))),
                    }
                ],
            },
            "history": [{"transform": self.name, "log": "Write SQLite export"}],
        }
        try:
            state.update_frame(name, detail, create=True)
            state.make_note("Generated database export")
        except:
            logger.exception(
                "Unable to store state",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Error while storing")

validate_results(what, state)

Check to make sure that the execution completed correctly

Source code in enrichsdk/contrib/transforms/sqlexport/__init__.py
def validate_results(self, what, state):
    """
    Check to make sure that the execution completed correctly
    """
    pass

TableSink(*args, **kwargs)

Bases: Sink

Transform to dump dataframes in state into files.

Parameters specific to this module include:

* sink: A dictionary of dataframe names and how to output them. It has a number of attributes:

    * type: Output type. Only the 'table' value is supported for this
      option right now.
    * filename: Output filename. You can use default parameters such
      as runid.

  The name of the dataframe can be a regular expression, allowing you
  to specify a simple rule for an arbitrary number of frames; the
  matching rule is illustrated after the example below.

Example::

....
"transforms": {
    "enabled": [
        ...
        {
            "transform": "TableSink",
            "args": {
                "sink": {
                    "article": {
                        "frametype": "pandas",
                        "filename": "%(output)s/%(runid)s/article.csv",
                        "params": {
                            "sep": "|"
                        }
                    },
                    ...
                }
            }
            ...
        }
    ]
 }
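
Sink keys are treated as anchored regular expressions and matched against the names of the frames present in the state (see process() below). A minimal sketch of the matching rule, with hypothetical frame names::

import re

available_frames = ["article", "article_summary", "cars"]
pattern = "article.*"   # a sink key acting as a regular expression

regex = re.compile("^{}$".format(pattern))
matched = [f for f in available_frames if regex.search(f)]
print(matched)  # ['article', 'article_summary']
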
Source code in enrichsdk/contrib/transforms/tablesink/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "TableSink"

    self.testdata = {
        "conf": {
            "args": {
                "sink": {
                    "cars": {
                        "frametype": "pandas",
                        "filename": "%(output)s/cars_revised.csv",
                        "params": {"sep": ","},
                    }
                }
            }
        },
        "data": {
            "carmodel": {
                "transform": "CarModel",
                "filename": "cars.csv",
                "params": {"sep": ","},
                "state": {
                    "params": [
                        {
                            "type": "args",
                            "transform": "TableSink",
                            "args": {"save": False, "rows": 124},
                        }
                    ]
                },
            }
        },
    }

preload_clean_args(args)

Check to make sure that the arguments are consistent with the specification mentioned above

Source code in enrichsdk/contrib/transforms/tablesink/__init__.py
def preload_clean_args(self, args):
    """
    Check to make sure that the arguments
    are consistent with the specification mentioned
    above
    """

    # Sanity check...
    assert isinstance(args, dict)
    assert len(args) > 0

    # Backward compatibility
    if "sink" not in args:
        args = {"sink": args}

    args = super().preload_clean_args(args)

    assert "sink" in args
    sink = args["sink"]
    assert isinstance(sink, dict)
    assert len(sink) > 0

    for pattern, detail in sink.items():

        if (
            ("type" in detail)
            and ("frametype" not in detail)
            and (detail["type"] == "table")
        ):
            detail["frametype"] = "pandas"

        if ("frametype" not in detail) or (detail["frametype"] != "pandas"):
            logger.error(
                "Invalid configuration. Only pandas table source supported by this sink transform",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        if (
            ("filename" not in detail)
            or (not isinstance(detail["filename"], str))
            or ("params" not in detail)
            or (not isinstance(detail["params"], dict))
        ):
            logger.error(
                "Invalid args. Filename (string) and params (dict) are required",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        detail["root"] = self.config.enrich_data_dir

        tags = detail.get("tags", [])
        if isinstance(tags, str):
            tags = [tags]
        detail["tags"] = tags

        sortcols = detail.get("sort", [])
        if isinstance(sortcols, str):
            sortcols = [sortcols]
        detail["sort"] = sortcols

    return args

process(state)

Execute the tablesink specification

Source code in enrichsdk/contrib/transforms/tablesink/__init__.py
def process(self, state):
    """
    Execute the tablesink specification
    """

    # => What exists in the state?
    available_frames = state.get_frame_list()

    # => First construct input for the pandasframe
    extra = {}
    write_input = {}
    args_input = {}
    framecls = self.config.get_dataframe("pandas")
    skipped = []

    sink = self.args["sink"]
    for pattern in sink:
        # The pattern could be precise dataframe name or could be
        # regular expression.
        regex = re.compile("^{}$".format(pattern))
        frames = [
            m.group(0) for f in available_frames for m in [regex.search(f)] if m
        ]

        for f in frames:
            # For each dataframe that is in the system

            detail = state.get_frame(f)

            # => Are there any extra instructions?
            overrides = self.frame_get_overrides(detail)

            # => Materialize the path...
            filename = sink[pattern]["filename"]
            filename = self.config.get_file(
                filename, create_dir=True, extra={"frame_name": f}
            )
            # Collect all column information
            extra[f] = {
                "columns": self.collapse_columns(detail),
                "notes": self.collapse_notes(detail),
                "descriptions": self.collapse_descriptions(detail),
                "overrides": overrides,
            }

            # Which dataframe
            df = detail["df"]

            # Get the frame type
            frametype = detail["frametype"]

            # Order the dataframe if it is needed
            sortcols = sink[pattern]["sort"]
            if len(sortcols) > 0:
                df = framecls.sort_values(df, sortcols, ascending=False)
            params = sink[pattern].get("params", {})

            # Should I be writing this csv?
            save = params.get("save", True)
            save = overrides.get("save", save)

            if not save:
                skipped.append(f)

            write_input[f] = {
                "save": save,
                "frametype": frametype,
                "pattern": pattern,
                "df": df,
                "filename": filename,
                "params": params,
            }

            args_input[f] = copy.copy(sink[pattern])
            args_input[f]["filename"] = filename

    if len(skipped) > 0:
        logger.warning(
            "Not saving {} tables".format(len(skipped)),
            extra={"transform": self.name, "data": skipped},
        )

    # => Write output details
    framecls.write(args_input, write_input)

    for name in write_input:

        detail = write_input[name]

        # => Insert columns and tags
        pattern = detail["pattern"]

        #
        detail["params"]["tags"] = sink[pattern]["tags"]

        # Incorporate columns, notes and description
        additional_params = extra[name]
        overrides = additional_params.pop("overrides", {})

        detail["params"].update(additional_params)

        # Insert any overrides provided in the state
        if "rows" in overrides:
            detail["params"]["components"][0]["rows"] = overrides["rows"]

        detail["params"] = [
            detail["params"],
            {
                "type": "lineage",
                "transform": self.name,
                "dependencies": [
                    {"type": "dataframe", "nature": "input", "objects": [name]},
                    {
                        "type": "file",
                        "nature": "output",
                        "objects": [detail["filename"]],
                    },
                ],
            },
        ]

        # Insert additional detail
        detail["transform"] = self.name
        detail["history"] = [{"transform": self.name, "log": "Wrote output"}]

        state.update_frame(name, detail)

    logger.debug(
        "Finished writing data",
        extra=self.config.get_extra({"transform": self.name}),
    )

validate_args(what, state)

Extra validation of the arguments

Source code in enrichsdk/contrib/transforms/tablesink/__init__.py
def validate_args(self, what, state):
    """
    Extra validation of the arguments
    """
    sink = self.args["sink"]

    assert isinstance(sink, dict)
    for pattern, detail in sink.items():
        assert ("frametype" in detail) and (detail["frametype"] == "pandas")
        assert "filename" in detail
        assert "params" in detail

TableSource(*args, **kwargs)

Bases: Source

Load csv/other files into pandas dataframes.

Parameters specific to this module include:

* source: A dictionary of dataframe names and how to
  load them. It has a number of attributes:

    * type: Input type. Only the 'table' value is
      supported for this option.
    * filename: Input filename. You can use default
      parameters such as runid.
    * params: Params are arguments to [pandas read_csv](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html);
      see the sketch after the example below.

Example::

....
"transforms": {
    "enabled": [
        {
            "transform": "TableSink",
            "args": {
                "source": {
                    "article": {
                        "type": "file",
                        "filename": "%(data)s/ArticleData.csv",
                        "params": {
                            "delimiter": "|",
                            "dtype": {
                                "sku": "category",
                                "mc_code": "int64",
                                "sub_class": "category",
                                "priority": "float64"
                                ...
                            }
                        }
                    }
                }
              ...
            }
        }
    ...
   ]
 }
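
The params block is forwarded to pandas read_csv, after dtype strings such as "float64" are mapped to numpy types (see preload_clean_args below). A minimal standalone sketch with a hypothetical file::

import pandas as pd

# Hypothetical CSV standing in for %(data)s/ArticleData.csv
df = pd.read_csv(
    "ArticleData.csv",
    delimiter="|",
    dtype={"sku": "category", "mc_code": "int64", "priority": "float64"},
)
print(df.dtypes)
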
Source code in enrichsdk/contrib/transforms/tablesource/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "TableSource"

clean(state)

This is meant for subclass to do some additional processing.

Source code in enrichsdk/contrib/transforms/tablesource/__init__.py
def clean(self, state):
    """
    This is meant for subclass to do some additional processing.
    """
    pass

preload_clean_args(args)

Clean when the spec is loaded...

Source code in enrichsdk/contrib/transforms/tablesource/__init__.py
def preload_clean_args(self, args):
    """
    Clean when the spec is loaded...
    """

    # Backward compatibility
    if "source" not in args:
        args = {"source": args}

    args = super().preload_clean_args(args)

    # Sanity check...
    assert isinstance(args, dict)
    assert "source" in args
    assert isinstance(args["source"], dict)

    for name, detail in args["source"].items():

        # Insert the frame into the args for backward
        # compatibility.
        if (
            ("type" in detail)
            and ("frametype" not in detail)
            and (detail["type"] == "table")
        ):
            detail["frametype"] = "pandas"

        if ("frametype" not in detail) or (detail["frametype"] != "pandas"):
            logger.error(
                "Invalid configuration. Only pandas table source supported by this source transform",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        if (
            ("filename" not in detail)
            or (not isinstance(detail["filename"], str))
            or ("params" not in detail)
            or (not isinstance(detail["params"], dict))
        ):
            logger.error(
                "Invalid args. Filename (string) and params (dict) are required",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        mapping = {
            "str": str,
            "float": np.float64,
            "float64": np.float64,
            "np.float64": np.float64,
            "np.int64": np.int64,
            "int": np.int64,
            "int64": np.int64,
            "datetime": datetime,
            "category": "category",
        }

        # => Materialize the path...
        detail["filename"] = self.config.get_file(detail["filename"])
        detail["root"] = self.config.enrich_data_dir
        params = detail["params"]
        if "dtype" in params:
            for attr in params["dtype"]:
                if params["dtype"][attr] in mapping:
                    params["dtype"][attr] = mapping[params["dtype"][attr]]
                else:
                    params["dtype"][attr] = eval(params["dtype"][attr])

    return args

process(state)

Load file...

Source code in enrichsdk/contrib/transforms/tablesource/__init__.py
def process(self, state):
    """
    Load file...
    """
    # Load all the dataframes. This will use the full enrich
    # deployment's beefed up read function.
    framecls = self.config.get_dataframe("pandas")
    source = self.args["source"]

    dfstates = framecls.read(source, {})
    for dfname, dfstate in dfstates.items():

        # => Insert column description
        columns = dfstate["params"]["columns"]
        for c in columns:
            columns[c]["description"] = self.get_column_description(dfname, c)

        params = dfstate["params"]
        if "filename" in params:
            filename = params["filename"]
        elif "filename" in source.get(dfname, {}):
            filename = source[dfname]["filename"]
        else:
            filename = "Unknown"

        detail = {
            "df": dfstate["df"],
            "transform": self.name,
            "frametype": "pandas",
            "params": [
                params,
                {
                    "type": "lineage",
                    "transform": self.name,
                    "dependencies": [
                        {"type": "file", "nature": "input", "objects": [filename]}
                    ],
                },
            ],
            "history": [
                {
                    "transform": self.name,
                    "log": "Read data using {}".format(framecls.__class__.__name__),
                }
            ],
        }
        try:
            state.update_frame(dfname, detail, create=True)
        except:
            logger.exception(
                "Unable to store state",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Error while loading")

    # Clean the uploaded data...
    self.clean(state)

fileops

FileOperations(*args, **kwargs)

Bases: FileOperationsBase

FileOperations performs a number of operations on files generated by pipelines.

The transform takes a list of actions. The only action type supported for now is copy. Each copy task requires source, destination, and instruction on what to do with existing file.

Example::

{
        "transform": "FileOperations",
        "enable": true,
        "dependencies": {
           ....
        },
        "args": {
            "actions": [
                  {
                    "action": "copy",
                    "src": "%(output)s/%(runid)s/profile.sqlite",
                    "dst": "%(data_root)s/shared/campaigns/profile_daily/profile.sqlite",
                    "backupsuffix": ".backup"
                  }
             ]
        }
}
Source code in enrichsdk/contrib/transforms/fileops/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "FileOperations"
    self.description = "File operations such as move/copy etc."

    test_root = os.environ["ENRICH_TEST"]
    self.testdata = {
        "data_root": os.path.join(test_root, self.name),
        "outputdir": os.path.join(test_root, self.name, "output"),
        "inputdir": test_root,
        "statedir": os.path.join(test_root, self.name, "state"),
        "global": {"args": {"rundate": "2020-01-10"}},
        "conf": {
            "args": {
                "actions": [
                    {
                        "action": "copy",
                        "src": "%(output)s/%(runid)s/outputs/cars.csv",
                        "dst": "%(data_root)s/shared/%(rundate)s/hello.csv",
                    }
                ]
            }
        },
        "data": {},
    }
instantiable() classmethod

Make this class instantiable

Source code in enrichsdk/contrib/transforms/fileops/__init__.py
@classmethod
def instantiable(cls):
    """
    Make this class instantiable
    """
    return True

jsonsink

JSONSink(*args, **kwargs)

Bases: Sink

Store a 'dict' frame that is present in the state into a file.

Params are meant to be passed as parameter to update_frame.

Example configuration::

 "args": {
     "sink": {
         'test': {
             'frametype': 'dict',
             'filename': '%(output)s/%(runid)s/mytestoutput.json',
             'params': {}
         }
     }
 }
Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "JSONSink"

    self.testdata = {
        "conf": {
            "args": {
                "sink": {
                    "frame1": {
                        "frametype": "dict",
                        "filename": "%(output)s/%(runid)s/mytestoutput.json",
                        "params": {},
                    }
                }
            }
        },
        "data": {
            "frame1": {
                "filename": "outputjson.json",
                "frametype": "dict",
                "transform": "TestJSON",
                "params": {},
            }
        },
    }
preload_clean_args(args)

Clean when the spec is loaded...

Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def preload_clean_args(self, args):
    """
    Clean when the spec is loaded...
    """

    if "sink" not in args:
        args = {"sink": args}

    args = super().preload_clean_args(args)

    assert "sink" in args
    assert isinstance(args["sink"], dict)
    assert len(args["sink"]) > 0

    sink = args["sink"]
    for name, detail in sink.items():

        if ("frametype" not in detail) or (detail["frametype"] != "dict"):
            logger.error(
                "Invalid configuration. Only JSON/Dictionaries are supported by this sink transform",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        if (
            ("filename" not in detail)
            or (not isinstance(detail["filename"], str))
            or ("params" not in detail)
            or (not isinstance(detail["params"], dict))
        ):
            logger.error(
                "Invalid args. Filename (string) and params (dict) are required",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        filename = detail["filename"]
        if not filename.lower().endswith(".json"):
            logger.error(
                "Input file must a .json file",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        detail["root"] = self.config.enrich_data_dir

        tags = detail.get("tags", [])
        if isinstance(tags, str):
            tags = [tags]
        detail["tags"] = tags

        # => Materialize the path...
        detail["filename"] = self.config.get_file(
            detail["filename"], extra={"frame_name": name}
        )

    return args
process(state)

Store the dictionary 'frames' in the state in files.

Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def process(self, state):
    """
    Store the dictionary 'frames' in the state in files.
    """
    logger.debug(
        "{} - process".format(self.name),
        extra=self.config.get_extra({"transform": self.name}),
    )

    available_frames = state.get_frame_list()

    # => First construct input for the pandasframe
    extra = {}
    args_input = {}
    write_input = {}
    framecls = self.config.get_dataframe("dict")

    sink = self.args["sink"]
    for pattern in sink:
        # The pattern could be precise dataframe name or could be
        # regular expression.
        regex = re.compile("^{}$".format(pattern))
        frames = [
            m.group(0) for f in available_frames for m in [regex.search(f)] if m
        ]
        if len(frames) == 0:
            logger.warning("Pattern has not matched any frames: {}".format(pattern))
            continue

        for f in frames:

            # Get the details of this frame
            detail = state.get_frame(f)

            # Handle frametype
            frametype = detail["frametype"]
            if frametype != "dict":
                logger.warning(
                    "Pattern has matched non-dict frame: {}".format(f),
                    extra=self.config.get_extra({"transform": self.name}),
                )
                continue

            # Now construct the output file name
            filename = sink[pattern]["filename"]
            filename = self.config.get_file(
                filename, create_dir=True, extra={"frame_name": f}
            )

            extra[f] = {
                "notes": self.collapse_notes(detail),
                "descriptions": self.collapse_descriptions(detail),
            }

            params = sink[pattern].get("params", {})
            write_input[f] = {
                "frametype": detail["frametype"],
                "filename": filename,
                "pattern": pattern,
                "df": detail["df"],
                "params": params,
            }

            args_input[f] = copy.copy(sink[pattern])
            args_input[f]["filename"] = filename

    framecls.write(args_input, write_input)

    for name in write_input:

        detail = write_input[name]

        # => Insert columns and tags
        pattern = detail["pattern"]
        detail["params"]["tags"] = sink[pattern]["tags"]

        # Incorporate columns, notes and description
        detail["params"].update(extra[name])

        detail["params"] = [
            detail["params"],
            {
                "type": "lineage",
                "transform": self.name,
                "dependencies": [
                    {"type": "dataframe", "nature": "input", "objects": [name]},
                    {
                        "type": "file",
                        "nature": "output",
                        "objects": [detail["params"]["filename"]],
                    },
                ],
            },
        ]

        # Insert additional detail
        detail["transform"] = self.name
        detail["history"] = [{"transform": self.name, "log": "Wrote output"}]

        state.update_frame(name, detail)

    logger.debug(
        "Finished writing data",
        extra=self.config.get_extra({"transform": self.name}),
    )

    ###########################################
    # => Return
    ###########################################
    return state
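
An upstream transform makes a dictionary available to this sink by registering a frame of type 'dict' in the state. A minimal sketch, assuming an illustrative frame name and payload::

    # Inside an upstream transform's process(state)
    state.update_frame(
        "frame1",
        {
            "df": {"rows": 124, "status": "ok"},   # any JSON-serializable dict
            "frametype": "dict",
            "transform": self.name,
            "params": {},
            "history": [],
        },
        create=True,
    )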
validate_args(what, state)

An extra check on the arguments to make sure they are consistent with the specification

Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def validate_args(self, what, state):
    """
    An extra check on the arguments to make sure
    they are consistent with the specification
    """
    args = self.args
    assert "sink" in args
    assert isinstance(args["sink"], dict)
    assert len(args["sink"]) > 0

    sink = args["sink"]
    for name, detail in sink.items():
        assert ("frametype" in detail) and (detail["frametype"] == "dict")
        assert "filename" in detail
        assert "params" in detail
validate_results(what, state)

Check to make sure that the execution completed correctly

Source code in enrichsdk/contrib/transforms/jsonsink/__init__.py
def validate_results(self, what, state):
    """
    Check to make sure that the execution completed correctly
    """
    pass

jsonsource

JSONSource(*args, **kwargs)

Bases: Source

Load a file into a 'dict' frame in the state.

Params are meant to be passed as parameter to update_frame.

Example configuration::

 ...

 "args": {
     "source": {
         'hello': {
             'frametype': 'dict',
             'filename': '%(data_root)s/shared/hello.json',
             'params': {}
         }
     }
 }
Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "JSONSource"

    self.testdata = {
        "conf": {
            "args": {
                "source": {
                    "hello": {
                        "filename": "%(data_root)s/shared/hello.json",
                        "frametype": "dict",
                        "params": {},
                    }
                }
            }
        },
        "data": {},
    }
preload_clean_args(args)

Check if the args are consistent with the specification.

Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def preload_clean_args(self, args):
    """
    Check if the args are consistent with the
    specification.
    """

    # Backward compatabilty
    if "source" not in args:
        args = {"source": args}

    args = super().preload_clean_args(args)

    # Sanity check...
    assert isinstance(args, dict)
    assert "source" in args
    assert isinstance(args["source"], dict)

    source = args["source"]
    for name, detail in source.items():

        if ("frametype" not in detail) or (detail["frametype"] != "dict"):
            logger.error(
                "Invalid configuration. Only JSON source supported by this source transform",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        if (
            ("filename" not in detail)
            or (not isinstance(detail["filename"], str))
            or ("params" not in detail)
            or (not isinstance(detail["params"], dict))
        ):
            logger.error(
                "Invalid args. Filename (string) and params (dict) are required",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        filename = detail["filename"]
        if not filename.lower().endswith(".json"):
            logger.error(
                "Input file must a .json file",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        # => Materialize the path...
        detail["filename"] = self.config.get_file(detail["filename"])

    return args
process(state)

Load the json files into 'dict' frames and store them in the state.

Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def process(self, state):
    """
    Load the json files into 'dict' frames and store them in the state.
    """
    logger.debug(
        "{} - process".format(self.name),
        extra=self.config.get_extra({"transform": self.name}),
    )

    source = self.args["source"]
    for name, detail in source.items():

        filename = detail["filename"]
        data = json.load(open(filename))

        updated_detail = {
            "df": data,
            "transform": self.name,
            "frametype": "dict",
            "params": [
                {
                    "type": "compute",
                },
                {
                    "type": "lineage",
                    "transform": self.name,
                    "dependencies": [
                        {"type": "file", "nature": "input", "objects": [filename]}
                    ],
                },
            ],
            "history": [
                # Add a log entry describing the change
                {"transform": self.name, "log": "Loaded json file"}
            ],
        }

        # Update the state.
        state.update_frame(name, updated_detail, create=True)

    ###########################################
    # => Return
    ###########################################
    return state
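
Downstream transforms can then retrieve the loaded dictionary from the state. A minimal sketch, assuming the `hello` frame from the example configuration above::

    # Inside a later transform's process(state)
    detail = state.get_frame("hello")
    assert detail["frametype"] == "dict"

    data = detail["df"]          # the parsed JSON content as a Python dict
    print(list(data.keys()))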
validate_args(what, state)

Double check the arguments

Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def validate_args(self, what, state):
    """
    Double check the arguments
    """
    assert isinstance(self.args, dict)
    assert "source" in self.args
    assert isinstance(self.args["source"], dict)

    source = self.args["source"]
    for name, detail in source.items():
        assert ("frametype" in detail) and (detail["frametype"] == "dict")
        assert "filename" in detail
        assert "params" in detail
validate_results(what, state)

Check to make sure that the execution completed correctly

Source code in enrichsdk/contrib/transforms/jsonsource/__init__.py
def validate_results(self, what, state):
    """
    Check to make sure that the execution completed correctly
    """

    source = self.args["source"]
    for name, detail in source.items():
        if not state.reached_stage(name, self.name):
            raise Exception("Could not find new frame created for {}".format(name))

        detail = state.get_frame(name)
        df = detail["df"]

        # Check if it is a valid dictionary...
        assert isinstance(df, dict)

pqexport

PQExport(*args, **kwargs)

Bases: Sink

Parquet export for dataframes.

The configuration requires a list of exports, each of which specifies a pattern for the frame name::

 'conf': {
    'args': {
        "exports": [
          {
              "name": "%(frame)s_pq",
              "type": "pq", # optional. Default is pq
              "frames": ["cars"],
              "filename": "%(output)s/%(runid)s/%(frame)s.pq",
              "params": {
                  # parquet parameters.
                  # "compression": 'gzip'
                  # "engine": 'auto'
                  # "index" :None,
                  # "partition_cols": None
              }
           }
        ]
    }
}
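
The exported files are plain Parquet files, with an additional `.sample` file (at most 1000 rows) written alongside each export when `sample` is enabled, so they can be read back directly with pandas. A minimal sketch; the run directory is illustrative::

    import pandas as pd

    # Path produced by the filename template above (illustrative run id)
    df = pd.read_parquet("output/run-001/cars.pq")
    sample = pd.read_parquet("output/run-001/cars.pq.sample")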
Source code in enrichsdk/contrib/transforms/pqexport/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "PQExport"
    self.roles_supported = ["Export"]
    self.roles_current = "Export"

    data_root = os.path.join(os.environ["ENRICH_TEST"], self.name)
    self.testdata = {
        "data_root": data_root,
        "outputdir": os.path.join(data_root, "runs"),
        "inputdir": os.path.join(os.environ["ENRICH_TEST"]),
        "statedir": os.path.join(os.environ["ENRICH_TEST"], self.name, "state"),
        "conf": {
            "args": {
                "exports": [
                    {
                        "name": "%(frame)s_pq",
                        # "type": "pq",
                        "frames": ["cars"],
                        "filename": "%(output)s/%(runid)s/%(frame)s.pq",
                        "sample": True,
                        "params": {
                            # "compression": 'gzip'
                            # "engine": 'auto'
                            # "index" :None,
                            # "partition_cols": None
                        },
                    }
                ]
            }
        },
        "data": {
            "cars": {
                "transform": "CarSales",
                "filename": "sales.csv",
                "params": {"sep": ","},
            }
        },
    }
process(state)

Export frames as parquet files as shown in the example.

Source code in enrichsdk/contrib/transforms/pqexport/__init__.py
def process(self, state):
    """
    Export frames as parquet files as shown in the example.
    """

    # Sanity check...
    for e in self.args["exports"]:

        namebase = e["name"]
        params = e.get("params", {})
        sample = e.get("sample", True)

        # Collect all the frames data
        for f in e["frames"]:

            detail = state.get_frame(f)
            if detail is None:
                raise Exception("Frame not present in state: {}".format(f))

            if detail["frametype"] != "pandas":
                raise Exception("Frame not a pandas dataframe: {}".format(f))

            df = detail["df"]

            # Resolve the locations
            filename = os.path.abspath(
                self.config.get_file(e["filename"], extra={"frame": f})
            )
            relpath = self.config.get_relative_path(
                filename, what="enrich_data_dir"
            )

            # Check over-rides
            overrides = self.frame_get_overrides(detail)
            override_present = len(overrides) > 0
            save = overrides.get("save", True)
            if save:
                try:
                    os.makedirs(os.path.dirname(filename))
                except:
                    pass
                df.to_parquet(filename, **params)

                if sample:
                    size = min(1000, df.shape[0])
                    df.sample(size).to_parquet(filename + ".sample", **params)

            else:
                logger.warn(
                    "Did not save {} due to overrides".format(f),
                    extra=self.config.get_extra(
                        {
                            "transform": self.name,
                            "data": "Overrides: {}".format(overrides),
                        }
                    ),
                )

            if not os.path.exists(filename):
                logger.error(
                    "PQ file not created or missing",
                    extra=self.config.get_extra(
                        {
                            "transform": self.name,
                            "data": "Filename: {}\nOverride Present: {}".format(
                                filename, override_present
                            ),
                        }
                    ),
                )
                raise Exception("PQ file missing")

            # => Create state detail
            state_detail = {
                "df": None,
                "frametype": "db",
                "transform": self.name,
                "params": [
                    {
                        "filename": filename,
                        "action": "output",
                        "frametype": "binary",
                        "columns": self.collapse_columns(detail),
                        "descriptions": ["Parquet export of {} frame".format(f)],
                        "components": [
                            {
                                "filename": relpath,
                                "type": "pq",
                                "rows": df.shape[0],
                                "columns": df.shape[1],
                                "sha256sum": get_checksum(filename),
                                "filesize": "{0:0.1f} MB".format(
                                    get_file_size(filename) / (1024 * 1024)
                                ),
                                "modified_time": str(
                                    time.ctime(os.path.getmtime(filename))
                                ),
                                "create_time": str(
                                    time.ctime(os.path.getctime(filename))
                                ),
                            }
                        ],
                    },
                    {
                        "type": "lineage",
                        "transform": self.name,
                        "dependencies": [
                            {
                                "type": "dataframe",
                                "nature": "input",
                                "objects": [f],
                            },
                            {
                                "type": "file",
                                "nature": "output",
                                "objects": [filename],
                            },
                        ],
                    },
                ],
                "history": [{"transform": self.name, "log": "Write PQite export"}],
            }
            try:
                name = namebase % {"frame": f}
                state.update_frame(name, state_detail, create=True)
                state.make_note("Generated PQ export for {}".format(f))
            except:
                logger.exception(
                    "Unable to store state",
                    extra=self.config.get_extra({"transform": self.name}),
                )
                raise Exception("Error while storing")

sqlexport

SQLExport(*args, **kwargs)

Bases: Sink

Export dataframes into the SQL database. Args specify what and how the export should happen.

The transform args provide the specification:

  * exports: A list of files that must be exported. Each is a
    dictionary with the following elements:

      * name: Name of this export. Used for internal tracking and notifications.
      * filename: Output filename. Can refer to other global attributes such as `data_root`, `enrich_root_dir` etc
      * type: Type of the export. Only `sqlite` supported for now
      * frames: List of frames of the type `pandas` that should be
        exported as part of this file
      * indexes: Columns on which indexes should be created. Note that these are common across the frames. We check if the column is present in the frame and create the index

Example::

....
"transforms": {
    "enabled": [
       ...
       {
         "transform": "SQLExport",
          "args": {
              "exports": [
                {
                   "type": "sqlite",
                   "filename": "%(output)s/cars.sqlite",
                   "frames": ["cars", "alpha"]
                },
               ...
              ]
            },
           ...
       }
    ...
   }
 }
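
The resulting file is a standard SQLite database with one table per exported frame (tables are named after the frames), so it can be inspected with Python's sqlite3 module. A minimal sketch, assuming the `cars` frame and path from the example above::

    import sqlite3

    conn = sqlite3.connect("output/cars.sqlite")
    cur = conn.cursor()

    cur.execute("SELECT COUNT(*) FROM cars")   # each exported frame becomes a table
    print(cur.fetchone()[0])
    conn.close()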
Source code in enrichsdk/contrib/transforms/sqlexport/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "SQLExport"
    self.description = "Export frames into SQLite/Other formats"
    self.roles_supported = ["Export"]
    self.roles_current = "Export"

    self.testdata = {
        "conf": {
            "args": {
                "exports": [
                    {
                        "name": "customerinfo",
                        "filename": "%(output)s/%(runid)s/customerinfo.sqlite",
                        "type": "sqlite",
                        "frames": ["customerinfo"],
                    }
                ]
            }
        },
        "data": {
            "customerinfo": {
                "transform": "MemberLoyaltyMetadata",
                "filename": "customerinfo.csv",
                "params": {"sep": ",", "dtype": {"MEMBERSHIP_ID_SUFFIX": "str"}},
            }
        },
    }
preload_clean_args(args)

Enforce the args specification given in the example above

Source code in enrichsdk/contrib/transforms/sqlexport/__init__.py
def preload_clean_args(self, args):
    """
    Enforce the args specification given in the
    example above
    """
    args = super().preload_clean_args(args)

    if len(args) == 0:
        raise Exception("Empty args provided")

    if ("exports" not in args) or (not isinstance(args["exports"], list)):
        raise Exception("SQLExport requires a series of exports (a list)")

    for e in args["exports"]:

        if (
            (not isinstance(e, dict))
            or ("filename" not in e)
            or ("name" not in e)
            or ("type" not in e)
            or ("frames" not in e)
        ):
            raise Exception(
                "Each element of the export should be a dictionary with filename, type, and frames"
            )

        if ("replace" in e) and (not isinstance(e["replace"], bool)):
            raise Exception("replace is a boolean variable")

        if e["type"] != "sqlite":
            raise Exception("Only sqlite exports are supported in current version")

        e["filename"] = os.path.abspath(self.config.get_file(e["filename"]))
        e["relpath"] = os.path.relpath(e["filename"], self.config.data_root)

    return args
process(state)

Execute the export specification.

Source code in enrichsdk/contrib/transforms/sqlexport/__init__.py
def process(self, state):
    """
    Execute the export specification.
    """

    exports = self.args["exports"]

    # Look at messages sent from transforms and collect the
    # frames.
    extra = defaultdict(list)
    while True:
        msg = state.msgpop(self.name)
        if msg is None:
            break
        if not isinstance(msg, dict):
            continue
        data = msg.get('data', None)
        if ((isinstance(data, dict)) and
            ('frames' in data)):
            name = data.get('name', 'default')
            frames = data['frames']
            extra[name].extend(frames)

    # Sanity check...
    for e in exports:

        name = e.get('name', 'default')

        # Collect all the frames data
        missing = []
        invalid = []
        frames = {}

        allframes = e["frames"] + extra.get(name, [])
        for f in allframes:
            detail = state.get_frame(f)
            if detail is None:
                missing.append(f)
                continue

            if detail["frametype"] != "pandas":
                invalid.append(f)
                continue

            frames[f] = detail

        if len(missing) > 0 or len(invalid) > 0:
            logger.error(
                "Unable to export frames",
                extra=self.config.get_extra(
                    {
                        "transform": self.name,
                        "data": "Invalid: {}\nMissing: {}".format(invalid, missing),
                    }
                ),
            )
            raise Exception("Error while exporting")

        filename = e["filename"]
        filename = os.path.abspath(self.config.get_file(filename))
        relpath = self.config.get_relative_path(filename, what="enrich_data_dir")

        name = e.get("name", os.path.basename(filename))
        replace = e.get("replace", False)
        indexes = e.get("indexes", [])

        try:
            os.makedirs(os.path.dirname(filename))
        except:
            pass

        # Creating a database file
        conn = sqlite3.connect(filename)
        c = conn.cursor()

        for f in frames:

            # => Write the frames
            overrides = self.frame_get_overrides(frames[f])
            override_present = len(overrides) > 0
            save = overrides.get("save", True)
            if save:
                try:

                    # Status flag
                    failed = False

                    df = frames[f]["df"]

                    # Drop existing table if necessary...
                    if replace:
                        c.execute(f"DROP TABLE IF EXISTS {f}")

                    # => First create the table schema
                    ddl = pd.io.sql.get_schema(df, f)
                    c.execute(ddl)  # CREATE table

                    # => Index created.
                    indexmsg = "No indexes" if len(indexes) == 0 else ""
                    for col in indexes:
                        if col in df.columns:
                            try:
                                c.execute(f"CREATE INDEX {col}_{f}_index ON {f} ({col})")
                                indexmsg += f"[{col}] Index created\n"
                            except Exception as exc:
                                indexmsg += f"[{col}] Exception {exc}\n"
                        else:
                            indexmsg += f"[{col}] Index skipped. Column not present\n"

                    # => Dump the dataframe to a csv
                    fd, tmpfile = tempfile.mkstemp(prefix="sqlexport")
                    df.to_csv(tmpfile, header=False, index=False)

                    # => Load it into sqlite
                    cmd = [
                        "/usr/bin/sqlite3",
                        filename,
                        "-cmd",
                        ".separator ,",
                        ".import {} {}".format(tmpfile, f),
                    ]

                    process = subprocess.Popen(
                        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                    )
                    out, err = process.communicate()
                    err = err.decode("utf-8")
                    out = out.decode("utf-8")

                    # Dump what we have seen
                    if len(err) > 0:
                        failed = True
                        logfunc = partial(
                            logger.error, "Unable to export {}".format(f)
                        )
                    else:
                        logfunc = partial(logger.debug, "Exported {}".format(f))
                    logfunc(
                        extra=self.config.get_extra(
                            {
                                "transform": self.name,
                                "data": "Filename:{}\nIndex:{}\nOutput\n-----\n{}\n\nErr\n----\n{}".format(
                                    filename, indexmsg, out, err
                                ),
                            }
                        )
                    )

                    # => Update the state for this transform..
                    if not failed:
                        state.update_frame(
                            f,
                            {
                                "df": frames[f]["df"],
                                "frametype": frames[f]["frametype"],
                                "transform": self.name,
                                "history": [{"log": "Exported to SQLite"}],
                                "params": {
                                    "type": "lineage",
                                    "transform": self.name,
                                    "dependencies": [
                                        {
                                            "type": "dataframe",
                                            "nature": "input",
                                            "objects": [f],
                                        },
                                        {
                                            "type": "file",
                                            "nature": "output",
                                            "objects": [filename],
                                        },
                                    ],
                                },
                            },
                        )

                except:
                    logger.exception(
                        "Unable to export dataframe {}".format(f),
                        extra=self.config.get_extra(
                            {
                                "transform": self.name,
                            }
                        ),
                    )

                # Cleanup...
                try:
                    if os.path.exists(tmpfile):
                        os.remove(tmpfile)
                except:
                    pass

                # Dont proceed
                if failed:
                    raise Exception("Error while exporting {}".format(f))

            else:
                logger.warn(
                    "Did not save {} due to overrides".format(f),
                    extra=self.config.get_extra(
                        {
                            "transform": self.name,
                            "data": "Overrides: {}".format(overrides),
                        }
                    ),
                )

        conn.close()
        if not os.path.exists(filename):
            logger.error(
                "SQLite file not created or missing",
                extra=self.config.get_extra(
                    {
                        "transform": self.name,
                        "data": "Filename: {}\nOverride Present: {}".format(
                            filename, override_present
                        ),
                    }
                ),
            )
            raise Exception("SQLite file missing")

        # Now create the state entry
        detail = {
            "df": None,
            "frametype": "db",
            "transform": self.name,
            "params": {
                "filename": filename,
                "action": "output",
                "frametype": "db",
                "descriptions": [
                    "SQLite export of {} frames ({})".format(
                        len(frames), ",".join(frames)
                    )
                ],
                "notes": ["Frames included: {}".format(",".join(frames))],
                "components": [
                    {
                        "filename": relpath,
                        "type": "sqlite",
                        "sha256sum": get_checksum(filename),
                        "filesize": "{0:0.3f} MB".format(
                            get_file_size(filename) / (1024 * 1024)
                        ),
                        "modified_time": str(
                            time.ctime(os.path.getmtime(filename))
                        ),
                        "create_time": str(time.ctime(os.path.getctime(filename))),
                    }
                ],
            },
            "history": [{"transform": self.name, "log": "Write SQLite export"}],
        }
        try:
            state.update_frame(name, detail, create=True)
            state.make_note("Generated database export")
        except:
            logger.exception(
                "Unable to store state",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Error while storing")
validate_results(what, state)

Check to make sure that the execution completed correctly

Source code in enrichsdk/contrib/transforms/sqlexport/__init__.py
def validate_results(self, what, state):
    """
    Check to make sure that the execution completed correctly
    """
    pass

tablesink

TableSink(*args, **kwargs)

Bases: Sink

Transform to dump dataframes in state into files.

Parameters specific to this module include:

* sink: A dictionary of dataframe names and how to output them. It has a number of attributes:

    * type: Output type. Only 'table' value is supported for this
      option right now.
    * filename: Output filename. You can use default parameters such
      as runid

  The name of the dataframe can be a regular expression, allowing you
  to specify a simple rule for an arbitrary number of frames (see the
  sketch after the example below).

Example::

....
"transforms": {
    "enabled": [
        ...
        {
            "transform": "TableSink",
            "args": {
                "sink": {
                    "article": {
                        "frametype": "pandas",
                        "filename": "%(output)s/%(runid)s/article.csv",
                        "params": {
                            "sep": "|"
                        }
                    },
                    ...
                }
            }
            ...
        }
    ]
 }
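Because each sink key is treated as a regular expression over frame names, a single entry can write out several frames. A hedged sketch (the frame names are illustrative; `%(frame_name)s` is substituted per matched frame when the path is materialized in `process`)::

    "args": {
        "sink": {
            # Matches e.g. "sales_2020" and "sales_2021" if both exist in the state
            "sales_.*": {
                "frametype": "pandas",
                "filename": "%(output)s/%(runid)s/%(frame_name)s.csv",
                "params": {"sep": ","}
            }
        }
    }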
Source code in enrichsdk/contrib/transforms/tablesink/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "TableSink"

    self.testdata = {
        "conf": {
            "args": {
                "sink": {
                    "cars": {
                        "frametype": "pandas",
                        "filename": "%(output)s/cars_revised.csv",
                        "params": {"sep": ","},
                    }
                }
            }
        },
        "data": {
            "carmodel": {
                "transform": "CarModel",
                "filename": "cars.csv",
                "params": {"sep": ","},
                "state": {
                    "params": [
                        {
                            "type": "args",
                            "transform": "TableSink",
                            "args": {"save": False, "rows": 124},
                        }
                    ]
                },
            }
        },
    }
preload_clean_args(args)

Check to make sure that the arguments are consistent with the specification mentioned above

Source code in enrichsdk/contrib/transforms/tablesink/__init__.py
def preload_clean_args(self, args):
    """
    Check to make sure that the arguments
    are consistent with the specification mentioned
    above
    """

    # Sanity check...
    assert isinstance(args, dict)
    assert len(args) > 0

    # Backward compatability
    if "sink" not in args:
        args = {"sink": args}

    args = super().preload_clean_args(args)

    assert "sink" in args
    sink = args["sink"]
    assert isinstance(sink, dict)
    assert len(sink) > 0

    for pattern, detail in sink.items():

        if (
            ("type" in detail)
            and ("frametype" not in detail)
            and (detail["type"] == "table")
        ):
            detail["frametype"] = "pandas"

        if ("frametype" not in detail) or (detail["frametype"] != "pandas"):
            logger.error(
                "Invalid configuration. Only pandas table source supported by this sink transform",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        if (
            ("filename" not in detail)
            or (not isinstance(detail["filename"], str))
            or ("params" not in detail)
            or (not isinstance(detail["params"], dict))
        ):
            logger.error(
                "Invalid args. Filename (string) and params (dict) are required",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        detail["root"] = self.config.enrich_data_dir

        tags = detail.get("tags", [])
        if isinstance(tags, str):
            tags = [tags]
        detail["tags"] = tags

        sortcols = detail.get("sort", [])
        if isinstance(sortcols, str):
            sortcols = [sortcols]
        detail["sort"] = sortcols

    return args
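
Both `tags` and `sort` accept either a single string or a list; a string is normalized into a one-element list. A minimal sketch of a sink entry with illustrative values::

    "article": {
        "frametype": "pandas",
        "filename": "%(output)s/%(runid)s/article.csv",
        "params": {"sep": "|"},
        "tags": "daily",        # normalized to ["daily"]
        "sort": "priority"      # normalized to ["priority"]; rows sorted descending before writing
    }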
process(state)

Execute the tablesink specification

Source code in enrichsdk/contrib/transforms/tablesink/__init__.py
def process(self, state):
    """
    Execute the tablesink specification
    """

    # => What exists in the state?
    available_frames = state.get_frame_list()

    # => First construct input for the pandasframe
    extra = {}
    write_input = {}
    args_input = {}
    framecls = self.config.get_dataframe("pandas")
    skipped = []

    sink = self.args["sink"]
    for pattern in sink:
        # The pattern could be precise dataframe name or could be
        # regular expression.
        regex = re.compile("^{}$".format(pattern))
        frames = [
            m.group(0) for f in available_frames for m in [regex.search(f)] if m
        ]

        for f in frames:
            # For each dataframe that is in the system

            detail = state.get_frame(f)

            # => Are there any extra instructions?
            overrides = self.frame_get_overrides(detail)

            # => Materialize the path...
            filename = sink[pattern]["filename"]
            filename = self.config.get_file(
                filename, create_dir=True, extra={"frame_name": f}
            )
            # Collect all column information
            extra[f] = {
                "columns": self.collapse_columns(detail),
                "notes": self.collapse_notes(detail),
                "descriptions": self.collapse_descriptions(detail),
                "overrides": overrides,
            }

            # Which dataframe
            df = detail["df"]

            # Get the frametype
            frametype = detail["frametype"]

            # Order the dataframe if it is needed
            sortcols = sink[pattern]["sort"]
            if len(sortcols) > 0:
                df = framecls.sort_values(df, sortcols, ascending=False)
            params = sink[pattern].get("params", {})

            # Should I be writing this csv?
            save = params.get("save", True)
            save = overrides.get("save", save)

            if not save:
                skipped.append(f)

            write_input[f] = {
                "save": save,
                "frametype": frametype,
                "pattern": pattern,
                "df": df,
                "filename": filename,
                "params": params,
            }

            args_input[f] = copy.copy(sink[pattern])
            args_input[f]["filename"] = filename

    if len(skipped) > 0:
        logger.warning(
            "Not saving {} tables".format(len(skipped)),
            extra={"transform": self.name, "data": skipped},
        )

    # => Write output details
    framecls.write(args_input, write_input)

    for name in write_input:

        detail = write_input[name]

        # => Insert columns and tags
        pattern = detail["pattern"]

        #
        detail["params"]["tags"] = sink[pattern]["tags"]

        # Incorporate columns, notes and description
        additional_params = extra[name]
        overrides = additional_params.pop("overrides", {})

        detail["params"].update(additional_params)

        # Insert any overrides provided in the state
        if "rows" in overrides:
            detail["params"]["components"][0]["rows"] = overrides["rows"]

        detail["params"] = [
            detail["params"],
            {
                "type": "lineage",
                "transform": self.name,
                "dependencies": [
                    {"type": "dataframe", "nature": "input", "objects": [name]},
                    {
                        "type": "file",
                        "nature": "output",
                        "objects": [detail["filename"]],
                    },
                ],
            },
        ]

        # Insert additional detail
        detail["transform"] = self.name
        detail["history"] = [{"transform": self.name, "log": "Wrote output"}]

        state.update_frame(name, detail)

    logger.debug(
        "Finished writing data",
        extra=self.config.get_extra({"transform": self.name}),
    )
validate_args(what, state)

Extra validation of the arguments

Source code in enrichsdk/contrib/transforms/tablesink/__init__.py
def validate_args(self, what, state):
    """
    Extra validation of the arguments
    """
    sink = self.args["sink"]

    assert isinstance(sink, dict)
    for pattern, detail in sink.items():
        assert ("frametype" in detail) and (detail["frametype"] == "pandas")
        assert "filename" in detail
        assert "params" in detail

tablesource

TableSource(*args, **kwargs)

Bases: Source

Load csv/other files into pandas dataframes.

Parameters specific to this module include:

* source: A dictionary of dataframe names and how to
  load them. It has a number of attributes:

    * type: Source type. Only the 'table' value is
      supported for this option.
    * filename: Input filename. You can use default
      parameters such as runid
    * params: Params are arguments to [pandas read_csv](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html)

Example::

....
"transforms": {
    "enabled": [
        {
            "transform": "TableSink",
            "args": {
                "source": {
                    "article": {
                        "type": "file",
                        "filename": "%(data)s/ArticleData.csv",
                        "params": {
                            "delimiter": "|",
                            "dtype": {
                                "sku": "category",
                                "mc_code": "int64",
                                "sub_class": "category",
                                "priority": "float64"
                                ...
                            }
                        }
                    }
                }
              ...
            }
        }
    ...
   ]
 }
Source code in enrichsdk/contrib/transforms/tablesource/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "TableSource"
clean(state)

This is meant for subclasses to do some additional processing.

Source code in enrichsdk/contrib/transforms/tablesource/__init__.py
def clean(self, state):
    """
    This is meant for subclasses to do some additional processing.
    """
    pass
preload_clean_args(args)

Clean when the spec is loaded...

Source code in enrichsdk/contrib/transforms/tablesource/__init__.py
def preload_clean_args(self, args):
    """
    Clean when the spec is loaded...
    """

    # Backward compatability
    if "source" not in args:
        args = {"source": args}

    args = super().preload_clean_args(args)

    # Sanity check...
    assert isinstance(args, dict)
    assert "source" in args
    assert isinstance(args["source"], dict)

    for name, detail in args["source"].items():

        # Insert the frame into the args for backward
        # compatability.
        if (
            ("type" in detail)
            and ("frametype" not in detail)
            and (detail["type"] == "table")
        ):
            detail["frametype"] = "pandas"

        if ("frametype" not in detail) or (detail["frametype"] != "pandas"):
            logger.error(
                "Invalid configuration. Only pandas table source supported by this source transform",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        if (
            ("filename" not in detail)
            or (not isinstance(detail["filename"], str))
            or ("params" not in detail)
            or (not isinstance(detail["params"], dict))
        ):
            logger.error(
                "Invalid args. Filename (string) and params (dict) are required",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Invalid configuration")

        mapping = {
            "str": str,
            "float": np.float64,
            "float64": np.float64,
            "np.float64": np.float64,
            "np.int64": np.int64,
            "int": np.int64,
            "int64": np.int64,
            "datetime": datetime,
            "category": "category",
        }

        # => Materialize the path...
        detail["filename"] = self.config.get_file(detail["filename"])
        detail["root"] = self.config.enrich_data_dir
        params = detail["params"]
        if "dtype" in params:
            for attr in params["dtype"]:
                if params["dtype"][attr] in mapping:
                    params["dtype"][attr] = mapping[params["dtype"][attr]]
                else:
                    params["dtype"][attr] = eval(params["dtype"][attr])

    return args
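
The `dtype` strings in `params` are translated to the corresponding numpy/pandas types before the file is read; names outside the mapping fall back to `eval`. A minimal sketch of the effect, with illustrative column names::

    # In the configuration
    "params": {
        "dtype": {
            "sku": "category",       # kept as the pandas 'category' dtype
            "priority": "float64",   # becomes np.float64
            "mc_code": "int64"       # becomes np.int64
        }
    }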
process(state)

Load file...

Source code in enrichsdk/contrib/transforms/tablesource/__init__.py
def process(self, state):
    """
    Load file...
    """
    # Load all the dataframes. This will use the full enrich
    # deployment's beefed up read function.
    framecls = self.config.get_dataframe("pandas")
    source = self.args["source"]

    dfstates = framecls.read(source, {})
    for dfname, dfstate in dfstates.items():

        # => Insert column description
        columns = dfstate["params"]["columns"]
        for c in columns:
            columns[c]["description"] = self.get_column_description(dfname, c)

        params = dfstate["params"]
        if "filename" in params:
            filename = params["filename"]
        elif "filename" in source.get(dfname, {}):
            filename = source[dfname]["filename"]
        else:
            filename = "Unknown"

        detail = {
            "df": dfstate["df"],
            "transform": self.name,
            "frametype": "pandas",
            "params": [
                params,
                {
                    "type": "lineage",
                    "transform": self.name,
                    "dependencies": [
                        {"type": "file", "nature": "input", "objects": [filename]}
                    ],
                },
            ],
            "history": [
                {
                    "transform": self.name,
                    "log": "Read data using {}".format(framecls.__class__.__name__),
                }
            ],
        }
        try:
            state.update_frame(dfname, detail, create=True)
        except:
            logger.exception(
                "Unable to store state",
                extra=self.config.get_extra({"transform": self.name}),
            )
            raise Exception("Error while loading")

    # Clean the uploaded data...
    self.clean(state)

enrichsdk.contrib.lib.transforms

AnomaliesBase(*args, **kwargs)

Bases: Compute

Compute anomalies in column(s) of a given dataframe

Features of this transform base class include:

* Flexible configuration
* High-level specification of column combinations and detection strategy
Source code in enrichsdk/contrib/lib/transforms/anomalies/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "AnomaliesBase"
    self.description = "Compute anomalies in column(s) of a dataframe"
    self.testdata = {
        "data_root": os.path.join(os.environ["ENRICH_TEST"], self.name),
        "statedir": os.path.join(os.environ["ENRICH_TEST"], self.name, "state"),
        "conf": {"args": {}},
        "data": {},
    }

get_dataset_s3(spec, paths)

Gets all files from paths and puts them together into a single dataframe. If self.args['cache']==True, then this consolidated dataframe is cached / read from cache as applicable.

Source code in enrichsdk/contrib/lib/transforms/anomalies/__init__.py
def get_dataset_s3(self, spec, paths):
    '''
    Gets all files from paths and puts them together
    into a single dataframe. If self.args['cache']==True,
    then this consolidated dataframe is cached / read from cache
    as applicable.
    '''
    msg = ""

    run_date    = self.args['run_date']
    config      = spec['config']
    dataset     = config['dataset']

    cache = self.args.get("cache", False)
    cachename = f"{dataset}-{run_date}"
    cachefile = f"cache/{self.name}-rawdata-cache-" + cachename + ".csv"

    # read from cache if available
    if cache:
        try:
            os.makedirs(os.path.dirname(cachefile))
        except:
            pass
        if os.path.exists(cachefile):
            msg = f"Location: {cachefile}" + "\n"
            df = pd.read_csv(cachefile)
            logger.debug(f"Read cached {dataset}", extra={"transform": self.name})
            return df

    # read from S3
    dfs = []
    for path in paths:
        _df = self.read_s3_data(path)
        if _df is None:
            msg += f"Path not found, skipping: {path}" + "\n"
            continue
        msg += f"Read from path: {path}" + "\n"
        dfs.append(_df)
    df = pd.concat(dfs)

    logger.debug(f"Read fresh {dataset}", extra={"transform": self.name})

    # Cache it for future use
    if cache:
        df.to_csv(cachefile, index=False)

    # Insert lineage if possible
    lineage = None
    if (len(paths) > 0):
        lineage = {
            "type": "lineage",
            "transform": self.name,
            "dependencies": [
                {
                    "type": "file",
                    "nature": "input",
                    "objects": [paths],
                },
            ],
        }

    if not self.state.has_frame(spec['name']):
        self.update_frame(spec, f"Dataset: {dataset}", df, lineage)

    return df

get_handlers(spec)

Define various callbacks that take a dataframe, spec and compute.

Source code in enrichsdk/contrib/lib/transforms/anomalies/__init__.py
def get_handlers(self, spec):
    """
    Define various callbacks that take a dataframe, spec
    and compute.
    """
    return {}

get_profile()

Read the profile json from API

Source code in enrichsdk/contrib/lib/transforms/anomalies/__init__.py
def get_profile(self):
    """
    Read the profile json from API
    """

    if (not hasattr(self, "args")):
        raise Exception(
            "'args' transform attribute should be defined to use default get_profile method"
        )
    for p in ['apicred']:
        if self.args.get(p) == None:
            raise Exception(
                f"'{p}' attribute in args should be defined to use default get_profile method"
                )

    # call the API to get the anomaly specs
    anomalyspecs, is_valid, msg = load_profile_api(self.args)
    logger.debug(
        f"Loading profile from API",
        extra={"transform": self.name, "data": msg},
    )
    if is_valid == False:
        raise Exception(f"Error loading profile")

    specs = anomalyspecs["specs"]
    logger.debug(
        f"Found {len(specs)} specs",
        extra={"transform": self.name, "data": json.dumps(anomalyspecs, indent=4)},
    )

    return anomalyspecs

preprocess_spec(spec)

to be overloaded in the derived class

Source code in enrichsdk/contrib/lib/transforms/anomalies/__init__.py
def preprocess_spec(self, spec):
    '''
    to be overloaded in the derived class
    '''
    return spec
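
A derived class can overload this hook to adjust each spec before it is processed. A minimal sketch, assuming a hypothetical subclass and an illustrative default threshold::

    class MyAnomalies(AnomaliesBase):

        def preprocess_spec(self, spec):
            # Illustrative: ensure a tolerance threshold is always present
            spec.setdefault("config", {})
            spec["config"].setdefault("threshold", "medium")
            return spec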

process(state)

Run the computation and update the state

Source code in enrichsdk/contrib/lib/transforms/anomalies/__init__.py
def process(self, state):
    """
    Run the computation and update the state
    """
    logger.debug(
        "Start execution", extra=self.config.get_extra({"transform": self.name})
    )

    # Will be used in other places..
    self.state = state

    # Get the anomaly profile
    is_valid, profile, msg = profilespec.get_profile(self, "policyapp.outliersv2")
    if is_valid:
        logger.debug(
            f"Loaded profilespec",
            extra={"transform": self.name, "data": msg}
        )
    else:
        logger.error(
            f"Could not load profilespec",
            extra={"transform": self.name, "data": msg}
        )
        raise Exception("could not load profilespec")

    specs = profile.get("specs", None)
    if specs is None:
        raise Exception("Could not find 'specs' in profile")

    # Now go through each spec and generate anomaly reports
    for spec in specs:

        ## first, some checks on the spec
        do_process_spec = True
        name = spec.get('name', 'NO_SPEC_NAME')

        enabled = spec.get("active", True)
        if not enabled:
            logger.debug(
                f"Spec not enabled, skipping.",
                extra={"transform": self.name, "data": json.dumps(spec, indent=4)}
            )
            do_process_spec = False
            continue

        for f in ["name", "config"]:
            if f not in spec:
                logger.exception(
                    f"Spec has no {f} param, skipping.",
                    extra={"transform": self.name, "data": json.dumps(spec, indent=4)}
                )
                do_process_spec = False
                break
        if do_process_spec == False:
            continue

        config = spec['config']

        for f in ["source_id"]:
            if f not in config:
                logger.exception(
                    f"Spec config has no {f} param, skipping.",
                    extra={"transform": self.name, "data": json.dumps(spec, indent=4)}
                )
                do_process_spec = False
                break

        if not do_process_spec:
            continue

        ## pre-process the spec
        try:
            spec = self.preprocess_spec(spec)
            logger.debug(f"Preproccessed spec: {spec['name']}",
                         extra={
                             'transform': self.name,
                             'data': json.dumps(spec, indent=4)
                         })

            ## we can now proceed with processing the spec
            # first, load the source data
            data = self.load_dataset(spec)

            ## process the spec to detect outliers
            data = self.process_spec(spec, data)

            if ((not isinstance(data, dict)) or
                (len(data) == 0)):
                continue

            # write the detected outliers
            self.store_result(spec, data)
        except:
            logger.exception(f"Failed to process {name}",
                             extra={
                                 'transform': self.name
                             })

    # Done
    logger.debug(
        "Complete execution", extra=self.config.get_extra({"transform": self.name})
    )

    ###########################################
    # => Return
    ###########################################
    return state

process_spec_default(data, spec)

Handle one specification at a time..
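
The default detector is centroid based: a row is flagged as an anomaly when its distance from the metric (or group) centroid exceeds the sample standard deviation scaled by the tolerance factor. A minimal numeric sketch of the rule, with made-up values::

    import numpy as np

    points = np.array([10, 10, 10, 10, 10, 100], dtype=float)
    centroid = points.mean()                 # 25.0
    distances = np.abs(points - centroid)    # [15, 15, 15, 15, 15, 75]
    threshold = np.nanstd(points) * 2        # ~67.1 with scalefactor 2 ("medium")
    labels = np.where(distances > threshold, "anomaly", "normal")
    # only the value 100 exceeds the threshold and is labelled "anomaly"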

Source code in enrichsdk/contrib/lib/transforms/anomalies/__init__.py
def process_spec_default(self, data, spec):
    """
    Handle one specification at a time..
    """

    partialsamplerate = 0.05
    samplerate_lut = {
        "all": 1.0,
        "partial": partialsamplerate,
        "none": 0.0
    }
    tolerances = {
        "low": 1,
        "medium": 2,
        "high": 3,
    }

    def anomaly_note(row, threshold):
        distance = row[f"__anomaly_distance__"]
        if distance > threshold:
            return f"{(round(distance/threshold,2))}x outside expected sample deviation"
        return f"within expected sample deviation"


    msg = ""
    msg += f"Using default centroid distance anomaly detector" + "\n"

    config = spec["config"]
    msg += f"Config: {json.dumps(config, indent=4)}" + "\n"

    # Get hold of the data first...
    name = spec["name"]
    orig_df = data
    total_samples = len(orig_df)

    metrics     = config.get("metrics", orig_df.columns)
    groups      = config.get('groups', [])
    outputs     = config.get("outputs", orig_df.columns)
    dimensions  = config.get("dimensions", orig_df.columns)
    columns     = list(set(metrics + outputs + dimensions))

    msg += f"Combined set of columns: {columns}" + "\n"
    msg += f"{note(orig_df, 'Original DF')}" + "\n"

    #########
    # default anomaly detection
    #########
    # get tolerance thresold
    tolerance = config.get("threshold", config.get("thresold", "medium"))
    scalefactor = tolerances.get(tolerance, 2)

    # get the sample strategy for the normal data
    normal_samples = config.get("normal_samples", "partial")
    samplerate = samplerate_lut[normal_samples]

    msg += f"(tolerance, scalefactor): ({tolerance}, {scalefactor})" + "\n"

    logger.debug(f"Setting up for spec: {spec['name']}",
                     extra={
                         'transform': self.name,
                         'data': msg
                     })

    anomaly_stats = {}
    plotdata = {}
    dfs = []

    #########
    # we first do the leaf level, per metric to check for anomalies
    #########
    msg = f"Processing metrics: {metrics}" + "\n\n"

    for metric in metrics:

        # make a copy of the df, we'll keep adding anomaly metrics to it
        df = orig_df[columns].copy()

        if not is_numeric_dtype(df[metric]):
            msg += f"{metric} Metric not numeric. Skipping\n"
            continue

        # compute the anomalies for this metric
        points      = df[metric].to_numpy()     # metric values as a 1-D array
        centroid    = df[metric].mean()          # the computed centroid (mean) of the metric
        distances   = abs(points - centroid)    # distances of each point to centroid
        stddev      = np.nanstd(points)      # std dev of the metric values
        threshold   = stddev * scalefactor
        anomalies   = np.where(distances.flatten()>threshold, 'anomaly', 'normal')    # flag where anomalies occur

        # add columns indicating anomaly label
        id = f"metric-{metric}"
        df['id'] = id
        df['level'] = 'metric'
        df['name'] = metric
        df['__is_anomaly__'] = pd.Series(anomalies)

        # add columns indicating reason for anomaly
        df[f"__anomaly_distance__"] = pd.Series(distances.flatten())
        df[f"__anomaly_note__"] = df.apply(lambda x: anomaly_note(x, threshold), axis=1)

        df_a = df[df['__is_anomaly__']=='anomaly']
        n_anomalies = len(df_a)
        perc_anomalies = round(n_anomalies/total_samples*100, 2)

        df_n = df[df['__is_anomaly__']=='normal'].sample(frac=samplerate)
        df_n = df_n[0:min(3*n_anomalies,len(df_n))] # min 3x n_anomalies or configured sample of normal samples
        n_nsamples = len(df_n)

        # for this metric, we now have all the detected anomalies and the sampled normal data
        sampled_df = pd.concat([df_a, df_n])

        msg += f"--------------------------" + "\n"
        msg += f"Metric: {metric}" + "\n"
        msg += f"Computed stddev: {stddev}" + "\n"
        msg += f"Threshold: {threshold}" + "\n"
        msg += f"Anomalies: {n_anomalies}/{total_samples}={perc_anomalies}%" + "\n"
        msg += f"--------------------------" + "\n\n"

        anomaly_stats[id] = {
            "level": 'metric',
            "name": metric,
            "dimensions": dimensions,
            "n_anomalies": n_anomalies,
            "perc_anomalies": perc_anomalies,
            "n_normalsamples": n_nsamples,
            "n_plotsamples": len(df),
        }
        plotdata[id] = df

        dfs.append(sampled_df)

    logger.debug(f"Processed metrics level: {spec['name']}",
                     extra={
                         'transform': self.name,
                         'data': msg
                     })


    #########
    # then we do the group level, hierarchical
    #########
    msg = f"Processing groups: {groups}" + "\n\n"

    for group in groups:
        group_name      = group.get('group')
        g_dimensions    = group.get('dimensions', dimensions)
        g_metrics       = group.get('metrics')

        # if we don't have what we need, skip
        if group_name is None or g_metrics is None:
            continue

        if not all([is_numeric_dtype(orig_df[metric]) for metric in g_metrics]):
            msg += f"{group_name} One or more metrics are not numeric\n"
            continue

        # make a copy of the df; we'll keep adding anomaly columns to it
        df = orig_df[columns].copy()

        points      = df[g_metrics].to_numpy()    # all data as an MxN matrix
        centroid    = df[g_metrics].mean().values # the computed centroid of the dataset
        distances   = distance.cdist(points, np.array([centroid]), 'euclidean') # distances of each point to centroid
        distances   = np.reshape(distances, len(distances))
        stddev      = np.nanstd(points)         # std dev of the pooled metric values
        threshold   = stddev * scalefactor
        anomalies   = np.where(distances.flatten()>threshold, 'anomaly', 'normal')    # flag where anomalies occur

        # add columns indicating anomaly label
        id = f"group-{group_name}"
        df['id'] = id
        df['level'] = 'group'
        df['name'] = group_name
        df['__is_anomaly__'] = pd.Series(anomalies)

        # add columns indicating reason for anomaly
        df[f"__anomaly_distance__"] = pd.Series(distances.flatten())
        df[f"__anomaly_note__"] = df.apply(lambda x: anomaly_note(x, threshold), axis=1)

        df_a = df[df['__is_anomaly__']=='anomaly']
        n_anomalies = len(df_a)
        perc_anomalies = round(n_anomalies/total_samples*100, 2)

        df_n = df[df['__is_anomaly__']=='normal'].sample(frac=samplerate)
        df_n = df_n[0:min(3*n_anomalies,len(df_n))] # keep at most 3x the number of anomalies from the sampled normal data
        n_nsamples = len(df_n)

        # for this group, we now have all the detected anomalies and the sampled normal data
        sampled_df = pd.concat([df_a, df_n])

        msg += f"--------------------------" + "\n"
        msg += f"Group: {group_name}" + "\n"
        msg += f"Computed stddev: {stddev}" + "\n"
        msg += f"Threshold: {threshold}" + "\n"
        msg += f"Anomalies: {n_anomalies}/{total_samples}={perc_anomalies}%" + "\n"
        msg += f"--------------------------" + "\n"

        anomaly_stats[id] = {
            "level": 'group',
            "name": group_name,
            "metrics": g_metrics,
            "dimensions": g_dimensions,
            "threshold": threshold,
            "n_anomalies": n_anomalies,
            "perc_anomalies": perc_anomalies,
            "n_normalsamples": n_nsamples,
            "n_plotsamples": len(df),
        }
        plotdata[id] = df

        dfs.append(sampled_df)

    logger.debug(f"Processed groups level: {spec['name']}",
                     extra={
                         'transform': self.name,
                         'data': msg
                     })

    if len(dfs) == 0:
        logger.debug(f"{name}: No outputs computed",
                     extra={
                         'transform': self.name,
                         'data': msg
                     })
        return None

    #########
    # construct the DF for output
    #########
    # concat for all metrics+groups
    df = pd.concat(dfs)
    # reorder columns
    first_cols = ['id', 'level', 'name']
    cols = first_cols + [c for c in df.columns if c not in first_cols]
    df = df[cols]

    msg = f"Final columns: {df.columns}" + "\n"

    window, start_date, end_date = self.get_window_dates(config, self.args)

    # compute stats of interest
    stats = {
        "timestamp": f"{datetime.now().isoformat()}",
        "policy": config,
        "data_start_date": f"{start_date}",
        "data_end_date": f"{end_date}",
        "strategy": "centroid",
        "tolerance": tolerance,
        "scalefactor": scalefactor,
        "normalsamples": normal_samples,
        "samplerate": samplerate,
        "n_rows": total_samples,
        "anomaly_stats": anomaly_stats,
    }

    msg += f"Stats: {json.dumps(stats, indent=4)}" + "\n"

    msg += f"{note(df, 'Anomaly DF')}" + "\n"

    logger.debug(f"Completed spec: {spec['name']}",
                     extra={
                         'transform': self.name,
                         'data': msg
                     })

    return {name: df, "stats": stats, "plotdata": plotdata}
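
For reference, the per-metric logic above amounts to: take the metric's mean as the centroid, measure each point's absolute distance from it, and flag points whose distance exceeds scalefactor standard deviations; the group level generalises this to the Euclidean distance from a multi-metric centroid. A minimal standalone sketch of the univariate rule, using a hypothetical "value" column and a fixed scalefactor, is shown here purely for illustration:

import numpy as np
import pandas as pd

def flag_anomalies(df, metric="value", scalefactor=2):
    # the centroid of a single metric is simply its mean
    points = df[metric].to_numpy()
    centroid = np.nanmean(points)
    # distance of each point from the centroid, and the tolerance band in std-dev units
    distances = np.abs(points - centroid)
    threshold = np.nanstd(points) * scalefactor
    out = df.copy()
    out["__is_anomaly__"] = np.where(distances > threshold, "anomaly", "normal")
    out["__anomaly_distance__"] = distances
    return out

# points far from the mean are flagged; here only the value 50 exceeds the band
print(flag_anomalies(pd.DataFrame({"value": [1, 2, 1, 2, 1, 50]})))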

ChangePointDetectorBase(*args, **kwargs)

Bases: Compute

Take a timeseries signal and identify changepoints in the signal

Features of the transform baseclass include:

* Flexible configuration
* High-level specification of change point detection:
    * specified data source or a custom method to generate one
    * generic change point detection method or custom defined ones
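
For orientation, "change point detection" means flagging the indices at which the statistical properties of a signal shift. The sketch below is a naive, numpy-only illustration of the idea; it is not the detection method implemented by ChangePointDetectorBase, and the window and threshold values are assumptions chosen for the example.

import numpy as np

def naive_changepoints(signal, window=30, z_threshold=1.0):
    # flag indices where the mean of the leading window differs sharply from the trailing window
    signal = np.asarray(signal, dtype=float)
    candidates = []
    for i in range(window, len(signal) - window):
        left = signal[i - window:i]       # window just before the candidate index
        right = signal[i:i + window]      # window just after the candidate index
        pooled_std = np.nanstd(np.concatenate([left, right])) or 1.0
        if abs(np.nanmean(right) - np.nanmean(left)) / pooled_std > z_threshold:
            candidates.append(i)
    # a real detector would collapse adjacent candidate indices into a single change point
    return candidates

signal = np.concatenate([np.random.normal(0, 1, 200), np.random.normal(5, 1, 200)])
print(naive_changepoints(signal))   # candidate indices cluster around 200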

Source code in enrichsdk/contrib/lib/transforms/changepoints/__init__.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.name = "ChangePointDetectorBase"
    self.description = "Change point(s) detection for a timeseries signal given a spec"
    self.testdata = {
        "data_root": os.path.join(os.environ["ENRICH_TEST"], self.name),
        "statedir": os.path.join(os.environ["ENRICH_TEST"], self.name, "state"),
        "conf": {"args": {}},
        "data": {},
    }

    self.cautions = {
        "low": {"color": "green", "desc": "MINIMAL", "rec": "In a BUSINESS-AS-USUAL regime now."},
        "medium": {"color": "gold", "desc": "LOW to MODERATE", "rec": "Expect LOW to MODERATE swings in this regime."},
        "high": {"color": "red", "desc": "HIGH to EXTREME", "rec": "Stay alert for HIGH to EXTREME swings in this regime."},
    }

    self.epoch = time.time()    # for output path

get_dataset_s3(spec, source, paths, start_date, end_date)

Gets all files from paths and consolidates them into a single dataframe. If self.args['cache'] is True, the consolidated dataframe is written to the cache, or read from it when already present.
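
The cache-or-build behaviour can be summarised by the following pattern; this is a rough, hypothetical sketch (the file format and helper name are assumptions), not the actual implementation, which follows below.

import os
import pandas as pd

def load_consolidated(paths, cachefile, use_cache):
    # illustration only: read the consolidated frame from cache when allowed and present
    if use_cache and os.path.exists(cachefile):
        return pd.read_csv(cachefile)

    # otherwise read one frame per file in paths and put them together
    frames = [pd.read_csv(p) for p in paths]
    df = pd.concat(frames, ignore_index=True)

    # cache the consolidated frame for subsequent runs
    if use_cache:
        df.to_csv(cachefile, index=False)
    return df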

Source code in enrichsdk/contrib/lib/transforms/changepoints/__init__.py