Skip to content


Agent to support specification-based extraction of content from documents. This could include dates, clauses, snippets with complex conditionality, retries using alternatives, and multi-class specifications.

LLMQuerierExtractor(name, cred={}, platform='openai', model='gpt-4o-mini', searchapi='serpapi', statestore='redis', topk=7)

Bases: BaseLLMAgent

Class for querying a docset and extracting specific information fields using LLMs. Queries can be run against a specified set of documents that act as context to constrain the answers.

Initialize the LLM query agent. name: name of the agent. cred: credentials object. platform: name of the LLM platform backend to use; defaults to the OpenAI GPT platform for now, Azure is also supported, and this will be extended in the future to support other models. memory_size: how many tokens of memory to use when chatting with the LLM (note: memory_size is described here but does not appear in the constructor signature shown above).

Source code in llmsdk/agents/
def __init__(self,
    init the LLM query agent
    name: name of the agent
    cred: credentials object
    platform: name of the LLM platform backend to use
            default to OpenAI GPT platform for now, Azure is also supported
            will be extended in the future to support other models
    memory_size: how many tokens of memory to use when chatting with the LLM

    start_time = time.time()

    # init the base class

    # defaults
    self.chunk_size = 1000
    self.chunk_overlap = 300
    self.index = None
    self.metadata = {}
    self.vdb_client = None
    self.index_name = None
    self.index_store = None
    self.topk = topk
    self.doc_signatures = [] = {}

    # LLM params
    self.platform = platform
    self.chaintype = "stuff"
    self.searchapi = searchapi

    # init the llm and embeddings objects
    self.llm, self.embeddings = self._get_llm_objs(platform=self.platform,

    # init the QnA chain for internal queries
    prompt = self._get_query_prompt_internal()
    self.llm_chain_int = load_qa_chain(llm=self.llm,
    # init the agent for searches
    self.llm_agent_srch, self.searchengine = self._load_search_agent(cred=self.cred,
    # note metadata for this agent
    self.metadata = {
        "agent": {
            "name": self.agent_name,
            "type": self.agent_type,
            "platform": self.platform,
            "chaintype": self.chaintype,
        "events": []
    # log that the agent is ready
    duration = time.time() - start_time
    event = self._log_event(agent_events._EVNT_READY, duration)


Convert extracts object into dataframe

Source code in llmsdk/agents/
def extracts_to_df(self, info):
    Convert extracts object into dataframe
    extracts = info['extracts']
    entries = []

    for col, details in extracts.items():
        entry = {
            "field": col,
            "answer": details['answer'],
            "grounding": json.dumps(details['sources']),

    df = pd.DataFrame(entries)

    return df


Process a profilespec.

Source code in llmsdk/agents/
def process_spec(self, spec):
    process a profilespec
    name = spec.get("name")
    self.logger.debug(f"Processing spec: {name}",
                             'source': self.agent_name,
                             'data': json.dumps(spec, indent=4, cls=SafeEncoder)

    # check for signatures
    signatures = self.process_spec_signatures(spec)

    # process the questions
    extracts = self.process_spec_queries(spec)

    result = {
        "spec": name,
        "timestamp": time.time(),
        "extracts": extracts,
        "signatures": signatures

    return result


Take a spec containing questions and answer them against the docset indexed by the agent.

Source code in llmsdk/agents/
def process_spec_queries(self, spec):
    take a spec containing questions and answer them
    against the docset indexed by the agent

    # retry answers
    retry_answers = ["im having trouble understanding try another way of wording your query",

    # get the list of queries
    query_set = spec.get("query_set", [])
    indx_query_set = {}

    # begin an empty dict
    extracted_info = {}
    grounding = {}

    # foreach query to process
    for one_query in query_set:

        enable = one_query.get("enable", True)
        q_name = one_query['name']
        query = one_query.get("query")
        query_mod = one_query.get("query_mod")
        query_alts = one_query.get("query_alts", [])
        use_alts = one_query.get("use_alts", "on-fail")
        postprocess = one_query.get("postprocess", {})
        fill_columns = one_query.get("fill_columns", [])
        query_mode = one_query.get("mode", "internal")
        instructions = one_query.get("instructions", "")

        # post-processing handling
        pp_handler = postprocess.get("handler")
        pp_response = postprocess.get("response", "fill")
        pp_othercols = postprocess.get("othercols", [])

        # make note of this one_query in the index
        # we'll need this later when re-ordering
        indx_query_set[q_name] = one_query

        self.logger.debug(f"Running query: {q_name}",
                                 'source': self.agent_name,
                                 'data': json.dumps(one_query, indent=4, cls=SafeEncoder)

        if not enable:

        if not query:

        if query_mod:
            # we have to modify our query before passing to the LLM
            cannot_modify = False
            params = {}
            for col in query_mod.get("inputs", []):
                if col not in extracted_info:
                    cannot_modify = True
                    params[col] = extracted_info[col]
            if cannot_modify:

            # modify the query using the params collected
            query = query.format(**params)

            # check if we need to modify alt queries also
            apply_to = query_mod.get("apply_to", "first")
            if apply_to == "all":
                query_alts = [q.format(**params) for q in query_alts]

        # collect all the queries we need to run
        queries = [query] + query_alts

        # run the queries against the LLM
        for query in queries:

            # get the answer
            response = self.query(query, instructions=instructions, mode=query_mode)
            answer = response['answer']
            sources = response['sources']
            # normalized answer
            s_answer = answer.translate(str.maketrans('', '', string.punctuation)).lower().strip()

            # check the answer for UNKNOWN
            # but only in the case when postprocess->response==fill
            # this is for correct handling of spec extension/replacement
            if pp_response == 'fill':
                if s_answer in retry_answers:

            # post-process if needed
            # check if handler is callable
            if callable(pp_handler):
                # collect all the columns needed to post-process the answer
                params = {q_name: answer}
                for col in pp_othercols:
                    if col in extracted_info:
                        if isinstance(extracted_info[col], list):
                            val = extracted_info[col][0]
                            val = extracted_info[col]
                        params[col] = val
                answer = pp_handler(params)

            # if next step action is 'extend'
            # then, we need to extend the query spec
            if pp_response == "extend":
                # continue, so that we move to the next query

            # if next step action is 'replace'
            # then, we need to replace the query_set items in the default query spec
            # with the newly loaded one
            if pp_response == "replace":
                # get the names of all query items in replace spec
                replace_query_set_items = {}
                for q_item in answer:
                    replace_query_set_items[q_item['name']] = q_item
                # now, run through each query item in the default query set
                # and check if replacement is needed
                for i in range(len(query_set)):
                    if query_set[i]['name'] in replace_query_set_items:
                        # we have found a query item that needs to be replaced
                        query_set[i] = replace_query_set_items[query_set[i]['name']]
                        # remove the found query item from the replace set
                        d = replace_query_set_items.pop(query_set[i]['name'])

                # now, all the remaining query items need to be included
                for qr_name, q_item in replace_query_set_items.items():

                # continue, so that we move to the next query

            # add the answers to the columns in the extracted dataset
            if len(fill_columns) == 0:
            if len(fill_columns) > 1:
                i = 0
                for col in fill_columns:
                    ans_i = answer[i]
                    ans = extracted_info.get(col,[])
                    if answer in ans:
                        # we have found this answer before
                        # no need to collect it again
                    extracted_info[col] = ans
                    i += 1
                col = fill_columns[0] # have only one column to fill
                ans = extracted_info.get(col,[])
                if answer in ans:
                    # we have found this answer before
                    # no need to collect it again
                extracted_info[col] = ans

            # at this point, at least one alt query has response
            # collect the grounding elements
            for col in fill_columns:
                curr_sources = grounding.get(col, [])
                grounding[col] = curr_sources

            # check if we need to run other alts
            if use_alts == "on-fail":
                # no need to run an alt query
                # since we have at least some response

    # get the order of fields to return
    reordered_query_set = []
    field_order = spec.get("order")
    if not field_order:
        # we don't have an explicit order provided
        # use the default ordering
        reordered_query_set = query_set
        for field in field_order:
            if field in indx_query_set:
                one_query = indx_query_set[field]

    # check if all columns exist
    # and add the collected grounding
    extracts = {}
    for one_query in reordered_query_set:
        # check if we need to include this field
        include = one_query.get("include", True)
        if not include:

        # we need to include this field
        fill_columns = one_query.get("fill_columns", [])
        default = one_query.get("default")
        default = [] if default == None else [default]
        for col in fill_columns:
            answer = extracted_info.get(col, default)
            extracts[col] = {
                "n_answers": len(answer),
                "answer": answer,
                "sources": grounding.get(col, []),

    return extracts


Check if signatures are present.

Source code in llmsdk/agents/
def process_spec_signatures(self, spec):
    check if signatures are present
    self.logger.debug("Detecting signatures...",
                         extra={'source': self.agent_name})

    if spec.get("detect_signatures", False) == False:
        return None

    pages = []
    confidence = 0
    if len(self.doc_signatures) > 0:
        for signature in self.doc_signatures:
            confidence += signature['confidence']
        n_sigs = len(pages) # this is correct
        pages = list(set(pages))
        n_pages = len(pages)
        confidence = round(confidence/n_sigs, 2)

        comment = f"Detected {n_sigs} signatures across {n_pages} pages"

        signatures = {
            "found": True,
            "comment": comment,
            "n_signatures": n_sigs,
            "n_pages": n_pages,
            "pages": pages,
            "confidence": confidence,
        signatures = {
            "found": False,
            "comment": f"No signatures detected",

    return signatures

query(query, instructions='', mode='internal')

Run a query on an index using an LLM chain object. query: query string. mode: 'internal' for querying over the docset, 'search' for searching the web.

Source code in llmsdk/agents/
def query(self, query, instructions="", mode="internal"):
    run a query on an index using an llm chain object
    query: query string
    mode: 'internal' for querying over docset, 'search' for searching the web

    start_time = time.time()

    method = getattr(self, f"run_query_{mode}", None)
    if method is None:
        raise Exception(f"Unsupported mode: {mode}")

        if self.platform in ['openai', 'azure']:
            with get_openai_callback() as cb:
                result = method(query, instructions)
            stats = {
                "total_tokens": cb.total_tokens,
                "prompt_tokens": cb.prompt_tokens,
                "completion_tokens": cb.completion_tokens,
                "total_cost": round(cb.total_cost, 4)
            result = method(query, instructions)
            stats = {}
        result = {
            "question": query,
            "answer": self._err_msg('field'),
            "sources": [],
        stats = {}

    # log the event
    params = {
        "query": query,
        "mode": mode,
        "result": result.copy() if result is not None else None,
        "stats": stats,
    duration = time.time() - start_time
    event = self._log_event(agent_events._EVNT_QUERY, duration, params=params)

    # add the event to the result
    result['metadata'] = {
        "timestamp": event['timestamp'],
        "duration": event['duration'],

    return result

read_document(source, content, metadata={}, params={}, store='chroma', persist_directory=None)

Wrapper function that takes in the path to a document and sets it up for reading by the agent. This function will create a new index if the agent does not already have one; otherwise it will use the existing index pointer. It needs the persist_directory that the index will use.

Source code in llmsdk/agents/
def read_document(self, source, content, metadata={}, params={}, store="chroma", persist_directory=None):
    wrapper function that takes in the path to a document
    and sets it up for reading by the agent
    this function will create a new index if the agent does not already have one
    else it will use the existing index pointer
    needs the persist_directory that the index will use
    # load the document
    data = self.load_data(source=source, content=content)

    # add the document to index
    if not self.index:
        # we have to init a new index
        # we can use the agent's index pointer

    # extract text from document if it is a pdf
    # so that we have the table data
    if source in ["pdf"]:

        # run through Textract
        extracted_data = extract_text_from_file(content, provider="aws")

        # take the Textract output
        # and add tables and linetext to index
        for block in ["tables", "text"]:
            for extract in extracted_data:
                # for each page in the document
                for entry in extract[block]:
                    # for each table in the page
                    if any(f not in entry for f in ['id', 'content']):
                    metadata = { "source": entry['id'] }
                    data = self.load_data(source="str",

        # add the signature details to the agent's knowledge
        for extract in extracted_data:
            signatures = extract["signatures"]


run_query_internal(query, instructions='')

Run a query using an LLM on an internal docset indexed in the index. This is useful when looking for answers using a private source of data.

Source code in llmsdk/agents/
def run_query_internal(self, query, instructions=""):
    run a query using llm on an internal docset indexed in index
    this is useful when looking for answers using a private source of data
    # get the similar docs
    docs = self.get_similar_docs(query, topk=self.topk)

    # setup the QnA chain object
    response = self.llm_chain_int({"input_documents":docs, "input":query, "instructions":instructions},

    # run the query against the similar docs
    result = {
        "question": query,
        "answer": response.get('output_text', self._err_msg('field')).strip(),
        "sources": [{"content": d.page_content, "metadata": d.metadata, "distance": d.metadata.pop('distance')} for d in docs],

    return result

Run a query using the search agent. This is useful when looking for answers using a search engine.

Source code in llmsdk/agents/
def run_query_search(self, query, instructions=""):
    run a query using the search agent
    this is useful when looking for answers using a search engine
    def extract_content_sources(sourcedata):
        docs = sourcedata.get('organic_results')
        if not docs:
            return None
        sources = [{"content": d.get('snippet', ""), "source": d.get('link')} for d in docs]
        return sources

    # get the human-readable result
    result =

    # get the sources
    sourcedata = self.searchengine.results(query)
    sources = extract_content_sources(sourcedata)
    if not sources:
        sources = [{"content": "", "source": f"search-{self.searchapi}"}]

    # construct result
    result = {
        "question": query,
        "answer": result,
        "suggest": list(set([q.get('question', '') for q in sourcedata.get('related_questions', [])])),
        "sources": sources

    return result