State
Enrich has a notion of a pipeline state. The State object is used to keep track of all available data objects (dataframes, dictionaries, html objects etc.) and deliver it to whatever transform requires it.
The combination of transform and state allows a complex pipeline to be split into multiple stages, and independent development. Further the transform code can be spread across multiple repositories.
The State object associates metadata with every data object, that is dumped into the metadata.json at the end of the run.
The metadata associated with a data object includes::
- Name of the frame (each data object has a unique name)
- Category of the data object
- Type of frame (e.g., pandas, html, dict)
- History of access to the dataframe
- Parameters that can have arbitrary informations such as column documentation, security, messages etc.
- Stages that each data object goes through
EnrichStateBase(config, *args, **kwargs)
→
Bases: object
Base class for state manager
Config object is a required parameter
Source code in enrichsdk/core/state.py
get_kpi()
→
msgpop(to_transform)
→
Update the state with the details of a dataframe
Args: to_transform (str): Name of receiver transform
Returns: dict: If there is a message, return it else None
Source code in enrichsdk/core/state.py
msgpush(from_transform, to_transform, data)
→
Update the state with the details of a dataframe
Args: from_transform (str): Name of sender transform to_transform (str): Name of receiver transform message (dict): message
Source code in enrichsdk/core/state.py
update_frame(name, detail, create=True)
→
Update the state with the details of a dataframe
Args: name (str): Name of the dataframe detail (dict): Dict with a number of details create (bool): Add if the frame doesnt exist.
The detail has the elements mentioned above::
{
'df': df, # data object
'params': params, # extra metadata
'transform': self.name, # Callee
'history': [
{
'log': 'Passed some additional params'
}
]
}
Params could be a dict or a list of dicts. Each
entry has a type
and type-specific attributes.
Over a period of time a nuber of different
attributes were added. Some of them are::
1. Compute - column information and description
2. Overrides - information/instructions from one
transform to another
3. Lineage - Input and output dependencies for
better tracking
For example::
[
{
# Column metadata
'type': 'compute',
'columns': columns,
'description': [
"Cars1 Dataset with no changes"
]
},
{
# 'Fake' the saving
'type': 'overrides',
'transform': 'TableSink',
'args': {
'save': False,
'rows': 193332
}
},
{
# Dependency information
"type": "lineage",
"transform": self.name,
"dependencies": [
{
"type": "file",
"nature": "output",
"objects": [sqloutput, output]
},
{
"type": "dataframe",
"nature": "input",
"objects": ["cars1", "cars2"]
}
]
}
]
Source code in enrichsdk/core/state.py
update_kpi(name, value)
→
Add a KPI to the pipeline. We can
Args: name (str): Name of the KPI value (obj): Serializable value (int/str/float)