Bases: object
Class to do querying of a dataframe using LLMs
init the dataframe LLM query agent
name: name of the agent
cred: credentials object
mode: type of input dataset
'csv': data will be available as path to a csv
'sqlite': data will be available as path to a sqlite db
'df': data will be available as pandas dataframe
data: pointer to data, see mode param
cascade: ordered list of LLM backends to use
for each query, the backends are tried in list order
if a backend fails, the next one in the list is tried
debug: if True, query results also include the raw thoughts and intermediate steps, which are useful for debugging
Source code in llmsdk/agents/datagpt.py
def __init__(self,
             name,
             cred,
             mode,
             data,
             cascade,
             debug=False):
    """
    Set up the dataframe LLM query agent.

    name: name of the agent
    cred: credentials object
    mode: how the input dataset is supplied
          'csv': data is a path to a csv
          'sqlite': data is a path to a sqlite db
          'df': data is a pandas dataframe
    data: pointer to the data, interpreted according to mode
    cascade: list of LLM backends to use
             the list is tried in order for each query;
             on failure, the next LLM in the list is tried
    debug: if True, extra information useful for debugging is returned
    """
    started_at = time.time()

    # logging comes first so later setup steps can use it
    self.logger = get_logger()

    # plain configuration captured from the arguments
    self.agent_name = name
    self.cred = cred
    self.cascade = cascade
    self.mode = mode
    self.data = data
    self.debug = debug
    self.max_llm_tokens = 1024  # max tokens in the response

    # build the LLM objects, then the agents that wrap the data
    self.llms = self._get_llm_objs(cascade=self.cascade, cred=self.cred)
    self.agents = self._get_agent(mode=self.mode, data=self.data)

    # a dataframe is summarised by its shape; csv/sqlite modes keep
    # the data pointer itself in the metadata
    if self.mode == 'df':
        data_desc = f"dataframe of shape {self.data.shape}"
    else:
        data_desc = self.data

    agent_info = {
        "name": self.agent_name,
        "cascade": self.cascade,
        "mode": self.mode,
        "data": data_desc,
    }
    self.metadata = {"agent": agent_info, "events": []}

    # record that the agent is ready, along with the setup duration
    elapsed = time.time() - started_at
    self._log_event(agent_events._EVNT_READY, elapsed)
|
return metadata collected by the agent
Source code in llmsdk/agents/datagpt.py
def get_metadata(self):
    """
    Give back the metadata this agent has collected.

    The stored object is returned as-is (it is not copied).
    """
    collected = self.metadata
    return collected
|
mux_query_one_try(query, agent_id, agent)
do one try of the query through a specified agent in the cascade chain
Source code in llmsdk/agents/datagpt.py
def mux_query_one_try(self, query, agent_id, agent):
    """
    Do one try of the query through a specified agent in the cascade chain.

    query: query string passed to the agent
    agent_id: id of the agent within the cascade, recorded in the result
    agent: dict holding the callable under 'agent' plus the 'platform'
           and 'model' entries that are copied into the result

    Returns a (success, result, thought) tuple:
      success: False if the agent raised or answered "I don't know."
      result: the agent's result dict, or an input/output error dict on
              failure; a 'cascade' entry is added in both cases
      thought: everything the agent printed to stdout ("" on failure)
    """
    agent_exec = agent['agent']

    # HACK: redirect stdout so we can capture the agent's chain-of-thought
    # until we figure out how to get it directly from LangChain
    captured = io.StringIO()
    try:
        with redirect_stdout(captured):
            result = agent_exec(query)
        # the LLM explicitly giving up counts as a failed try
        success = result['output'].lower() != "i don't know."
    except Exception:
        # only ordinary errors are treated as a failed try;
        # KeyboardInterrupt / SystemExit must still propagate
        success = False

    if success:
        thought = captured.getvalue()
    else:
        result = {"input": query, "output": self._err_msg('err')}
        thought = ""

    # note which member of the cascade produced this result
    result['cascade'] = {
        "id": agent_id,
        "platform": agent['platform'],
        "model": agent['model'],
    }

    return success, result, thought
|
query(query, cascade_id=None)
run a query on the dataframe
query: query string
cascade_id: id of the model to use in the cascade list specified during agent init; if omitted, each model in the cascade is tried in order until one succeeds
Source code in llmsdk/agents/datagpt.py
def query(self, query, cascade_id=None):
    """
    Run a query on the dataframe.

    query: query string
    cascade_id: id of the model to use in the cascade list specified
                during agent init; when None, every agent in the cascade
                is tried in order until one succeeds

    Returns the result dict of the last try, extended with 'success',
    'tries', 'type', 'chain_of_thought' and 'code', and with 'input' /
    'output' renamed to 'query' / 'answer'. 'raw_thoughts' and
    'intermediate_steps' are only kept when self.debug is set.

    Raises Exception if cascade_id is not a known agent id, or if no
    cascade_id is given and there are no agents to try.
    """
    start_time = time.time()
    tries = []

    if cascade_id is None:
        # we have not specified an agent id
        # in the cascade list so try them all
        if not self.agents:
            # without this guard the loop body never runs and the
            # variables used below would be unbound
            raise Exception("No agents available in cascade")
        for seq, (_id, agent) in enumerate(self.agents.items()):
            success, result, thought = self.mux_query_one_try(query, _id, agent)
            tries.append({"seq": seq, "cascade_id": _id, "success": success})
            if success:
                break
    else:
        # we have specified an agent id
        # in the cascade list so try only that
        agent = self.agents.get(cascade_id)
        if not agent:
            raise Exception(f"Unknown agent ID {cascade_id}, not specified in cascade")
        success, result, thought = self.mux_query_one_try(query, cascade_id, agent)
        tries.append({"seq": 0, "cascade_id": cascade_id, "success": success})

    result['success'] = success
    result['tries'] = tries

    # format the result keys
    # test for key presence, not truthiness: an empty query or an empty
    # answer must still be renamed, otherwise 'answer' is missing below
    if 'input' in result:
        result['query'] = result.pop('input')
    if 'output' in result:
        result['answer'] = result.pop('output')
    result.setdefault('intermediate_steps', [])

    # check for result type
    if self._is_json(result['answer']):
        result['answer'] = json.loads(result['answer'])
        result['type'] = 'json'
    else:
        result['type'] = 'str'

    # get the chain of thought
    # strip ANSI control sequences first
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    thought = ansi_escape.sub('', thought)
    # get line-by-line chain of thought from stdout
    result['raw_thoughts'] = thought.split("\n")

    # convert intermediate steps to chain of thought
    # this is more stable than parsing the raw stdout
    result['chain_of_thought'] = self._get_chain_of_thought(result)

    # get the code snippets from the chain of thought
    result['code'] = self._parse_code_from_thought(result['chain_of_thought'])

    # log the event
    params = {
        "query": query,
        "cascade_id": cascade_id,
        "result": result.copy() if isinstance(result, dict) else result
    }
    duration = time.time() - start_time
    self._log_event(agent_events._EVNT_QUERY, duration, params=params)

    # decide on what fields to return
    # we do it after the logging so that logs have everything
    if not self.debug:
        result.pop('raw_thoughts')
        result.pop('intermediate_steps')

    return result
|