Structured LLM extraction from text - my go-to setup

Pierre Châtel-Innocenti on Unsplash

One of the core tasks that has been completely overtaken by LLMs is reading blocks of text and pulling out the most relevant information. A quick description of a pattern I tend to overuse in this space.

 

We now move faster, and more importantly, further, with LLM-based tooling. In this post I outline my default starting point for converting text into structured data - a snippet I end up reaching for more often than I would expect.

 

Where the need comes from

Lately I tend to work with Retrieval-Augmented Generation (RAG) settings. We already know that the training-time knowledge embedded in the models is huge, but more often than not, for deeper analysis and more targeted research, we need to provide the correct context at query time. RAG is the standard pattern for this problem: we use a separate search engine to find relevant pieces of information, re-rank them to match the actual problem at hand, select the relevant subset, potentially summarise, and inject it into the context. To build use-case-specific databases, I often need to sift through "a bunch" (i.e. 10³–10⁵) of documents to extract pieces of interest to facilitate the lookup - or provide analytics over the entire corpus.
 

This feeds into a more basic problem: how do we approximate getting any useful information from a piece of text? This fits firmly in the NLP (Natural Language Processing, not Neuro-Linguistic Programming as a certain CBE once asserted with no one daring to correct them) area. This is a massive space with a massive range of solutions, ranging from dictionary-based, through semantic mapping, to complex deep learning (DL) architectures - and one that is being fast and continually squeezed by ever-improving LLMs. Since the gold standard is human-provided annotations over (mostly) text, this is exactly where we see LLMs' greatest strengths play out.

 

Since this is something that's suddenly easy, there are now companies doing exactly what this post describes: throw a document in, get a bunch of structured data out. The famous "OpenAI wrapper" model.

 

Implementation

Without further ado, the approximate code I tend to start with.

from typing import Iterable, Any, Optional, Callable, TypeVar
import uuid
import orjson
from concurrent.futures import ThreadPoolExecutor, as_completed

import instructor
from pydantic import BaseModel
from openai import OpenAI
from sqlitedict import SqliteDict

from tqdm import tqdm

T = TypeVar('T', bound=BaseModel)
DEFAULT_EXTRACTOR_NAMESPACE = uuid.UUID('a1b2c3d4-e5f6-7890-abcd-ef1234567890')

def extract_structured(
    items: Iterable[str | Any],
    model: str,
    response_model: type[T],
    client=instructor.from_openai(OpenAI()),
    prompt_fn: Callable[[Any], str] = lambda x: x,
    system_prompt: Optional[str] = None,
    max_workers: int = 5,
    max_retries: int = 0,
    show_progress: bool = True,
    batch_size: int = 5,
    cache_db_path: Optional[str] = None,
    cache_key_fn: Optional[Callable[[Any], str]] = None,
    **instructor_kwargs,
) -> list[Optional[T]]:
    """
    Extract structured data from texts using instructor with batched parallelization.

    Args:
        items: Iterable of texts (or Pydantic models) to process
        model: Model name (e.g., "gpt-4o-mini")
        response_model: Pydantic model class for output structure
        client: Instructor-wrapped client (e.g., instructor.from_openai(OpenAI()))
        prompt_fn: Function to transform an item into a prompt (default: identity)
        system_prompt: Optional system prompt
        max_workers: Number of parallel workers
        max_retries: Number of retry attempts (default: 0, no retry)
        show_progress: Whether to show progress bar
        batch_size: Number of items to process in a single API call (default: 5)
        cache_db_path: Path to SQLite cache database (None to disable caching)
        cache_key_fn: Function mapping an item to a cache key (default: UUID5 hash)
        **instructor_kwargs: Additional kwargs passed to instructor

    Returns:
        List of results (None if error), same length as input
    """

    # Open cache if provided
    cache = (
        SqliteDict(
            cache_db_path, autocommit=False, encode=orjson.dumps, decode=orjson.loads
        )
        if cache_db_path
        else None
    )
    _get_cache_key = (
        cache_key_fn
        if cache_key_fn
        else lambda x: str(
            uuid.uuid5(
                DEFAULT_EXTRACTOR_NAMESPACE,
                (
                    x
                    if not issubclass(type(x), BaseModel)
                    else x.model_dump_json(exclude_none=True)
                ),
            )
        )
    )

    def _process_batch(batch_items: list[str]) -> list[Optional[T]]:
        # Check cache first if enabled
        if cache is not None:
            cached_results = []
            uncached_texts = []
            uncached_indices = []

            for i, text in enumerate(batch_items):
                cache_key = _get_cache_key(text)
                if cache_key in cache:
                    # Deserialize from JSON
                    cached_data = cache[cache_key]
                    cached_results.append(
                        (i, response_model.model_validate(cached_data))
                    )
                else:
                    uncached_texts.append(text)
                    uncached_indices.append(i)

            # If all cached, return immediately
            if not uncached_texts:
                result = [None] * len(batch_items)
                for idx, cached_result in cached_results:
                    result[idx] = cached_result
                return result

            # Process only uncached texts
            batch_to_process = uncached_texts
        else:
            batch_to_process = batch_items
            uncached_indices = list(range(len(batch_items)))
            cached_results = []

        for attempt in range(max_retries + 1):
            try:
                # Combine all texts in the batch into a single prompt
                combined_prompt = "\n\n".join(
                    [
                        f"Item {i+1}:\n{prompt_fn(text)}"
                        for i, text in enumerate(batch_to_process)
                    ]
                )

                messages = []
                if system_prompt:
                    messages.append({"role": "system", "content": system_prompt})
                messages.append({"role": "user", "content": combined_prompt})
                # Expect a list of response_model instances
                result = client.chat.completions.create(
                    model=model,
                    response_model=list[response_model],
                    messages=messages,
                    **instructor_kwargs,
                )

                # Pad with None if we got fewer results than expected
                if len(result) < len(batch_to_process):
                    result.extend([None] * (len(batch_to_process) - len(result)))

                api_results = result[: len(batch_to_process)]

                # Cache the results
                if cache is not None:
                    for text, res in zip(batch_to_process, api_results):
                        if res is not None:
                            cache_key = _get_cache_key(text)
                            cache[cache_key] = res.model_dump()
                    cache.commit()  # Commit after each batch

                # Combine cached and new results
                final_results = [None] * len(batch_items)
                for idx, cached_result in cached_results:
                    final_results[idx] = cached_result
                for idx, api_result in zip(uncached_indices, api_results):
                    final_results[idx] = api_result

                return final_results
            except Exception as e:
                print(e)
                if attempt == max_retries:
                    # Combine cached and failures
                    final_results = [None] * len(batch_items)
                    for idx, cached_result in cached_results:
                        final_results[idx] = cached_result
                    return final_results

        # Combine cached and failures
        final_results = [None] * len(batch_items)
        for idx, cached_result in cached_results:
            final_results[idx] = cached_result
        return final_results

    texts_list = list(items)
    results = [None] * len(texts_list)

    # Create batches
    batches = []
    batch_indices = []
    for i in range(0, len(texts_list), batch_size):
        batch = texts_list[i : i + batch_size]
        batches.append(batch)
        batch_indices.append(list(range(i, i + len(batch))))

    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(_process_batch, batch): indices
                for batch, indices in zip(batches, batch_indices)
            }

            iterator = as_completed(futures)
            if show_progress:
                iterator = tqdm(iterator, total=len(batches), desc="Extracting")

            for future in iterator:
                indices = futures[future]
                batch_results = future.result()
                for idx, result in zip(indices, batch_results):
                    results[idx] = result
    finally:
        # Close cache when done
        if cache is not None:
            cache.close()

    return results

A couple of ideas behind the implementation:

Batching

Every call packs multiple items into one prompt by default. One API round-trip per batch slashes token overhead and latency. If the model returns fewer results than expected, the batch is padded with None - no silent misalignment.
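The padding contract can be sketched in isolation (a toy helper, not lifted from the function above):

```python
from typing import Any, Optional


def pad_to_length(results: list[Any], expected: int) -> list[Optional[Any]]:
    """Pad a short result list with None so it aligns with the input batch."""
    padded = results + [None] * (expected - len(results))
    return padded[:expected]  # also trims if the model over-produced


# pad_to_length([1, 2], 4) → [1, 2, None, None]
```

Keeping output length pinned to input length is what lets the caller zip results back onto the original items without bookkeeping.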

Concurrency

Batches fan out across a thread pool. LLM calls are I/O-bound, so threads are the right primitive - no async ceremony, no process overhead. Progress tracking comes free.
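Stripped of caching and retries, the fan-out shape is just this (names are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fan_out(batches, worker, max_workers=5):
    """Run worker over batches in a thread pool, writing results back by index."""
    results = [None] * len(batches)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future to the index of the batch it is processing
        futures = {executor.submit(worker, b): i for i, b in enumerate(batches)}
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results
```

Because results are written back by index rather than in completion order, the output stays aligned with the input regardless of which batch finishes first.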

Lightweight Caching

A local SQLite file (via sqlitedict) adds deterministic, content-addressed caching. Keys are UUID5 hashes of the input by default. The cache commits after each batch, so (all too frequent) crashes don't lose everything.
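The key scheme itself is tiny: UUID5 over the item's text under a fixed namespace, so identical inputs always map to the same key across runs and machines (this mirrors the default in the function above):

```python
import uuid

# Fixed namespace so keys are stable across processes and machines
NAMESPACE = uuid.UUID('a1b2c3d4-e5f6-7890-abcd-ef1234567890')


def cache_key(text: str) -> str:
    """Deterministic, content-addressed key: same text, same UUID."""
    return str(uuid.uuid5(NAMESPACE, text))
```

Content-addressing means re-running the same corpus only pays for items that actually changed.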

Generalisability

Prompt construction, cache key logic, LLM client, response schema, and model parameters are all caller-supplied. The function focuses on lightweight execution plumbing.
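A typical injection point is `prompt_fn`, which turns whatever the caller's items look like into the text the model sees. A minimal sketch, assuming a hypothetical record shape with `title` and `body` fields:

```python
def make_prompt(record: dict) -> str:
    """Render a raw record into the prompt text for one item (illustrative fields)."""
    return f"Title: {record['title']}\nBody: {record['body']}"
```

The function never needs to know what an item is - it only ever sees the string `prompt_fn` produces.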

Retries

Retries are configurable. On final failure, cached results for that batch are still returned - only truly unprocessable items come back as null. The output list always matches input length, so downstream code never has to guess about alignment.
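The retry contract in isolation - attempt up to `max_retries + 1` times, fall back to None rather than raising (a toy version of the loop inside `_process_batch`):

```python
from typing import Callable, Optional, TypeVar

R = TypeVar("R")


def with_retries(fn: Callable[[], R], max_retries: int = 0) -> Optional[R]:
    """Call fn up to max_retries + 1 times; return None if every attempt fails."""
    for _ in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            continue
    return None
```

Swallowing the final exception into None is a deliberate choice here: a single poisoned item shouldn't abort a 10⁴-document run.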

Cross-call normalisation

Finally, a note on result normalisation. In biochemical settings, when we run experiments across different machines or labs - or even when we run them sequentially - it is often good practice to check whether, and to what extent, conditions affected a particular batch. LLMs are famously unstable by design, returning different results for the same input, and this can dent the usability and interpretation of the results. One potential remedy is a batch control: an identical sample injected into each batch, which can subsequently be used to normalise against. As an extension, one might inject multiple controls to approximate a distribution, but realistically performance starts to degrade as batch size grows above ~10, so this would need careful consideration.
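One way to sketch the batch-control idea for numeric outputs: rescale each batch's scores so its control item matches a fixed reference value (function name, dict shape, and numbers are all illustrative, not part of the extraction code above):

```python
def normalise_by_control(batch_scores: dict[str, float], control_key: str,
                         control_reference: float) -> dict[str, float]:
    """Rescale a batch's scores so its control item matches the reference value."""
    factor = control_reference / batch_scores[control_key]
    # Drop the control itself; it only exists to calibrate the batch
    return {k: v * factor for k, v in batch_scores.items() if k != control_key}
```

If one batch's control reads 2.0 against a reference of 1.0, everything in that batch gets halved, making scores comparable across batches.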

Final thoughts

This is typically how I seed semi-structured or pure-text extraction. The function provides a stable foundation that handles the execution complexity - parallelisation, caching, error handling - while exposing injection points for domain-specific logic. I'll often drop this into a project and then hand it off to Claude Code to iterate on - adjusting retry strategies, tuning batch sizes, or adding project-specific validation logic. The relative simplicity makes changes straightforward: domain logic, models, and LLM providers get injected, while the occasional changes to logging or caching get made within the function.