Skip to content

Proxy

Proxy service to observe and learn from the prompts defined by the end-users.

Simple LLM Proxy that works with the front end. This derives from the QnA service module.

cache_health()

Summarize the content of the cache

Source code in llmsdk/services/proxy.py
@app.get("/cache")
def cache_health():
    """
    Summarize the content of the cache

    Tallies cached request entries by their 'status' field and returns
    the status -> count mapping.
    """
    summary = defaultdict(int)
    for entry in get_cache().values():
        summary[entry['status']] += 1

    return summary

health()

Return usage statistics

Returns:

Name Type Description
stats dict

Usage statistics

Source code in llmsdk/services/proxy.py
@app.get("/health")
def health():
    """
    Return usage statistics

    Returns
    -------
    stats: dict
           Usage statistics
    """
    # Log the full stats payload before handing it back to the caller
    dumped = json.dumps(stats, indent=4)
    logger.info("Returning stats", extra={'data': dumped})
    return stats

query_index(user, dataset, query, background_tasks, mode='internal', context='', namespace='default', policy={}, extra={}) async

Initiate a new request

Parameters:

Name Type Description Default
user str

User (an uninterpreted string)

required
query str

A string

required
namespace

A namespace associated with this instance of the service.

'default'
context str

Any additional text

''
policy dict

List of policies to enforce

{}
extra dict

Pass any other information

{}

Returns:

Name Type Description
status dict

Dictionary with request id, query, status and data

Source code in llmsdk/services/proxy.py
@app.post("/qna")
async def query_index(user: str,
                      dataset: str,
                      query: str,
                      background_tasks: BackgroundTasks,
                      mode: str = 'internal',
                      context: str = "",
                      namespace="default",
                      policy: dict = {},
                      extra: dict = {}
):
    """
    Initiate a new request

    Registers the request in the shared cache as 'pending' and schedules
    query_run as a background task; the caller polls /qna/status with the
    returned request id.

    Parameters
    ----------
    user: str
          User (an uninterpreted string)
    dataset: str
          Dataset name; combined with user to key per-dataset statistics
    query: str
           A string
    mode: str
           Mode string forwarded to the backend ('internal' by default)
    namespace: str
           A namespace associated with this instance of the service
    context: str
           Any additional text
    policy: dict
           List of policies to enforce
    extra: dict
           Pass any other information (not forwarded in params here)

    Returns
    -------
    status: dict
         Dictionary with request id, query, status and data
    """

    # The mutable {} defaults are shared across every call that omits
    # these arguments; copy them so downstream mutation cannot leak
    # state between requests.
    policy = dict(policy)
    extra = dict(extra)

    cache = get_cache()
    request_id = str(uuid4())

    params = {
        "user": user,
        "dataset": dataset,
        "query": query,
        "mode": mode,
        "context": context,
        "namespace": namespace,
        "policy": policy
    }
    label = f"{user}_{dataset}"

    # Initialize the per-(user, dataset) counters on first sight
    if label not in stats['datasets']:
        stats['datasets'][label] = {
            "query_count": 0,
            "query_success": 0,
            "query_failure": 0,
        }

    logger.debug("Starting query",
                 extra={
                     "source": "service",
                     "user": user,
                     "dataset": dataset,
                     "data": json.dumps(params, indent=4, cls=SafeEncoder)
                 })

    # Record the request as pending; query_run overwrites this entry
    # with the final success/failure status.
    cache[request_id] = {
        "status": "pending",
        "user": user,
        "dataset": dataset,
        "query": query,
        "params": params
    }

    # Run the background task...
    background_tasks.add_task(query_run, request_id)

    return {
        "request_id": request_id,
        "status": "pending",
        "data": {}
    }

query_run(request_id) async

Query the agent instance created

Parameters:

Name Type Description Default
request_id

UUID generated for the request

required

Returns:

Type Description
None

Nothing is returned. Only the result cache is updated

Source code in llmsdk/services/proxy.py
async def query_run(request_id):
    """
    Query the agent instance created

    Runs the proxy completion for a previously registered request and
    writes the outcome (success or failure) back into the result cache.

    Parameters
    ----------
    request_id: str
                UUID generated for the request

    Returns
    -------
    None
         Nothing is returned. Only the result cache is updated

    """

    cache = get_cache()

    if request_id not in cache:
        logger.error("Failure",
                     extra={
                         'request_id': "invalid",
                         "source": "service",
                     })
        cache[request_id] = {
            'status': "failure",
            "message": "Invalid request id"
        }
        return

    # First get the params
    value = cache[request_id]

    # Pre-bind everything the except-handler needs. Previously a failure
    # before 'label'/'query'/'params' were assigned raised NameError
    # inside the handler itself, so query_update_result never ran and the
    # request stayed 'pending' forever.
    params = value.get('params', {})
    query = value.get('query', "")
    label = None

    try:
        user    = params['user']
        dataset = params['dataset']
        query   = params['query']
        namespace = params['namespace']  # validates presence; unused below
        label = f"{user}_{dataset}"

        logger.debug(f"{label}: Running Query",
                     extra={
                         "source": "service",
                         "user": user,
                         "dataset": dataset,
                         "request_id": request_id,
                         'data': json.dumps(value, indent=4, cls=SafeEncoder)
                     })

        stats['query_count'] += 1

        # The completion call is blocking; hand it to the threadpool so
        # the event loop stays responsive while it runs.
        result = await run_in_threadpool(lambda: proxy.completion(request_id, params))
        result['request_id'] = request_id

        logger.debug("Raw result received",
                     extra={
                         "source": "service",
                         "request_id": request_id,
                         "user": user,
                         "dataset": dataset,
                         "data": json.dumps(result, indent=4, cls=SafeEncoder)
                     })
        result['answer'] = result['answer'].strip()
        stats['query_success'] += 1
        stats['datasets'][label]['query_count'] += 1
        stats['datasets'][label]['query_success'] += 1
        query_update_result(request_id, {
            "status": "success",
            "result": result
        })
    except Exception as e:
        stats['query_failure'] += 1
        # label is None when the failure happened before it was computed;
        # the per-dataset bucket may also be absent for malformed entries.
        if label is not None and label in stats['datasets']:
            stats['datasets'][label]['query_count'] += 1
            stats['datasets'][label]['query_failure'] += 1
        logger.exception("Failed to run query",
                         extra={
                             "source": "service",
                             'request_id': request_id,
                             "user": params.get('user', "unknown"),
                             "dataset": params.get('dataset', "unknown"),
                         })
        query_update_result(request_id, {
            "status": "failure",
            "message": "Unable to construct the answer. Internal error. See the agent log",
            "result": {
                "question": query,
                "answer": "Error while accessing the LLM Agent. See log",
                "raw": [
                    str(e)
                ]
            }
        })
query_status(request_id) async

Get the status of the query request

Parameters:

Name Type Description Default
request_id str

UUID generated for the request

required

Returns:

Name Type Description
status dict

Dictionary with request id, status, message and data

Source code in llmsdk/services/proxy.py
@app.get("/qna/status")
async def query_status(request_id: str):
    """
    Get the status of the query request

    Looks the request up in the shared cache and reports its current
    state together with any message/result recorded so far.

    Parameters
    ----------
    request_id: str
                UUID generated for the request

    Returns
    -------
    status: dict
         Dictionary with request id, status, message and data

    """
    # Bound up-front so the exception handler can read it safely even
    # when the failure happens before the cache lookup completes.
    value: dict = {}

    try:
        cache = get_cache()

        if request_id in cache:
            value = cache[request_id]
            status = value['status']
            response = {
                "request_id": request_id,
                "status": status,
                'message': value.get('message', ""),
                'data': value.get('result', {})
            }
            logger.debug(f"Query Status: {status}",
                         extra={
                             "source": "service",
                             'request_id': request_id,
                             "dataset": value.get('dataset', 'unknown'),
                             "user": value.get('user', 'unknown'),
                             'data': json.dumps(response, indent=4, cls=SafeEncoder)
                         })
            return response

        # Unknown request id
        logger.error("Query Status: Invalid Request ID",
                     extra={
                         'request_id': request_id,
                         "source": "service",
                     })
        return {
            "request_id": request_id,
            "status": "failure",
            "message": "Request ID not found"
        }

    except Exception as e:
        logger.exception("Query Status: failure",
                         extra={
                             "source": "service",
                             'request_id': request_id,
                             "user": value.get('user', "unknown"),
                             "dataset": value.get('dataset', "unknown"),
                         })

        return {
            "request_id": request_id,
            "status": "failure",
            "message": str(e),
            "data": {}
        }