Skip to content

Common

Leverage ScribbleData's Library Services for efficient data management. Explore comprehensive documentation to optimize your data handling and processing workflows

get_agent_details(namespace, username, subdir, exts, get_task_specific_agent, force=False, extra=None) async

Check the agent cache and return the agent if it exists. Else instantiate an agent. Updates the statistics as well

Parameters:

Name Type Description Default
namespace

Enable instantiating the agent multiple times using a namespace string

required
subdir

Subdirectory within the DATA_ROOT environment variable

required
username

Name of the user. Each user has a separate instance

required
exts

List of extensions

required
get_task_specific_agent

Callback to instantiate an agent

required

Returns:

Name Type Description
status dict

Return a dictionary with the agent instance, path of the vector index on disk and name of the index

Source code in llmsdk/services/lib.py
async def get_agent_details(namespace,
                            username,
                            subdir,
                            exts,
                            get_task_specific_agent,
                            force=False,
                            extra=None):
    """
    Check the agent cache and return the cached entry if it exists.
    Else instantiate an agent. Updates the statistics as well.

    Parameters
    ----------
    namespace: str
               Enable instantiating the agent multiple times using a namespace string
    username: str
              Name of the user. Each user has a separate instance
    subdir: str
            Subdirectory within the DATA_ROOT environment variable
    exts: list
           List of extensions
    get_task_specific_agent: callable
           Async callback to instantiate an agent
    force: bool
           If True, rebuild the agent even when a cached entry exists
    extra: object
           Passed through to get_dir_sha1 when hashing the dataset directory

    Returns
    -------
    status: dict
            Return a dictionary with agent instance, path of vector index on
            disk and name of the index; None on failure
    """

    # Cache key: one agent per (user, dataset) pair
    label = f"{username}_{subdir}"
    now = datetime.now().replace(microsecond=0).isoformat()

    # NOTE(review): run_root is unused below; the read is kept so a missing
    # RUNDIR still fails loudly here, as it did originally.
    run_root = os.environ['RUNDIR']
    dir_root = os.environ['DATA_ROOT']
    fullpath = os.path.join(dir_root, namespace, subdir)

    # A '.reindex' marker file signals that the document set changed and the
    # index must be rebuilt regardless of the cache.
    reindex = os.path.join(fullpath, '.reindex')
    print("Reindex check", reindex)
    if os.path.exists(reindex):
        print("Document set changed. So indexing again")
        force = True

    # Fast path: return the cached agent unless a rebuild is forced.
    if ((label in agents) and (not force)):
        logger.debug(f"[{namespace}] Returning existing instance",
                     extra={
                         'source': 'service',
                         'user': username,
                         'dataset': subdir
                     })
        return agents[label]

    try:

        if ((label in stats['datasets']) and (not force)):
            logger.debug(f"Return existing agent",
                         extra={
                             'source': 'service',
                             'user': username,
                             'dataset': subdir,
                             'data': json.dumps(stats['datasets'][label],
                                                indent=4,
                                                cls=SafeEncoder)
                         })
        else:
            logger.debug(f"Creating agent (Force: {force})",
                         extra={
                             'source': 'service',
                             'user': username,
                             'dataset': subdir,
                             'data': fullpath
                         })

        # No dataset directory => nothing to index
        if not os.path.isdir(fullpath):
            logger.debug(f"Could not find data",
                         extra={
                             'source': 'service',
                             'user': username,
                             'dataset': subdir,
                             'data': fullpath
                         })
            return None

        # Compute the sha1 of the dataset directory contents
        sha1 = get_dir_sha1(fullpath, extra)

        # Initialize the per-dataset statistics entry
        stats['datasets'][label] = {
            'sha1': sha1,
            'loaded': now,
            'username': username,
            'subdir': subdir,
            'fullpath': fullpath,
            'agent_created': False,
            'agent_status': "initialization",
            "query_count": 0,
            "query_success": 0,
            "query_failure": 0,
        }

        # Enumerate files matching the requested extensions
        filestats, files = get_files(fullpath, exts)

        logger.debug(f"Dataset details",
                     extra={
                         'source': 'service',
                         'user': username,
                         'dataset': subdir,
                         'data': json.dumps({
                             "filestats": filestats,
                             "files": files
                             }, indent=4)
                         })

        if filestats['total'] == 0:
            logger.warning(f"No data found",
                           extra={
                               'source': 'service',
                               'user': username,
                               'dataset': subdir,
                        })

            stats['datasets'][label]['agent_status'] = f"Error! Files not found"
            return None

        # Include the metadata, if present; a bad metadata file is non-fatal.
        metadatapath = os.path.join(fullpath, 'metadata.json')
        print("metadatapath", metadatapath)

        metadata = {}
        if os.path.exists(metadatapath):
            try:
                # Use a context manager so the file handle is always closed
                with open(metadatapath) as fd:
                    metadata = json.load(fd)
            except Exception:
                logger.exception(f"Failed to read metadata",
                                 extra={
                                     'source': 'service',
                                     'user': username,
                                     'dataset': subdir,
                                     'data': metadatapath
                                 })


        stats['datasets'][label].update({
            'filestats': filestats,
            "files": files,
            "metadata": metadata
        })

        # Delegate actual agent construction to the task-specific callback;
        # a forced rebuild drops any existing index.
        agentdetails = await get_task_specific_agent(namespace,
                                                     username,
                                                     subdir,
                                                     fullpath,
                                                     files,
                                                     metadata,
                                                     drop_index=force)

        if agentdetails is None:
            raise Exception("Task specific agent returned invalid details")


        stats['datasets'][label].update({
            'agent_created': True,
            "agent_status": "Created"
        })

        # Cache the agent details for subsequent calls
        agents[label] = {
            'sha1': sha1,
            'created': now,
            "metadata": metadata
        }

        agents[label].update(agentdetails)

        stats['datasets'][label]['agent_status'] = f"Ready!"
        logger.debug(f"Agent is ready!",
                       extra={
                           "data": {
                               'source': 'service',
                               'user': username,
                               'dataset': subdir,
                               'data': metadatapath
                           }
                       })

        # Rebuild complete; clear the reindex marker so the next call
        # can use the cache again.
        if os.path.exists(reindex):
            os.remove(reindex)

    except Exception as e:
        logger.exception(f"Failed to build agent",
                       extra={
                           "data": {
                               'source': 'service',
                               'user': username,
                               'dataset': subdir,
                           }
                       })

        stats['datasets'][label]['agent_created'] = False
        stats['datasets'][label]['agent_status'] = f"Error! {e}"
        return None

    return agents[label]

get_generic_agent(agentcls, dataset, fullpath, files, metadata, drop_index=False) async

Instantiates the agent.

Parameters:

Name Type Description Default
agentcls

Agent class

required
dataset

Name of the dataset

required
fullpath

Root directory on the filesystem of dataset

required
files

Files within the directory

required

Returns:

Name Type Description
status dict

Return a dictionary with the agent instance, path of the vector index on disk and name of the index

Source code in llmsdk/services/lib.py
async def get_generic_agent(agentcls,
                            dataset,
                            fullpath,
                            files,
                            metadata,
                            drop_index=False):
    """
    Instantiates the agent.

    Parameters
    ----------
    agentcls: class
              Agent class
    dataset: str
             Name of the dataset
    fullpath: str
              Root directory on the filesystem of dataset
    files: dict
           Mapping of extension -> list of file paths within the directory
    metadata: dict
           Dataset metadata (accepted for interface parity; not used here)
    drop_index: bool
           If True, rebuild the index even when one exists on disk

    Returns
    -------
    status: dict
            Return a dictionary with agent instance, path of vector index
            on disk and name of the index
    """

    logger.debug("Using Generic Agent")

    name = os.environ.get('AGENTNAME',"default")
    index_name=slugify(f"{dataset}_index")
    # Build the path from `name` so the "default" fallback is honoured even
    # when AGENTNAME is unset; expandvars alone would otherwise leave a
    # literal "$AGENTNAME" component in the path.
    index_path = os.path.expandvars(f"$RUNDIR/{name}/index/{dataset}/chromadb_index")
    stats = defaultdict(int)  # per-extension load/error counters

    agent = agentcls(name=name, cred={})

    # Try to reuse an existing on-disk index unless a rebuild was requested;
    # any load failure falls through to a full rebuild.
    try:
        if os.path.exists(index_path) and not drop_index:
            print(f"Loading existing index {name}")
            agent.load_index(store="chroma",
                             persist_directory=index_path,
                             index_name=index_name)
            logger.debug(f"Loaded index: {name}",
                         extra={
                             "data": json.dumps({
                                 'source': 'service',
                                 'user': 'all',
                                 'dataset': dataset,
                                 'stats': agent.get_index_stats()
                             }, indent=4, cls=SafeEncoder)
                         })

            return {
                'agent': agent,
                'index_path': index_path,
                'index_name': index_name
            }

    except Exception:
        logger.exception(f"Failed to load index: {name}")

    print(f"Building new index {name}")
    logger.debug(f"Building a new index",
                 extra={
                     "data": json.dumps({
                         'source': 'service',
                         'user': 'all',
                         'dataset': dataset,
                         'data': index_path
                     },  indent=4, cls=SafeEncoder)
                 })

    first = True
    for ext, extfiles in files.items():

        # Extension-specific loader parameters, shared by all files of
        # this extension.
        params = {}
        if ext == "pdf":
            params['pdfloader'] = "pymupdf"

        for f in extfiles:

            # Load data; a failure on one file is counted and skipped so
            # the rest of the dataset still gets indexed.
            data = None
            try:
                if ext == "pdf":
                    data = agent.load_data(source='pdf',
                                           content=f,
                                           params=params)
                    stats[f'{ext}-mupdf'] +=1
                elif ext in ["doc", "docx"]:
                    data = agent.load_data(source='docx', content=f,)
                    stats[f'{ext}-default'] +=1
                else:
                    # Generic fallback: directory loader restricted to this
                    # one file via a glob on its basename.
                    data = agent.load_data(source='dir',
                                           content=os.path.dirname(f),
                                           params={
                                               "glob": os.path.basename(f)
                                           })
                    stats[f'{ext}-dirload'] +=1
            except Exception:
                traceback.print_exc()
                stats[f'{ext}-error'] += 1

            if data is None:
                continue

            # First successful load creates the index; later loads append.
            if first:
                print(f"Creating new index: {name}")
                agent.create_add_index(data=data,
                                       store="chroma",
                                       persist_directory=index_path,
                                       index_name=index_name)
                first = False
            else:
                agent.add_to_index(data=data)

    print(f"Loaded agent {name}")
    logger.debug(f"Loaded agent",
                 extra={
                     "data": json.dumps({
                         'source': 'service',
                         'user': 'all',
                         'dataset': dataset,
                         'stats': agent.get_index_stats()
                     }, indent=4, cls=SafeEncoder)
                 })

    return {
        'agent': agent,
        'index_path': index_path,
        'index_name': index_name
    }