Agents working with large datasets
My current project requires us to create multiple agents running complex algorithms with large volumes of data.
The algorithms work on large datasets and compute outcomes to be used in the next steps of our workflow. Our data scientists are creating these algorithms, and they’re most comfortable in Python, so that’s what our application is running as well, in a setup similar to the one I created in my Trial and Error repository.
If you head to this repository, you’ll notice a file called large_data_analysis.py. This file is responsible for creating agents that are capable of working with large volumes of data.
As you probably know, context windows for agents are limited. They are growing at a rapid pace, but ideally you keep the context small.
When you need to work with millions of records from a repository, you can put them in the agent’s context, but you’ll hit the limitations of this practice quite quickly. Either the agent will start hallucinating or forgetting details, or you’ll get an error stating that the context is too large. Both scenarios are disastrous when you rely on the agent and its output.
Have tools query your repository
In my previous post, you could see how to create tools that can be used in agents.
Oftentimes, these tools require data to work with. If the data already exists inside the prompt, that’s great. The language model will take care of passing the correct parameters based on the descriptions.
However, if you need to get this data from a repository, you need to do some additional work.
You might opt for the agent to query the database directly, or to fetch the results via a data-access tool and pass them along to the data-processing tool.
While this works for small datasets, it’s not very efficient. Consider this flow:
The agent is invoking two tools here: one for retrieving the data to work with and one to process it. While the tools are quite granular and scoped to do a single thing, they will fill up the agent’s context window because the entire dataset will be placed in it.
A better solution is to have the tools query the dataset they need themselves. This way the agent only needs to invoke a single tool, saving a round trip, and all required data for the tool is loaded outside the agent’s context, saving precious tokens.
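In its simplest form, the pattern looks something like the sketch below. The table and column names are made up for illustration, and SQLite stands in for whatever repository you use; the point is that the tool runs the query itself and only a single number ever crosses into the agent's context.

```python
import sqlite3

def average_selling_price(connection: sqlite3.Connection) -> float:
    # The data source does the heavy lifting; the raw rows never leave it.
    row = connection.execute("SELECT AVG(price) FROM sales").fetchone()
    # Only one float crosses the tool boundary, not the full dataset.
    return row[0] if row[0] is not None else 0.0
```

Compare that with returning all rows to the agent and asking it to compute the average itself: the token cost scales with the dataset instead of with the answer.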
In my sample repository, I have set up an example using the DataAnalysisService class. I’ve copied a small excerpt of the code below.
# In the fabric_sql.py file
def query_to_dataframe(sql: str) -> pd.DataFrame:
    engine = _build_engine()
    with engine.connect() as connection:
        df = pd.read_sql(sql, connection)
    engine.dispose()
    return df
# In the data_access.py file
def get_customer_information():
    # Sample query I use to load my large dataset, consider it as
    # `SELECT * FROM [TableWithALotOfData]`
    sql = os.getenv("CUSTOMER_INFORMATION_QUERY")
    if not sql:
        raise ValueError("CUSTOMER_INFORMATION_QUERY is not configured")
    start_time = time.perf_counter()
    df = query_to_dataframe(sql)
    duration_ms = (time.perf_counter() - start_time) * 1000
    record_count = len(df)  # duration_ms and record_count are useful for logging
    records = df.to_dict(orient="records")
    return records
# In the large_data_analysis.py file
class CustomerDataProvider(Protocol):
    async def get_records(self) -> list[dict[str, Any]]: ...

class DatabaseCustomerDataProvider:
    async def get_records(self) -> list[dict[str, Any]]:
        data = get_customer_information()
        return list(data)  # ensure concrete list for typing

class DataAnalysisService:
    def __init__(self, provider: CustomerDataProvider) -> None:
        self._provider = provider
        self._data: list[dict[str, Any]] | None = None

    async def _ensure_loaded(self) -> None:
        if self._data is None:
            self._data = await self._provider.get_records()

    async def record_count(self) -> int:
        await self._ensure_loaded()
        assert self._data is not None
        return len(self._data)

    async def distinct_companies(self) -> list[str]:
        await self._ensure_loaded()
        assert self._data is not None
        companies = {
            str(r["Company"])
            for r in self._data
            if "Company" in r and r["Company"] is not None
        }
        return sorted(companies)

    async def _values(self, column: str) -> list[float]:
        await self._ensure_loaded()
        assert self._data is not None
        return [
            float(r[column])
            for r in self._data
            if column in r and isinstance(r[column], (int, float))
        ]

    async def mean(self, column: str) -> float:
        values = await self._values(column)
        if not values:
            return 0.0
        return sum(values) / len(values)
The DataAnalysisService class is responsible for exposing all methods we might want to run on the dataset. The methods, in turn, make sure the required data is loaded.
In my sample, I don’t have any filters. Those can be added as additional parameters, so you can inject filters if you want. They just need to be passed along to the corresponding query methods.
Now that all the functionality is in place to compute on our dataset, it needs to be exposed to the agent as tools. This works in a similar fashion to any other tool.
Just decorate the methods with the @tool decorator and add them to your agent’s set of tools. Check the sample below.
async with (
    AzureCliCredential() as credential,
    AgentsClient(
        endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
        credential=credential,
    ) as agents_client,
    AzureAIAgentsProvider(credential=credential) as provider,
):
    instructions = """
    You are a data analysis agent.
    Customer data is preloaded server-side and cannot be accessed directly.
    You MUST use the provided statistical tools.
    Rules:
    - NEVER fabricate values.
    - ONLY use the money column 'AverageGrossSellingPrice'.
    - Every metric MUST come from a tool call.
    - Explain the exact sequence of tools used.
    - Mention how many records have been processed (count).
    - For analysis, also mention how many datasets were used (count).
    """
    analysis_agent = await agents_client.create_agent(
        model=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
        name="DataAnalysisAgent",
        instructions=instructions,
    )
    try:
        # Define the service capable of retrieving and computing on the data
        service = DataAnalysisService(provider=DatabaseCustomerDataProvider())

        @tool(approval_mode="never_require")
        async def calculate_mean(column: str) -> float:
            return await service.mean(column)

        @tool(approval_mode="never_require")
        async def calculate_median(column: str) -> float:
            return await service.median(column)

        @tool(approval_mode="never_require")
        async def calculate_standard_deviation(column: str) -> float:
            return await service.std_dev(column)

        # All the other logic and tools you need

        # Create the agent
        agent = await provider.get_agent(
            analysis_agent.id,
            # Add all the tools you deem necessary for this agent.
            tools=[
                calculate_mean,
                calculate_median,
                calculate_standard_deviation,
                find_outliers,
                get_record_count,
                get_distinct_companies,
            ],
        )

        answer = await agent.run(
            query,
            options={"response_format": LargeDataAnalysisResponse},
        )
By exposing all the tools you need for normal scenarios, you save quite a lot of tokens, both by avoiding loading data into the context and by avoiding having the agent come up with algorithms itself. It provides a similar token gain to providing scripts to your coding agents.
You can also add the regular Code Interpreter to the agent. Doing so gives you all the benefits of providing tools to the agent for regular flows, combined with the flexibility the code interpreter offers for creating scripts for new flows.
With the above, you should be able to ask questions like “Calculate the mean, median, and standard deviation of our sales data and find anomalies”, and the agent will invoke the tools to compute the requested metrics.
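The `LargeDataAnalysisResponse` passed as the response format could be a Pydantic model along these lines. This is an assumption about its shape, not the actual model from my repository, and it presumes your framework accepts a Pydantic model as `response_format`:

```python
from pydantic import BaseModel

# Hypothetical structured-output model; field names are assumptions.
class LargeDataAnalysisResponse(BaseModel):
    summary: str                  # the agent's narrative answer
    record_count: int             # how many records were processed
    tools_used: list[str]         # the exact sequence of tools invoked
    metrics: dict[str, float]     # e.g. {"mean": ..., "median": ...}
```

A structured response makes it easy to verify that the rules in the instructions (record count, tool sequence) were actually followed.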
Things to consider
Do think about whether the loaded data should be reused across requests. In the example above, the data is kept in memory and can be reused. While this is a nice feature, it is also something you need to decide whether you actually want.
If you have something like row-level security in place, you need a different implementation that makes sure every request retrieves its own data.
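One way to do that, as a sketch: construct the provider (and the service that wraps it) per request, so nothing is cached across users. `fetch_rows_for_user` below is a hypothetical loader that runs the query under the caller's identity.

```python
from typing import Any, Awaitable, Callable

class UserScopedProvider:
    """Sketch: a provider scoped to a single request/user, so row-level
    security applies and no cached data leaks between users."""

    def __init__(
        self,
        user_id: str,
        fetch_rows_for_user: Callable[[str], Awaitable[list[dict[str, Any]]]],
    ) -> None:
        self._user_id = user_id
        self._fetch = fetch_rows_for_user

    async def get_records(self) -> list[dict[str, Any]]:
        # No caching here: every request-scoped service loads its own data.
        return await self._fetch(self._user_id)
```

In the request handler, you would then create `DataAnalysisService(provider=UserScopedProvider(...))` per request instead of once at startup.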
Also, all data is now loaded into the memory of your compute option, in my case a Container App. These containers are often created with a limited amount of memory, so make sure your compute option is able to handle the amount of data you need in memory for multiple requests.
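To size that memory limit, you can get a rough estimate of the dataset's in-memory footprint before deploying; a sketch using pandas (which the data-access layer already uses):

```python
import pandas as pd

def dataframe_memory_mb(df: pd.DataFrame) -> float:
    # deep=True accounts for the actual size of object (string) columns,
    # which is where most of the surprise memory usage tends to hide.
    return float(df.memory_usage(deep=True).sum()) / (1024 * 1024)
```

Multiply the result by the number of concurrent requests you expect to hold data simultaneously, and leave headroom for the runtime itself.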
