We now move faster, and more importantly, further with LLM-based tooling. In this post I will outline my default starting point for converting text into structured data - a pattern I reach for more often than I would have expected.
Where the need comes from
Lately I tend to work in Retrieval-Augmented Generation (RAG) settings. We already know that the training-time knowledge embedded in the models is huge, but more often than not, for deeper analysis and more targeted research, we need to provide the correct context at query time. RAG is the standard pattern for this problem: we use a separate search engine to find relevant pieces of information, re-rank them to match the actual problem at hand, select the relevant subset, potentially summarise, and inject it into the context. To build use-case-specific databases, I often need to sift through "a bunch" (i.e. 10³-10⁵) of documents to extract pieces of interest to facilitate the lookup - or provide analytics over the entire corpus.
This feeds into a more basic problem: how do we get any useful information out of a piece of text? This fits firmly in the NLP (Natural Language Processing, not Neuro-Linguistic Programming, as a certain CBE once asserted with no one daring to correct them) area. It is a massive space with a massive range of solutions, ranging from dictionary-based approaches, through semantic mapping, to complex deep learning (DL) architectures - and one that is being rapidly and continually squeezed by ever-improving LLMs. Since the gold standard is human-provided annotations over (mostly) text, this is exactly where we see their greatest strengths play out.
Since this is something that's suddenly easy, there are now companies doing exactly what this post describes: throw a document in, get a bunch of structured data out. The famous "OpenAI wrapper" model.
Implementation
Without further ado, here is the approximate code I tend to start with.
from typing import Iterable, Any, Optional, Callable, TypeVar
import uuid
import orjson
from concurrent.futures import ThreadPoolExecutor, as_completed
import instructor
from pydantic import BaseModel
from openai import OpenAI
from sqlitedict import SqliteDict
from tqdm import tqdm
T = TypeVar('T', bound=BaseModel)
DEFAULT_EXTRACTOR_NAMESPACE = uuid.UUID('a1b2c3d4-e5f6-7890-abcd-ef1234567890')
def extract_structured(
    items: Iterable[str | Any],
    model: str,
    response_model: type[T],
    client: Optional[Any] = None,
    prompt_fn: Callable[[Any], str] = lambda x: x,
    system_prompt: Optional[str] = None,
    max_workers: int = 5,
    max_retries: int = 0,
    show_progress: bool = True,
    batch_size: int = 5,
    cache_db_path: Optional[str] = None,
    cache_key_fn: Optional[Callable[[Any], str]] = None,
    **instructor_kwargs,
) -> list[Optional[T]]:
    """
    Extract structured data from texts using instructor with batched parallelization.
    Args:
        items: Iterable of texts (or Pydantic models) to process
        model: Model name (e.g., "gpt-4o-mini")
        response_model: Pydantic model class for output structure
        client: Instructor-wrapped client (defaults to instructor.from_openai(OpenAI()))
        prompt_fn: Function to turn an item into a prompt (default: identity)
        system_prompt: Optional system prompt
        max_workers: Number of parallel workers
        max_retries: Number of retry attempts (default: 0, no retry)
        show_progress: Whether to show a progress bar
        batch_size: Number of items to process in a single API call (default: 5)
        cache_db_path: Path to SQLite cache database (None to disable caching)
        cache_key_fn: Function mapping an item to its cache key (default: UUID5 content hash)
        **instructor_kwargs: Additional kwargs passed to instructor
    Returns:
        List of results (None if error), same length as input
    """
    # Construct the default client lazily, so importing this module
    # doesn't require OpenAI credentials.
    if client is None:
        client = instructor.from_openai(OpenAI())
    # Open cache if provided
    cache = (
        SqliteDict(
            cache_db_path, autocommit=False, encode=orjson.dumps, decode=orjson.loads
        )
        if cache_db_path
        else None
    )
    _get_cache_key = cache_key_fn or (
        lambda x: str(
            uuid.uuid5(
                DEFAULT_EXTRACTOR_NAMESPACE,
                x.model_dump_json(exclude_none=True)
                if isinstance(x, BaseModel)
                else x,
            )
        )
    )
def _process_batch(batch_items: list[str]) -> list[Optional[T]]:
# Check cache first if enabled
if cache is not None:
cached_results = []
uncached_texts = []
uncached_indices = []
for i, text in enumerate(batch_items):
cache_key = _get_cache_key(text)
if cache_key in cache:
# Deserialize from JSON
cached_data = cache[cache_key]
cached_results.append(
(i, response_model.model_validate(cached_data))
)
else:
uncached_texts.append(text)
uncached_indices.append(i)
# If all cached, return immediately
if not uncached_texts:
result = [None] * len(batch_items)
for idx, cached_result in cached_results:
result[idx] = cached_result
return result
# Process only uncached texts
batch_to_process = uncached_texts
else:
batch_to_process = batch_items
uncached_indices = list(range(len(batch_items)))
cached_results = []
for attempt in range(max_retries + 1):
try:
# Combine all texts in the batch into a single prompt
combined_prompt = "\n\n".join(
[
f"Item {i+1}:\n{prompt_fn(text)}"
for i, text in enumerate(batch_to_process)
]
)
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": combined_prompt})
# Expect a list of response_model instances
result = client.chat.completions.create(
model=model,
response_model=list[response_model],
messages=messages,
**instructor_kwargs,
)
# Pad with None if we got fewer results than expected
if len(result) < len(batch_to_process):
result.extend([None] * (len(batch_to_process) - len(result)))
api_results = result[: len(batch_to_process)]
# Cache the results
if cache is not None:
for text, res in zip(batch_to_process, api_results):
if res is not None:
cache_key = _get_cache_key(text)
cache[cache_key] = res.model_dump()
cache.commit() # Commit after each batch
# Combine cached and new results
final_results = [None] * len(batch_items)
for idx, cached_result in cached_results:
final_results[idx] = cached_result
for idx, api_result in zip(uncached_indices, api_results):
final_results[idx] = api_result
return final_results
except Exception as e:
print(e)
if attempt == max_retries:
# Combine cached and failures
final_results = [None] * len(batch_items)
for idx, cached_result in cached_results:
final_results[idx] = cached_result
return final_results
texts_list = list(items)
results = [None] * len(texts_list)
# Create batches
batches = []
batch_indices = []
for i in range(0, len(texts_list), batch_size):
batch = texts_list[i : i + batch_size]
batches.append(batch)
batch_indices.append(list(range(i, i + len(batch))))
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(_process_batch, batch): indices
for batch, indices in zip(batches, batch_indices)
}
iterator = as_completed(futures)
if show_progress:
iterator = tqdm(iterator, total=len(batches), desc="Extracting")
for future in iterator:
indices = futures[future]
batch_results = future.result()
for idx, result in zip(indices, batch_results):
results[idx] = result
finally:
# Close cache when done
if cache is not None:
cache.close()
return results
A few ideas behind the implementation:
Batching
Every call packs multiple items into one prompt by default. One API round-trip per batch slashes token overhead and latency. If the model returns fewer results than expected, the batch is padded with nulls - no silent data loss.
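The packing step itself is trivial; as a standalone sketch (`pack_batch` is an illustrative name, mirroring what the function above does inline):

```python
def pack_batch(items: list[str]) -> str:
    # Number each item so the model can return one result per item, in order.
    return "\n\n".join(f"Item {i+1}:\n{text}" for i, text in enumerate(items))
```

The numbered format gives the model an explicit contract: N items in, N results out, matched by position.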
Concurrency
Batches fan out across a thread pool. LLM calls are I/O-bound, so threads are the right primitive - no async ceremony, no process overhead. Progress tracking comes free.
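Stripped of the LLM specifics, the fan-out is the standard futures idiom (`run_batches` is a hypothetical reduction of the function above):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_batches(batches, worker, max_workers=5):
    results = [None] * len(batches)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its batch index so out-of-order
        # completion still lands results in the right slot.
        futures = {executor.submit(worker, batch): i for i, batch in enumerate(batches)}
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results
```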
Lightweight Caching
Using a local SQLite file adds deterministic, content-addressed caching. Keys are UUID5 hashes by default. The cache commits after each batch, so (all too frequent) crashes don't lose everything.
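Content-addressing here just means hashing the input into the key - a deterministic UUID5 under a fixed namespace, as in the function above:

```python
import uuid

NAMESPACE = uuid.UUID('a1b2c3d4-e5f6-7890-abcd-ef1234567890')

def cache_key(text: str) -> str:
    # Identical inputs always map to the same key, so re-runs are cache hits.
    return str(uuid.uuid5(NAMESPACE, text))
```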
Generalisability
Prompt construction, cache key logic, LLM client, response schema, and model parameters are all caller-supplied. The function focuses on lightweight execution plumbing.
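For example, to run the extractor over structured records rather than raw strings, the caller supplies the prompt and cache-key logic (`Paper`, `paper_prompt`, and `paper_key` are hypothetical):

```python
from dataclasses import dataclass

@dataclass
class Paper:  # hypothetical input record
    title: str
    abstract: str

def paper_prompt(p: Paper) -> str:
    # prompt_fn: how a record becomes the text the model sees
    return f"Title: {p.title}\nAbstract: {p.abstract}"

def paper_key(p: Paper) -> str:
    # cache_key_fn: a stable identifier instead of a content hash
    return f"paper:{p.title}"

# results = extract_structured(
#     items=papers,
#     model="gpt-4o-mini",
#     response_model=SomeFindingModel,  # project-specific Pydantic schema
#     prompt_fn=paper_prompt,
#     cache_key_fn=paper_key,
# )
```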
Retries
Retries are configurable. On final failure, cached results for that batch are still returned - only truly unprocessable items come back as null. The output list always matches input length, so downstream code never has to guess about alignment.
Cross-call normalisation
Finally - a note on result normalisation. In biochemical settings, when we run experiments across different machines / labs - or even when we run them sequentially - it is often good practice to check if and to what extent conditions impacted a particular batch. LLMs are famously unstable by design, returning different results for the same input, and this can make a dent in the usability and interpretation of the results. One potential remedy is a batch control: an identical sample injected into each batch, which the other results can subsequently be normalised against. As an extension, one might inject multiple controls to approximate a distribution, but realistically quality starts to degrade as batch size grows above 10, so this would need careful consideration.
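A minimal sketch of the injection and stripping steps, assuming the control sits at position 0 of every batch (`with_control` and `split_control` are illustrative names; normalising against the control outputs is use-case-specific and omitted):

```python
def with_control(batches, control):
    # Prepend the same control item to every batch.
    return [[control] + batch for batch in batches]

def split_control(batch_results):
    # Separate each batch's control result (position 0) from the payload.
    controls = [results[0] for results in batch_results]
    payloads = [results[1:] for results in batch_results]
    return controls, payloads
```

Comparing the per-batch control results then gives a rough stability signal across calls.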
Final thoughts
This is typically how I seed semi-structured or pure-text extraction. The function provides a stable foundation that handles the execution complexity - parallelisation, caching, error handling - while exposing injection points for domain-specific logic. I'll often drop this into a project and then hand it off to Claude Code to iterate on - adjusting retry strategies, tuning batch sizes, or adding project-specific validation logic. The relative simplicity makes changes straightforward: the domain logic / models / LLM providers get injected, while the occasional change to logging or caching gets adjusted within the function.