Skip to content

Assets

enrichsdk.contrib.lib.assets

Reusable library of modules (e.g., base classes and algorithms) to be incorporated into transforms

anonymizer

AnonymizerMixin

Bases: object

Embed the core anonymization functions in transforms

anonymize_init(anonargs)

Initialize the anonymizer

Source code in enrichsdk/contrib/lib/assets/anonymizer.py
def anonymize_init(self, anonargs):
    """
    Initialize the anonymizer

    Validates the anonymization arguments, creates the CachingAnonymizer
    and normalizes the list of anonymization targets.

    Args:
        anonargs (dict): anonymization arguments. Keys read here:
            'cache': passed as ``cachepath`` to CachingAnonymizer (required)
            'textgen_cred': passed as ``textgen_cred`` to CachingAnonymizer (required)
            'targets': iterable of target names (str) or target dicts (required)
            'spec': default spec given to targets that have none (optional)

    Raises:
        Exception: if the transform has no ``state`` attribute, or if
            'cache', 'textgen_cred' or 'targets' is missing from anonargs

    Side effects:
        Sets ``self.anonymizer``, ``self.anonargs`` and
        ``self.anonymization_actions``; replaces ``anonargs['targets']``
        with the cleaned list.
    """

    if not hasattr(self, 'state'):
        raise Exception("Missing state attribute in transform")

    #=> Initialize the anonymization args
    # NOTE(review): anonargs is logged in full below, which includes the
    # value of 'textgen_cred' when present - confirm this is acceptable.
    if (('cache' not in anonargs) or
        ('textgen_cred' not in anonargs)):
        logger.error("Missing cache or textgencred",
                     extra={
                         'transform': self.name,
                         'data': json.dumps(anonargs, indent=4)
                     })
        raise Exception("Missing cache or textgencred")

    self.anonymizer = CachingAnonymizer(cachepath=anonargs['cache'],
                                        textgen_cred=anonargs['textgen_cred'])

    if 'targets' not in anonargs:
        logger.error("Missing targets for anonymization",
                     extra={
                         'transform': self.name,
                         'data': json.dumps(anonargs, indent=4)
                     })
        raise Exception("Missing anonymization targets")

    # => Clean the targets and store them after including the spec
    defaultspec = anonargs.get('spec', {
        'config': {
            'sample': 1000,
            'nontransformed': 'drop'
        }
    })

    cleaned_targets = []
    for t in anonargs['targets']:
        # a bare string is shorthand for a target with only a name
        if isinstance(t, str):
            t = {
                'name': t,
            }
        if 'name' not in t:
            logger.warning("Anonymization target skipped",
                           extra={
                               'transform': self.name,
                               'data': json.dumps(t)
                           })
            continue

        if 'spec' not in t:
            # Deep copy: the spec nests a 'config' dict, and a shallow copy
            # would leave every target sharing (and mutating) the same one.
            t['spec'] = copy.deepcopy(defaultspec)

        cleaned_targets.append(t)

    anonargs['targets'] = cleaned_targets

    logger.debug("Anonymization targets",
                       extra={
                           'transform': self.name,
                           'data': json.dumps(anonargs)
                       })

    self.anonargs = anonargs
    self.anonymization_actions = []

BaseAnonymizer(textgen_cred, *args, **kwargs)

Bases: object

default

Source code in enrichsdk/contrib/lib/assets/anonymizer.py
def __init__(self, textgen_cred, *args, **kwargs):
    """
    Store the credential, define the column-class mapping and
    create the Faker object used by the anonymization methods.
    """
    # keep the credential that was handed in
    self.textgen_cred = textgen_cred

    # column class -> anonymization type used for that class
    self.column_classes = dict(
        person='person-name',
        country='country_2char',
        address='location-address',
        phone='phone_number',
        mobile='phone_number',
        email='email',
        identifier='categorical',
        number='numeric',
        latlong='numeric',    # need to write a custom anonymizer
        datetime='numeric',   # need to write a custom anonymizer
        location='categorical',
    )

    # the Faker object is built by init_faker_object()
    faker_obj = self.init_faker_object()
    self.fakeObj = faker_obj

    super().__init__(*args, **kwargs)
anon_categorical(df, col_name, column)

Method to anonymize categorical data. Various anonymization methods can be defined here. Input is the full dataframe, output is the relevant column being anonymized.

Source code in enrichsdk/contrib/lib/assets/anonymizer.py
def anon_categorical(self, df, col_name, column):
    """
    Method to anonymize categorical data. Various anonymization methods can be defined here.
    Input is the full dataframe, output is the relevant column being anonymized.

    Args:
        df: full dataframe
        col_name: name of the column to anonymize
        column (dict): column spec; 'method' is read (default "hash")

    Returns:
        tuple: (values, msg). For the "hash" method, values is a Series
        holding a truncated MD5 hex digest of each value's string form
        (nulls stay NaN) and msg is empty. For any other method, values
        is np.nan and msg explains why.
    """

    def generate_hash(txt):
        # nulls are passed through untouched
        if pd.isnull(txt):
            return np.nan

        digest = hashlib.md5(str(txt).encode('utf-8')).hexdigest()
        # drop the last 10 hex characters of the digest
        return digest[:-10]

    method = column.get("method", "hash")

    if method != "hash":
        return np.nan, "Unknown method to anon CATEGORICAL column, setting default NaNs."

    # Hash the column itself rather than going row by row over the frame:
    # a row-wise apply coerces each row to a common dtype, so an integer id
    # sitting next to a float column was hashed as "1.0" instead of "1",
    # and an empty frame produced a DataFrame instead of a Series.
    return df[col_name].apply(generate_hash), ""
anon_email(df, col_name, column)

Method to anonymize email data. Can generate emails to match or not match data in some name field. Also respects the original email domain distribution if required. Input is the full dataframe, output is the relevant column being anonymized.

Source code in enrichsdk/contrib/lib/assets/anonymizer.py
def anon_email(self, df, col_name, column):
    """
    Method to anonymize email data. Can generate emails to match or not match
    data in some name field. Also respects original email domain distribution if required.
    Input is the full dataframe, output is the relevant column being anonymized.

    Args:
        df: full dataframe (the name field column may be needed as well)
        col_name: name of the email column to anonymize
        column (dict): column spec. Keys read here:
            'match_names' (default True): build the email from the name field
            'name_field': column holding the name, needed when matching names
            'dist' (default "yes"): keep the original domain distribution

    Returns:
        tuple: (values, msg). values is np.nan (with an explanatory msg)
        when name matching is requested but 'name_field' is not given or
        is not a column of df; otherwise the result of applying the
        generator to every row, with null emails left as NaN.
    """
    msg = ""

    match_names = column.get("match_names", True)
    if match_names is True:
        # NOTE(review): both messages say "setting random emails" but NaN is
        # what is returned; the returned value is kept as is - confirm intent.
        if "name_field" not in column:
            msg = "Unknown name field to match emails, setting random emails."
            return np.nan, msg
        if column["name_field"] not in df.columns:
            msg = "name field not in dataframe, setting random emails."
            return np.nan, msg

    # whitelist of email domains
    # if the original email is in this list, don't replace it
    # useful to maintain data distribution
    domain_whitelist = ('gmail.com',
                        'yahoo.com',
                        'hotmail.com',
                        'aol.com')

    def generate_email(fakeObj, row, col_name, column, match_names):

        email = row[col_name]
        if pd.isnull(email):
            return np.nan

        # an address without '@' has no domain to carry over
        pieces = str(email).split('@')
        orig_domain = pieces[1] if len(pieces) > 1 else ""

        # set the email domain first
        if column.get("dist", "yes") == "yes" and orig_domain:
            # we need to ensure that the distribution of generated email domains
            # match what was present in the input
            # popular free email domains will carry over, while others will be
            # replaced with random domains while still retaining distribution
            if orig_domain in domain_whitelist:
                # retain the original domain name
                domain = orig_domain
            else:
                # get a new domain name
                domain = fakeObj['generators']['email_domain'][orig_domain]
        else:
            # no need (or no way) to match distribution of generated email domains
            domain = fakeObj['faker'].ascii_email().split('@')[1]

        names = []
        if match_names is True:
            # we want to match the anon email with the name field
            name = row[column['name_field']]
            if not pd.isnull(name):
                names = [n for n in unidecode.unidecode(str(name)).lower().split(' ') if n]
        if not names:
            # either we don't care about matching the name field,
            # or the name on this row is missing/blank
            names = [n for n in fakeObj['faker'].name().split(' ') if n]

        firstname = names[0]
        lastname = names[-1]

        # possible variations of email
        nameparts = {
            1: f"{firstname}",
            2: f"{lastname}",
            3: f"{firstname}.{lastname}",
            4: f"{firstname}.{firstname[0]}.{lastname}",
            5: f"{firstname}.{lastname[0]}.{lastname}",
            6: f"{firstname}.{firstname[0]}.{lastname[0]}",
            7: f"{firstname}.{random.randint(1,10000)}",
            8: f"{firstname}_{random.randint(1,10000)}",
            9: f"{firstname}.{lastname}.{random.randint(1,10000)}",
        }
        choice = random.randint(1, len(nameparts))
        namepart = nameparts[choice]

        return f"{namepart}@{domain}"

    val = df.apply(lambda x: generate_email(self.fakeObj, x, col_name, column, match_names), axis=1)

    return val, msg
anon_numeric(df, col_name, column)

Method to fuzz numeric data. Various fuzzing methods can be defined here. Input is the full dataframe, output is the relevant column being fuzzed.

Source code in enrichsdk/contrib/lib/assets/anonymizer.py
def anon_numeric(self, df, col_name, column):
    """
    Method to fuzz numeric data. Various fuzzing methods can be defined here.
    Input is the full dataframe, output is the relevant column being fuzzed.

    Args:
        df: full dataframe
        col_name: name of the numeric column to fuzz
        column (dict): column spec. Keys read here:
            'method' (default "perturb")
            'params' (default {}): for "perturb", 'range' (default 0.05) is
            the maximum relative change applied to a value

    Returns:
        tuple: (values, msg). For "perturb", values is the column with nulls
        replaced by 0 and each value changed by its own random fraction in
        [-range, +range]. For any other method, values is np.nan and msg
        explains why.
    """
    msg = ""

    method = column.get("method", "perturb")
    params = column.get("params", {})

    # nulls are treated as 0 (a new Series; df itself is not modified)
    val = df[col_name].fillna(0)

    if method == "perturb":
        spread = params.get("range", 0.05)
        # Draw one factor per row. A single scalar draw for the whole column
        # scaled every value by the same amount, so all ratios between rows
        # were preserved and one known value revealed the rest.
        noise = np.random.uniform(-spread, spread, size=len(val))
        val = val + val * noise
    else:
        msg = "Unknown method to anon NUMERIC column, setting default NaNs."
        val = np.nan

    return val, msg
anonymize_dataset(df, spec={})

Anonymize a dataset given a spec. The spec defines how the dataset should be handled and what kinds of anonymization need to be performed. If no spec is given, we infer one from the dataset.

Source code in enrichsdk/contrib/lib/assets/anonymizer.py
def anonymize_dataset(self, df, spec=None):
    """
    Anonymize a dataset given a spec.
    The spec defines how the dataset should be handled
    and what kinds of anonymization need to be performed.
    If no spec is given, we infer one from the dataset.

    Args:
        df: dataframe to anonymize
        spec (dict, optional): anonymization spec. Keys read from
            spec['config']: 'columns' (column name -> dict), 'sample'
            (rows to sample, default -1 = no sampling) and
            'nontransformed' ("drop" or anything else to retain,
            default "retain")

    Returns:
        tuple: (success, anon_data). success is True when at least one
        column was anonymized. anon_data is a dict with the keys 'df',
        'actions', 'anon_cols', 'missing_cols', 'error_cols',
        'dropped_cols' and 'retained_cols'.

    Raises:
        Exception: if a column specification is not a dictionary
    """
    # a fresh dict per call instead of a shared mutable default argument
    if spec is None:
        spec = {}

    config = spec.get('config', {})
    columns = config.get('columns', {})
    sample = config.get('sample', -1)

    # if no columns are specified
    # we need to anonymize all available columns
    if len(columns) == 0:
        columns = {c: {"include": "yes"} for c in df.columns}

    # Sanity check...
    for c, details in columns.items():
        if not isinstance(details, dict):
            raise Exception(f"column specification {c} should be a dictionary")

    # Sample if required
    if 0 < sample < len(df):
        df = df.sample(sample)

    # we need to classify the columns
    # so we know what kind of anonymization to use
    columns = self.classify_columns_common(df, columns, spec)

    # prepare for anonymization
    anon_cols = []
    missing_cols = []
    error_cols = []
    anon_actions = {}
    dropped_cols = []
    retained_cols = []
    anon_df = df.copy()

    # run through each column and try to anonymize
    for col_name, col_obj in columns.items():
        if col_obj.get("include", "yes") != "yes":
            continue

        if col_name not in df.columns:
            # this column in the spec is missing in the DF, make note of it
            missing_cols.append(col_name)
            continue

        try:
            # anonymize this single column
            values, action = self.anonymize_single_column(col_name, col_obj, df)
            anon_df[col_name] = values
            # make note of the column name
            anon_cols.append(col_name)
            # make note of the anonymization action
            anon_actions[col_name] = action
        except Exception:
            # best effort: a failing column is reported, not fatal
            traceback.print_exc()
            error_cols.append(col_name)

    # drop the other columns if required by spec
    untouched = [c for c in df.columns if c not in anon_cols]
    if config.get("nontransformed", "retain") == "drop":
        dropped_cols = untouched
        anon_df = anon_df[anon_cols]
    else:
        retained_cols = untouched

    # construct the result object
    anon_data = {
        "df": anon_df,
        "actions": anon_actions,
        "anon_cols": anon_cols,
        "missing_cols": missing_cols,
        "error_cols": error_cols,
        'dropped_cols': dropped_cols,
        'retained_cols': retained_cols,
    }

    success = len(anon_cols) > 0

    return success, anon_data
anonymize_single_column(col_name, col_obj, df, params={})

Takes a dataset and anonymizes the specified column

Source code in enrichsdk/contrib/lib/assets/anonymizer.py
def anonymize_single_column(self, col_name, col_obj, df, params={}):
    """
    Anonymize one column of a dataset.

    The column's 'anon_type' selects a generator: either one of the
    lookup generators held by the faker object, or one of the custom
    handler methods (numeric, categorical, email). Returns the
    anonymized values and a message describing what was done.
    """

    # lookup generators come from the faker object ...
    registry = {
        key: {"type": "lookup", "handler": table}
        for key, table in self.fakeObj['generators'].items()
    }
    # ... and the custom generators are added on top (overriding same-named lookups)
    custom_handlers = (
        ("numeric", "anon_numeric"),
        ("categorical", "anon_categorical"),
        ("email", "anon_email"),
    )
    for key, method_name in custom_handlers:
        registry[key] = {"type": "custom", "handler": method_name}

    anon_type = col_obj.get('anon_type')

    # no generator for this type: nothing to do but default to NaN
    if anon_type not in registry:
        return np.nan, f"No <{anon_type}> generator found, defaulting to NaN."

    entry = registry[anon_type]
    gen_type = entry['type']
    gen_handler = entry['handler']

    if gen_type == "lookup":
        # only the specific column is needed for a lookup
        data = df[col_name].apply(lambda x: gen_handler[x])
        l_msg = ""
    else:
        # custom handlers get the full dataframe since they may need
        # other columns; they return only the relevant column
        data, l_msg = getattr(self, gen_handler)(df, col_name, col_obj)

    msg = f"Anonymized using <{gen_handler}> handler of <{gen_type}> type." + l_msg

    return data, msg

CachingAnonymizer(cachepath, *args, **kwargs)

Bases: BaseAnonymizer

Cache results of the classification

Source code in enrichsdk/contrib/lib/assets/anonymizer.py
def __init__(self, cachepath, *args, **kwargs):
    """
    Load the cache from ``cachepath`` if that file exists.

    Args:
        cachepath: path of the pickled cache file
        *args, **kwargs: passed on to the parent initializer

    The cache falls back to an empty dict when the file does not exist
    or cannot be loaded (the failure is logged, not raised).
    """
    self.cachepath = cachepath
    self.cache = {}

    if os.path.exists(cachepath):
        try:
            # NOTE: unpickling can execute arbitrary code - the cache file
            # must come from a trusted location.
            # 'with' closes the file handle, which the bare open() leaked.
            with open(cachepath, 'rb') as fd:
                self.cache = pickle.load(fd)
        except Exception:
            # deliberate best effort: an unreadable cache just starts empty
            self.cache = {}
            logger.exception("Unable to load anonymizer cache",
                             extra={
                                 'data': cachepath
                             })

    super().__init__(*args, **kwargs)
anonymize_dataset(df, spec={})

Filter out columns that needn't be computed

Source code in enrichsdk/contrib/lib/assets/anonymizer.py
def anonymize_dataset(self, df, spec=None):
    """
    Filter out columns that needn't be computed

    Attaches the cache to the spec, runs the parent class'
    anonymize_dataset and then updates the cache.

    Args:
        df: dataframe to anonymize
        spec (dict, optional): anonymization spec. Note that the
            'cache' key is set on the dict that is passed in.

    Returns:
        tuple: (success, anon_data) exactly as returned by the parent class
    """
    # A fresh dict per call: the previous mutable default ({}) was one
    # object shared by every call, and 'cache' was written into it.
    if spec is None:
        spec = {}

    spec['cache'] = self.cache

    # Run the parent implementation with the cache attached to the spec
    success, anon_data = super().anonymize_dataset(df, spec)

    self.update_cache()

    return success, anon_data

changepoints

BaseChangePointDetectorModel(df, *args, **kwargs)

Bases: object

Base class for change point detection

default

Source code in enrichsdk/contrib/lib/assets/changepoints.py
def __init__(self, df, *args, **kwargs):
    """
    Keep the dataframe and set the default detection parameters.
    """
    # the dataframe to work on
    self.data = df

    # nothing detected or visualized yet
    self.changepoints = None
    self.viz = None

    # defaults used when a detection call does not override them
    self.rpt_model = 'rbf'
    self.rpt_penalty = 1
    self.rpt_jump = 1
    self.rpt_minsize = 3
    self.rpt_default_hybrid_model = 'maxvote'

    super().__init__(*args, **kwargs)
detect_changepoints(method='hybrid')

detect change points using the method specified

Source code in enrichsdk/contrib/lib/assets/changepoints.py
def detect_changepoints(self, method="hybrid"):
    """
    Run the requested detection method ("pelt" or "hybrid") and
    return the changepoints. Any other method leaves the stored
    changepoints untouched and returns them as they are.
    """
    # we have no other methods available
    if method not in ("pelt", "hybrid"):
        return self.changepoints

    if method == "pelt":
        detector = self.detect_changepoints_pelt
    else:
        detector = self.detect_changepoints_hybrid

    self.changepoints = detector()
    return self.changepoints
detect_changepoints_hybrid(strategy=None, model=None, penalty=None, jump=None, min_size=None)

Run the change point detector using the hybrid method with one of the following strategies: strict — all models must agree on the changepoints; maxvote — a majority of the models evaluated must agree on the changepoints (default strategy); anyone — all changepoints detected by any model are included; select — only the specified model is run.

Source code in enrichsdk/contrib/lib/assets/changepoints.py
def detect_changepoints_hybrid(self,
                                strategy=None,
                                model=None,
                                penalty=None,
                                jump=None,
                                min_size=None
                                ):
    """
    run the change point detector using the Hybrid method with one of the following strategies
        strict: all models should agree on the changepoints
        maxvote: a majority of models evaluated should agree on the changepoints (default strategy)
        anyone: all changepoints detected by any model are included
        select: the specified model is run

    Args:
        strategy: one of the strategies above; defaults to
            self.rpt_default_hybrid_model. An unknown strategy yields
            no changepoints.
        model: model to run for the 'select' strategy; defaults to
            self.rpt_model
        penalty, jump, min_size: passed through to
            detect_changepoints_pelt (None lets it use its defaults)

    Returns:
        list: sorted changepoints, also stored in self.changepoints
    """

    # set the params
    if strategy is None:
        strategy = self.rpt_default_hybrid_model

    # These were accepted but never forwarded before, so callers'
    # overrides were silently ignored.
    pelt_args = {"penalty": penalty, "jump": jump, "min_size": min_size}

    if strategy == 'select':
        # only the requested model is needed; fall back to the default
        # model instead of failing on a missing/unlisted one
        if model is None:
            model = self.rpt_model
        common_bkps = self.detect_changepoints_pelt(model=model, **pelt_args)
    else:
        ## first, run the 3 models
        changepoints = {}
        for m in ['rbf', 'l2', 'l1']:
            changepoints[m] = self.detect_changepoints_pelt(model=m, **pelt_args)

        ## then, run the hybrid strategy
        if strategy == 'strict':
            common_bkps = set.intersection(*(set(cps) for cps in changepoints.values()))
        elif strategy == 'maxvote':
            votes = defaultdict(int)
            for cps in changepoints.values():
                # one vote per model for each point it reports
                for cp in set(cps):
                    votes[cp] += 1
            threshold = math.ceil(len(changepoints) / 2)
            common_bkps = [cp for cp, count in votes.items() if count >= threshold]
        elif strategy == 'anyone':
            common_bkps = set()
            for cps in changepoints.values():
                common_bkps.update(cps)
        else:
            # no (known) strategy specified
            common_bkps = []

    self.changepoints = sorted(common_bkps)

    return self.changepoints
detect_changepoints_pelt(model=None, penalty=None, jump=None, min_size=None)

run the change point detector using the Pelt method

Source code in enrichsdk/contrib/lib/assets/changepoints.py
def detect_changepoints_pelt(self,
                                model=None,
                                penalty=None,
                                jump=None,
                                min_size=None
                                ):
    """
    Detect change points with the Pelt method.

    Any parameter left as None falls back to the corresponding
    default stored on the instance. The result is stored in
    self.changepoints and returned.
    """
    # fall back to the instance defaults for anything not given
    model = self.rpt_model if model is None else model
    penalty = self.rpt_penalty if penalty is None else penalty
    jump = self.rpt_jump if jump is None else jump
    min_size = self.rpt_minsize if min_size is None else min_size

    # the raw values of the data
    series = self.data.values

    # fit, then predict the change points with the given penalty
    detector = rpt.Pelt(model=model, jump=jump, min_size=min_size)
    fitted = detector.fit(series)
    self.changepoints = fitted.predict(pen=penalty)

    return self.changepoints
get_changepoints()

return the detected changepoints if any

Source code in enrichsdk/contrib/lib/assets/changepoints.py
def get_changepoints(self):
    """
    Return whatever is currently stored in self.changepoints.
    """
    detected = self.changepoints
    return detected
get_data()

return the data being used

Source code in enrichsdk/contrib/lib/assets/changepoints.py
def get_data(self):
    """
    Return the data stored in self.data.
    """
    current = self.data
    return current

llmtextgen

LLMTextGenerator(cred, *args, **kwargs)

Bases: object

default

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def __init__(self, cred, *args, **kwargs):
    """
    Set up the known models, pick the model to use and
    resolve the API key from the credential.
    """
    # models we know about, grouped by task
    self.available_models = {
        "completion": [
                "text-davinci-003",
                "text-curie-001",
                "text-babbage-001",
                "text-ada-001",
                "code-davinci-002",
                "code-cushman-001"
            ],
        "embedding": [
                "text-embedding-ada-002"
            ]
    }
    self.default_code_model = "code-davinci-002"
    self.default_text_model = "text-davinci-003"
    self.default_embedding_model = "text-embedding-ada-002"

    # use the requested model only if it is a known one (any task),
    # otherwise fall back to the default text model
    requested = kwargs.pop('model', "")
    known_models = [
        name
        for names in self.available_models.values()
        for name in names
    ]
    self.model = requested if requested in known_models else self.default_text_model

    # setup the api key
    self.cred = cred
    self.api_key = self.get_api_key(self.cred)

    super().__init__(*args, **kwargs)
call_completion_api(prompt, model)

Make a call to OpenAI API to get the text completion

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def call_completion_api(self, prompt, model):
    """
    Call the OpenAI completion API for the prompt and
    return the text of the first choice.
    """
    openai.api_key = self.api_key

    # request parameters; the token limit scales with the prompt length
    request = {
        "model": model,
        "prompt": prompt,
        "temperature": 0,
        "max_tokens": round(len(prompt)*1.2),
        "top_p": 1.0,
        "frequency_penalty": 0.0,
        "presence_penalty": 0.0,
        "stop": ["#", ";"],
    }
    response = openai.Completion.create(**request)

    first_choice = response.choices[0]
    return first_choice['text']
call_embedding_api(prompt, model)

Make a call to OpenAI API to get the embedding

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def call_embedding_api(self, prompt, model):
    """
    Call the OpenAI embedding API for the prompt and
    return the embedding of the first data item.
    """
    openai.api_key = self.api_key

    request = {"model": model, "input": prompt}
    response = openai.Embedding.create(**request)

    first_item = response['data'][0]
    return first_item['embedding']
generate_code(**kwargs)

generate a code completion given a prompt

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def generate_code(self, **kwargs):
    """
    Run a completion with the default code model.
    Any 'model' passed in is replaced by that default.
    """
    params = {**kwargs, 'model': self.default_code_model}
    return self.generate_common(task='completion', **params)
generate_common(task, **kwargs)

generate a completion given a prompt

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def generate_common(self, task, **kwargs):
    """
    generate a completion (or embedding) given a prompt

    Args:
        task: 'completion' or 'embedding'
        **kwargs: 'prompt' (the input) and 'model' (the model to use)
            are read; both are required

    Returns:
        dict: always has 'success'. On success it also has 'text'
        (completion) or 'embedding' (embedding). On failure it has
        'msg', and additionally 'error' with the exception text when
        the API call itself raised.
    """
    # default, override if successful
    result = {"success": False}

    # check if we have all the required params
    if self.api_key is None:
        result['msg'] = "No API key available"
        return result

    prompt = kwargs.get('prompt')
    if prompt is None:
        result['msg'] = "No valid input data specified"
        return result

    # the model must be given and must be valid for this task
    model = kwargs.get('model')
    if model is None or not self.set_model(task, model):
        result['msg'] = "No valid model specified"
        return result
    model = self.get_model()

    # call OpenAI API
    try:
        if task == 'completion':
            result['text'] = self.call_completion_api(prompt, model)
            result['success'] = True
        elif task == 'embedding':
            result['embedding'] = self.call_embedding_api(prompt, model)
            result['success'] = True
        else:
            result['msg'] = "Unknown task specified, cannot proceed"
    except Exception as err:
        # 'except Exception' (not a bare except) so that KeyboardInterrupt
        # and SystemExit still propagate; keep the cause for the caller
        result['msg'] = "Something went wrong when calling the API"
        result['error'] = str(err)

    return result
generate_embedding(**kwargs)

generate an embedding vector given some text

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def generate_embedding(self, **kwargs):
    """
    Compute an embedding with the default embedding model.
    Any 'model' passed in is replaced by that default.
    """
    params = {**kwargs, 'model': self.default_embedding_model}
    return self.generate_common(task='embedding', **params)
generate_text(**kwargs)

generate a text completion given a prompt

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def generate_text(self, **kwargs):
    """
    Run a completion with the default text model.
    Any 'model' passed in is replaced by that default.
    """
    params = {**kwargs, 'model': self.default_text_model}
    return self.generate_common(task='completion', **params)
get_api_key(cred)

get the API key from the cred

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def get_api_key(self, cred):
    """
    Extract the API key from the credential: a string is the key
    itself, a dict supplies it under 'apikey'. Anything else (or a
    dict without that entry) gives None.
    """
    if isinstance(cred, str):
        return cred
    if isinstance(cred, dict):
        return cred['apikey'] if 'apikey' in cred else None
    return None
get_model()

get the model to use for text completion

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def get_model(self):
    """
    Return the model currently stored in self.model.
    """
    current = self.model
    return current
set_model(task, model)

set the model to use for text completion

Source code in enrichsdk/contrib/lib/assets/llmtextgen.py
def set_model(self, task, model):
    """
    Store the model if it is one of the models available for the
    task. Returns True when the model was stored, False otherwise.
    """
    allowed = self.available_models.get(task, [])
    accepted = model in allowed
    if accepted:
        self.model = model
    return accepted

profilespec

get_profile_from_api(clsobj, spec_category)

Read the profile json from API

Source code in enrichsdk/contrib/lib/assets/profilespec.py
def get_profile_from_api(clsobj, spec_category):
    """
    Load the profile specs for a spec category through the API.

    Requires clsobj.args to exist and to define 'apicred'.
    Returns the specs, the validity flag and a message log.
    """

    # the transform must carry its args ...
    if not hasattr(clsobj, "args"):
        raise Exception(
            "'args' transform attribute should be defined"
        )
    # ... and the args must define everything required here
    required = ['apicred']
    for key in required:
        if clsobj.args.get(key) is None:
            raise Exception(
                f"'{key}' attribute in args should be defined"
                )

    # call the API to get the anomaly specs
    specs, is_valid, api_msg = load_profile_api(clsobj.args, spec_category)
    msg = "Loading profile from API" + "\n" + api_msg

    if specs is None:
        return specs, is_valid, msg

    # unwrap the list of specs from the response
    specs = specs["specs"]
    msg = msg + f"Found {len(specs)} policies for spec: {spec_category}" + "\n"

    return specs, is_valid, msg

get_profile_from_file(clsobj)

Read the profile json from profilespec

Source code in enrichsdk/contrib/lib/assets/profilespec.py
def get_profile_from_file(clsobj):
    """
    Read the profile json/yaml from the profilespec file

    Candidate paths are clsobj.profilefile (if defined) followed by
    profile.json, profile.yaml, profilespec.json and profilespec.yaml
    inside clsobj.profiledir (if defined).

    Args:
        clsobj: object with a 'profilefile' and/or 'profiledir' attribute

    Returns:
        tuple: (specs, is_valid, msg). specs is the profile's 'specs'
        entry (None if absent), is_valid is True when specs were found
        and msg is a short log of what happened.

    Raises:
        Exception: if neither attribute is defined, or if none of the
            candidate paths holds a profile
    """

    is_valid = False
    msg = ""

    if (not hasattr(clsobj, "profiledir")) and (not hasattr(clsobj, "profilefile")):
        raise Exception(
            "'profiledir' transform attribute should be defined to use default get_profile method"
        )

    paths = []
    if hasattr(clsobj, "profilefile"):
        # was 'self.profilefile': there is no 'self' in this function,
        # so any object with a profilefile raised NameError
        paths.append(clsobj.profilefile)

    if hasattr(clsobj, "profiledir"):
        paths.extend(
            [
                clsobj.profiledir + "/profile.json",
                clsobj.profiledir + "/profile.yaml",
                clsobj.profiledir + "/profilespec.json",
                clsobj.profiledir + "/profilespec.yaml",
            ]
        )

    # NOTE(review): every existing path is loaded and the last one wins,
    # so files in profiledir take precedence over profilefile - confirm
    # that this (rather than first match) is the intended priority.
    profile = None
    for p in paths:
        if not os.path.exists(p):
            continue
        if p.endswith(".json"):
            with open(p) as fd:
                profile = json.load(fd)
        elif p.endswith(".yaml"):
            # safe_load: yaml.load without a Loader is unsafe and is
            # rejected outright by current PyYAML releases
            with open(p) as fd:
                profile = yaml.safe_load(fd)

    if profile is None:
        raise Exception("Profile could not be found")

    specs = profile.get("specs")
    if specs is not None:
        is_valid = True
        msg += f"Found {len(specs)} specs to work on" + "\n"
    else:
        msg += "No specs to work on" + "\n"

    return specs, is_valid, msg