Skip to content

DataGPT

DataGPT provides a controlled text-to-SQL interface and works in conjunction with feature-engineering pipelines.

LLMDataQuerier(name, cred, mode, data, cascade, debug=False)

Bases: object

Class to do querying of a dataframe using LLMs

Init the dataframe LLM query agent. Parameters: `name` — name of the agent; `cred` — credentials object; `mode` — type of input dataset ('csv': data is a path to a CSV file; 'sqlite': data is a path to a SQLite database; 'df': data is a pandas dataframe); `data` — pointer to the data, interpreted according to `mode`; `cascade` — list of LLM backends to use, tried in order for each query (on failure, the next LLM in the list is tried); `debug` — if True, the agent returns additional information useful for debugging.

Source code in llmsdk/agents/datagpt.py
def __init__(self,
             name,
             cred,
             mode,
             data,
             cascade,
             debug=False):
    """
    Init the dataframe LLM query agent.

    name: name of the agent
    cred: credentials object
    mode: type of input dataset
            'csv': data will be available as path to a csv
            'sqlite': data will be available as path to a sqlite db
            'df': data will be available as pandas dataframe
    data: pointer to data, see mode param
    cascade: list of LLM backends to use
                the list will be tried in order for each query
                on failure, the next LLM in the list will be tried
    debug: if True, returns a bunch of useful information for debugging
    """
    start_time = time.time()

    # logging
    self.logger = get_logger()

    # defaults
    self.max_llm_tokens = 1024 # max tokens in the response
    self.mode = mode
    self.data = data
    self.debug = debug

    # name
    self.agent_name = name

    # creds
    self.cred = cred
    # LLM params
    self.cascade = cascade

    # init one LLM object per entry in the cascade list
    self.llms = self._get_llm_objs(cascade=self.cascade, cred=self.cred)

    # init one query agent per LLM backend
    self.agents = self._get_agent(mode=self.mode, data=self.data)

    # note metadata for this agent
    # for 'df' mode only the shape is recorded (the frame itself may be
    # large); for 'csv'/'sqlite' modes data is a path, recorded as-is
    self.metadata = {
        "agent": {
            "name": self.agent_name,
            "cascade": self.cascade,
            "mode": self.mode,
            "data": f"dataframe of shape {self.data.shape}" if self.mode=='df' else self.data,
        },
        "events": []
    }

    # log that the agent is ready
    # note: the return value of _log_event is not needed here
    duration = time.time() - start_time
    self._log_event(agent_events._EVNT_READY, duration)

get_metadata()

return metadata collected by the agent

Source code in llmsdk/agents/datagpt.py
def get_metadata(self):
    """
    Return the metadata dict accumulated by this agent
    (agent config plus the list of logged events).
    """
    return self.metadata

mux_query_one_try(query, agent_id, agent)

do one try of the query through a specified agent in the cascade chain

Source code in llmsdk/agents/datagpt.py
def mux_query_one_try(self, query, agent_id, agent):
    """
    Do one try of the query through a specified agent in the cascade chain.

    query: query string to run
    agent_id: id of this agent in the cascade list (used to tag the result)
    agent: dict holding the agent executor under 'agent' plus
           'platform' and 'model' entries

    Returns (success, result, thought) where
        success: True if the agent produced an answer
        result: the agent's result dict, tagged with 'cascade' info on
                success, or a generic error message dict on failure
        thought: captured stdout (the agent's chain-of-thought), "" on failure
    """
    # get the llm backend
    agent_exec = agent['agent']

    try:
        # redirect stdout so we can capture the agent's chain-of-thought
        # this is a hack until we figure out how to get chain-of-thought
        # directly from LangChain
        f = io.StringIO()
        with redirect_stdout(f):
            result = agent_exec(query)
        thought = f.getvalue()
        # check if LLM could not find solution
        # an explicit "I don't know." is treated as a failure so the
        # caller can fall through to the next LLM in the cascade
        if result['output'].lower() == "i don't know.":
            raise Exception("Force exception")
    # narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
    # still propagate instead of being swallowed as a query failure
    except Exception:
        # something went wrong when trying the query
        result = { "input": query, "output": self._err_msg('err')}
        thought = ""
        success = False
    else:
        success = True
        # tag the result with which cascade entry answered
        result['cascade'] = { "id": agent_id }
        for key in ['platform', 'model']:
            result['cascade'][key] = agent[key]


    return success, result, thought

query(query, cascade_id=None)

Run a query on the dataframe. Parameters: `query` — the query string; `cascade_id` — id of the model to use from the cascade list specified during agent init (if omitted, each model in the cascade is tried in order until one succeeds).

Source code in llmsdk/agents/datagpt.py
def query(self, query, cascade_id=None):
    """
    Run a query on the dataframe.

    query: query string
    cascade_id: id of the model to use in the cascade list specified
                during agent init; when None, every model in the cascade
                is tried in order until one succeeds

    Returns a result dict with 'query', 'answer', 'type', 'success',
    'tries', 'chain_of_thought' and 'code' keys; when self.debug is True,
    'raw_thoughts' and 'intermediate_steps' are also included.

    Raises Exception when cascade_id is not a known agent id.
    """
    start_time = time.time()

    seq = 0
    tries = []
    # defaults so we return a well-formed error result (instead of
    # raising NameError) if the cascade list happens to be empty
    success = False
    result = {"input": query, "output": self._err_msg('err')}
    thought = ""

    if cascade_id is None:
        # we have not specified an agent id
        # in the cascade list so try them all
        for _id, agent in self.agents.items():
            # try one agent in the cascade
            success, result, thought = self.mux_query_one_try(query, _id, agent)
            tries.append({"seq": seq, "cascade_id": _id, "success": success})
            seq += 1
            if success:
                break
    else:
        # we have specified an agent id
        # in the cascade list so try only that
        agent = self.agents.get(cascade_id)
        if not agent:
            raise Exception(f"Unknown agent ID {cascade_id}, not specified in cascade")

        # try the requested agent in the cascade
        success, result, thought = self.mux_query_one_try(query, cascade_id, agent)
        tries.append({"seq": seq, "cascade_id": cascade_id, "success": success})

    result['success'] = success
    result['tries'] = tries

    # format the result keys
    # membership tests (not truthiness) so empty strings are renamed too
    if 'input' in result:
        result['query'] = result.pop('input')
    if 'output' in result:
        result['answer'] = result.pop('output')
    result.setdefault('intermediate_steps', [])

    # check for result type
    if self._is_json(result['answer']):
        result['answer'] = json.loads(result['answer'])
        result['type'] = 'json'
    else:
        result['type'] = 'str'

    # get the chain of thought
    # strip ANSI control sequences first
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    thought = ansi_escape.sub('', thought)
    # get line-by-line chain of thought from stdout
    result['raw_thoughts'] = thought.split("\n")

    # convert intermediate steps to chain of thought
    # this is more stable
    result['chain_of_thought'] = self._get_chain_of_thought(result)

    # get the code snippets from the chain of thought
    result['code'] = self._parse_code_from_thought(result['chain_of_thought'])

    # log the event
    # result is always a dict here; copy so later pops don't mutate the log
    params = {
        "query": query,
        "cascade_id": cascade_id,
        "result": result.copy()
    }
    duration = time.time() - start_time
    self._log_event(agent_events._EVNT_QUERY, duration, params=params)

    # decide on what fields to return
    # we do it after the logging so that logs have everything
    if not self.debug:
        result.pop('raw_thoughts', None)
        result.pop('intermediate_steps', None)

    return result