
Creating Custom Tasks

This guide explains how to create custom tasks. sieves distinguishes two types of tasks:

  1. Ordinary tasks inherit from Task. Pretty much their only requirement is to process a collection of documents and return those same documents with the task's modifications applied.
  2. Predictive tasks inherit from PredictiveTask (which inherits from Task). Those are for tasks using predictive (i.e. zero-shot) models. They are more complex, as they need to implement the required interface to integrate with at least one model type.

While there are a bunch of pre-built tasks available for you to use, you might want to write your own to match your use-case. This guide describes how to do that.

If you feel like your task might be useful for others, we'd be happy to see you submit a PR!

Tasks

Inherit from Task whenever you want to implement something that doesn't require interacting with engines. That can be document pre- or postprocessing, or something completely different - you could e.g. run an agent following instructions provided in docs, and then follow this up with a subsequent task in your pipeline analyzing and structuring those results.

To create a basic custom task, inherit from the Task class and implement the required abstract methods. In this case we'll implement a dummy task that counts how many characters are in the document's text and stores that as a result.

Basic custom task
from collections.abc import Iterable
from sieves.tasks.core import Task
from sieves.data import Doc

class CharCountTask(Task):
    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Counts characters in doc.text.

        :param docs: Documents to process.
        :return Iterable[Doc]: Processed documents.
        """
        for doc in docs:
            doc.results[self.id] = len(doc.text)
            yield doc

That's it! You can customize this, of course. You might also want to extend __init__() to accept whatever configuration your task needs.
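
A quick usage sketch of the task above - this assumes Task's default constructor works without arguments and that the task id defaults to the class name, as it does for the built-in tasks:

from sieves import Doc, Pipeline

docs = [Doc(text="Hello world.")]
pipeline = Pipeline([CharCountTask()])

for doc in pipeline(docs):
    # The character count is stored under the task's id.
    print(doc.results["CharCountTask"])  # 12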

Predictive Tasks

Inherit from PredictiveTask whenever you want to make use of the structured generation capabilities in sieves. PredictiveTask requires you to implement a few methods that define how your task expects results to be structured, what few-shot examples should look like, which prompt to use, etc.

We'll break down how to create a predictive task step by step. For this example, let's implement a sentiment analysis task using outlines.

Understanding the Architecture

Before diving into implementation, let's understand why sieves uses the Bridge pattern and how components fit together.

The Challenge: Model Diversity

Different NLP frameworks have vastly different APIs:

  • Outlines: Uses Jinja2 templates + JSON schemas for structured generation
  • DSPy: Uses signatures + optimizers with Python-native schemas
  • Transformers: Uses pipeline() API with classification/generation modes
  • LangChain: Uses chains + prompts with custom parsers
  • GliNER2: Uses structure chaining patterns to describe custom JSON structures

Without abstraction, each task would need a separate implementation for each model, which would be infeasible to maintain.

The Solution: Bridge Pattern

The Bridge pattern decouples task logic (what to compute) from engine-specific implementation (how to compute):

┌─────────────────────────────────────────────────────┐
│                     Pipeline                        │
│  (Orchestration: caching, batching, serialization)  │
└──────────────────────┬──────────────────────────────┘
                       │
                       ▼
            ┌──────────────────────┐
            │        Task          │ ◄── User-facing API
            │  (What to compute)   │     Classification, NER, etc.
            └──────────┬───────────┘
                       │
                       ▼
            ┌──────────────────────┐
            │       Bridge         │ ◄── Engine-specific adapter
            │  (How to compute)    │     DSPyClassification, OutlinesNER
            └──────────┬───────────┘
                       │
                       ▼
            ┌──────────────────────┐
            │    ModelWrapper      │ ◄── Internal inference handler
            │  (Inference logic)   │     Auto-detected from model type
            └──────────┬───────────┘
                       │
                       ▼
            ┌──────────────────────┐
            │        Model         │ ◄── User-provided.
            │  (3rd party          │     Any supported model
            │   library instance)  │
            └──────────────────────┘

Responsibilities

Each layer has clear, focused responsibilities:

Task (User creates this for custom functionality):

  • Defines what problem to solve (e.g., "classify text into sentiment categories")
  • Provides task-specific configuration (labels, entity types, prompt instructions)
  • Manages few-shot examples for in-context learning
  • Handles optimization (prompt tuning) and distillation (model compression)
  • Engine-agnostic: Same task API works with any backend (DSPy, Outlines, etc.)

Bridge (User implements this for custom tasks):

  • Defines how to solve the problem for a specific engine
  • Creates prompts (Jinja2 templates for Outlines, signatures for DSPy)
  • Parses model outputs into structured results (Pydantic models)
  • Integrates results into documents (integrate() method)
  • Consolidates multi-chunk results into document-level results (consolidate() method)
  • Engine-specific: Each engine needs its own Bridge implementation

Engine (Internal, automatically detected):

  • Handles low-level inference mechanics (batching, generation, error handling)
  • Manages model calls and streaming
  • Applies generation settings (temperature, max_tokens, top_p)
  • Users never interact with this directly - it's an implementation detail

Key Methods: integrate() vs consolidate()

These two methods serve different but complementary purposes in the processing pipeline:

integrate() - Stores raw results immediately after inference:

def integrate(self, results, docs):
    # Called once per batch of chunks
    # results = [Result1, Result2, Result3]  # One per chunk
    # docs = [Chunk1, Chunk2, Chunk3]        # Corresponding chunks

    for doc, result in zip(docs, results):
        doc.results[self._task_id] = result.score  # Store per-chunk
    return docs

consolidate() - Merges chunk-level results into document-level results:

def consolidate(self, results, docs_offsets):
    # Called after integrate(), once per original document
    # results = [Result1, Result2, Result3]    # All chunk results
    # docs_offsets = [(0, 2), (2, 3)]          # Chunk ranges per doc

    for start, end in docs_offsets:
        chunk_results = results[start:end]     # Get chunks for this doc
        avg_score = sum(r.score for r in chunk_results) / len(chunk_results)
        yield ConsolidatedResult(score=avg_score)  # One result per document

Why separate methods?

  • Documents may exceed model context limits (e.g., 100-page PDFs vs 8K token limit)
  • Sieves automatically splits long documents into chunks for processing
  • integrate() handles per-chunk results (stores immediately, no processing)
  • consolidate() aggregates chunks back into per-document results (averaging, voting, etc.)

When to Create Custom Tasks

Use built-in tasks (Classification, NER, InformationExtraction, etc.) whenever possible. Only create custom tasks when:

Novel task type: Your task doesn't fit any existing task (e.g., custom scoring, specialized extraction)

Specialized consolidation: Your task requires unique logic for merging multi-chunk results (e.g., weighted averaging, consensus voting)

Don't create custom tasks for:

Different prompts: Use prompt_instructions parameter on built-in tasks instead

Different labels: Use task configuration (e.g., labels parameter in Classification)

Different models: Just pass a different model instance to the existing task

Example - When you DON'T need a custom task:

# ❌ Bad: Creating entire custom task just to change the prompt
class MyCustomClassifier(Classification):
    # 100+ lines of bridge implementation...
    pass

# ✅ Good: Use built-in task with custom prompt
task = Classification(
    labels=["positive", "negative", "neutral"],
    model=model,
    prompt_instructions="Analyze the text's sentiment carefully, "
                        "considering both explicit and implicit cues..."  # Custom!
)

Now let's implement a custom task that actually needs custom logic: sentiment analysis with continuous scoring and reasoning.

1. Implement a Bridge

A Bridge defines how to solve a task for a certain engine. We decided to go with Outlines as our engine (you can allow multiple engines for a task by implementing corresponding bridges, but for simplicity's sake we'll stick with Outlines only here).

A Bridge requires you to implement/specify the following:

  • A prompt template (optional depending on the engine used).
  • A prompt signature description (optional depending on the engine used).
  • A prompt signature describing how results have to be structured.
  • How to integrate results into docs.
  • How to consolidate results from multiple doc chunks into one result per doc.

The inference mode (which defines how the engine queries the model and parses the results) is configured via GenerationSettings when creating the task, rather than in the Bridge.

We'll save this in sentiment_analysis_bridges.py.

Import Dependencies

First, import the required modules for building the bridge:

from collections.abc import Iterable
from functools import cached_property

import pydantic

from sieves.data import Doc
from sieves.engines import EngineInferenceMode, outlines_
from sieves.tasks.predictive.bridges import Bridge

Define the Output Schema

Define the structure of results using Pydantic. This specifies both the sentiment score and reasoning:

# This is how we require the response to be structured - we require not just the score, but also a
# reasoning/justification for why the model assigns this score. We also force the score to be between 0 and 1.
class SentimentEstimate(pydantic.BaseModel):
    reasoning: str
    score: pydantic.confloat(ge=0, le=1)

The schema requires a reasoning explanation and a score between 0 and 1.
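
A quick, purely illustrative check of what the schema accepts:

estimate = SentimentEstimate(reasoning="Mostly upbeat wording.", score=0.8)
print(estimate.score)  # 0.8

# Scores outside [0, 1] are rejected:
# SentimentEstimate(reasoning="Too enthusiastic.", score=1.5) raises a pydantic.ValidationError.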

Create the Bridge Class

Start by defining the bridge class that will handle sentiment analysis. In this case we implement a bridge that uses Outlines.

# This is the bridge class.
class OutlinesSentimentAnalysis(Bridge[SentimentEstimate, SentimentEstimate, outlines_.InferenceMode]):

Define the Prompt Template

The prompt template uses Jinja2 syntax to support few-shot examples:

# This defines the default prompt template as Jinja2 template string.
# We include an example block allowing us to include fewshot examples.
@property
def _default_prompt_instructions(self) -> str:
    return """
    Estimate the sentiment in this text as a float between 0 and 1. 0 is negative, 1 is positive. Provide your
    reasoning for why you estimate this score before you output the score.

    {% if examples|length > 0 -%}
        Examples:
        ----------
        {%- for example in examples %}
            Text: "{{ example.text }}":
            Output:
                Reasoning: "{{ example.reasoning }}":
                Sentiment: "{{ example.sentiment }}"
        {% endfor -%}
        ----------
    {% endif -%}

    ========
    Text: {{ text }}
    Output:
    """

This template instructs the model on how to estimate sentiment and allows for optional few-shot examples.
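
If you want to see what the model will actually receive, you can render such a template yourself with plain Jinja2. sieves does this for you at inference time; the abbreviated template below only mirrors the one above and is purely illustrative:

import jinja2

template_string = """
Estimate the sentiment in this text as a float between 0 and 1. 0 is negative, 1 is positive.

{% if examples|length > 0 -%}
Examples:
{%- for example in examples %}
Text: "{{ example.text }}"
Reasoning: "{{ example.reasoning }}"
Sentiment: "{{ example.score }}"
{% endfor -%}
{% endif -%}

Text: {{ text }}
Output:
"""

# A single few-shot example as a plain dict (Jinja2 attribute access falls back to item access).
example = {"text": "Great food, lovely staff!", "reasoning": "Enthusiastic praise.", "score": 0.9}

print(jinja2.Template(template_string).render(examples=[example], text="The movie was dull."))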

Configure Bridge Properties

Define the required properties that configure how the bridge behaves:

@property
def _prompt_example_template(self) -> str | None:
    return None

@property
def _prompt_conclusion(self) -> str | None:
    return None

@property
def inference_mode(self) -> outlines_.InferenceMode:
    return self._generation_settings.inference_mode or outlines_.InferenceMode.json

# We return our SentimentEstimate as prompt signature.
@cached_property
def prompt_signature(self) -> type[pydantic.BaseModel]:
    return SentimentEstimate

These properties specify the inference mode and the prompt signature (output structure); the example template and conclusion aren't needed for this bridge, so they return None.

Implement Result Integration

The integrate() method stores results into documents:

# We copy the result score into our doc's results attribute.
def integrate(self, results: Iterable[SentimentEstimate], docs: Iterable[Doc]) -> Iterable[Doc]:
    for doc, result in zip(docs, results):
        assert isinstance(result, SentimentEstimate)
        # doc.results is a dict, with the task ID being the key to store our results under for the corresponding
        # task.
        doc.results[self._task_id] = result.score
    return docs

This method extracts the sentiment score from each result and stores it in the document's results dictionary.

Implement Result Consolidation

The consolidate() method aggregates results from multiple document chunks:

# Consolidating multiple chunks for sentiment analysis: we compute the average score over
# all chunks and assume this to be the sentiment score for the entire document.
def consolidate(
    self, results: Iterable[SentimentEstimate], docs_offsets: list[tuple[int, int]]
) -> Iterable[SentimentEstimate]:
    results = list(results)

    # docs_offsets contains (start, end) tuples indicating which result indices belong to which document.
    # Example: [(0, 3), (3, 5)] means doc1 uses results[0:3], doc2 uses results[3:5]
    # This mapping is necessary because long documents are split into multiple chunks for processing.
    for doc_offset in docs_offsets:
        # Accumulate reasonings and scores from all chunks of this document
        reasonings: list[str] = []
        scores = 0.

        # Process each chunk's result for this document
        for chunk_result in results[doc_offset[0] : doc_offset[1]]:
            # Engines may return None results if they encounter errors in permissive mode.
            # Skip None results to avoid crashes while still processing valid chunks.
            if chunk_result:
                assert isinstance(chunk_result, SentimentEstimate)
                reasonings.append(chunk_result.reasoning)
                scores += chunk_result.score

        # Calculate how many chunks this document has
        num_chunks = doc_offset[1] - doc_offset[0]

        yield SentimentEstimate(
            # Average the sentiment score across all chunks of this document.
            score=scores / num_chunks,
            # Concatenate all chunk reasonings into a single string for the document
            # (in production, you might want more sophisticated reasoning aggregation).
            reasoning=str(reasonings),
        )

For sentiment analysis, we compute the average score across chunks and concatenate all reasoning strings.

Our bridge now handles the complete workflow: prompting the model, parsing structured results, integrating them into documents, and consolidating multi-chunk results.

2. Build a SentimentAnalysisTask

The task class wraps the bridge from Section 1 and provides engine-agnostic functionality. It handles bridge instantiation, few-shot examples, and dataset export. We'll save this in sentiment_analysis_task.py.

Bridge Implementation

In a real project, you'd import the bridge from a separate module:

from sentiment_analysis_bridges import OutlinesSentimentAnalysis, SentimentEstimate

For this guide's test to be self-contained, the complete code includes both the bridge and task implementation. Below, we show only the task-specific code that's new in this section.

Import Task Dependencies

Start with the core imports needed for the task wrapper:

from collections.abc import Iterable
from typing import Any

import datasets
import pydantic

from sieves.data import Doc
from sieves.engines import EngineType
from sieves.serialization import Config
from sieves.tasks.predictive.core import PredictiveTask

These imports provide the base classes and types needed to create a predictive task wrapper.

Define Few-Shot Example Schema

Define how few-shot examples should be structured:

# We define the class that fewshot examples must be provided in. In our case we can just inherit from our
# prompt signature class and add a `text` property.
class FewshotExample(SentimentEstimate):
    text: str

This allows users to provide training examples with text and expected sentiment.
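
For illustration, constructing a couple of examples could look like this. The fewshot_examples keyword mirrors what the built-in sieves tasks accept, so treat the exact argument name as an assumption and check the PredictiveTask signature in your version:

examples = [
    FewshotExample(
        text="The service was outstanding and the staff incredibly friendly.",
        reasoning="Strongly positive wording throughout.",
        score=0.95,
    ),
    FewshotExample(
        text="The package arrived late and damaged.",
        reasoning="Clearly negative experience.",
        score=0.1,
    ),
]

# Assumed keyword argument; SentimentAnalysis is the task class defined in the next step.
task = SentimentAnalysis(model=model, fewshot_examples=examples)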

Create the Task Class

Now create the main task class that uses the bridge:

class SentimentAnalysis(PredictiveTask[SentimentEstimate, SentimentEstimate, OutlinesSentimentAnalysis]):

Implement Bridge Initialization

Define how to initialize the bridge for supported engines:

# Initializes the bridge. We raise an error if an engine type has been specified that we don't support (i.e. one
# we haven't implemented a bridge for).
def _init_bridge(self, engine_type: EngineType) -> OutlinesSentimentAnalysis:
    if engine_type == EngineType.outlines:
        return OutlinesSentimentAnalysis(
            task_id=self._task_id,
            prompt_instructions=self._custom_prompt_instructions,
            overwrite=False,
            generation_settings=self._generation_settings,
        )
    else:
        raise KeyError(f"Engine type {engine_type} is not supported by {self.__class__.__name__}.")

# Represents set of supported engine types.
@property
def supports(self) -> set[EngineType]:
    return {EngineType.outlines}

The task raises an error if an unsupported engine is specified.

Add Dataset Export (Optional)

Implement HuggingFace dataset export for analysis or distillation:

# This implements the conversion of a set of docs to a Hugging Face datasets.Dataset.
# You can implement this as `raise NotImplementedError` if you're not interested in generating a Hugging Face
# dataset from your result data.
def to_hf_dataset(self, docs: Iterable[Doc]) -> datasets.Dataset:
    # Define metadata.
    info = datasets.DatasetInfo(
        description=f"Sentiment estimation dataset. Generated with sieves"
                    f"v{Config.get_version()}.",
        features=datasets.Features({"text": datasets.Value("string"), "score": datasets.Value("float32")}),
    )

    def generate_data() -> Iterable[dict[str, Any]]:
        """Yields results as dicts.
        :return Iterable[dict[str, Any]]: Results as dicts.
        """
        for doc in docs:
            yield {"text": doc.text, "score": doc.results[self._task_id]}

    # Create dataset.
    return datasets.Dataset.from_generator(generate_data, features=info.features, info=info)
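
Once documents have been processed by a pipeline containing this task (as shown in the next section), the export behaves like any Hugging Face dataset. A brief, illustrative sketch:

task = SentimentAnalysis(model=model)          # constructed as in the next section
processed_docs = list(Pipeline([task])(docs))  # run the pipeline first

dataset = task.to_hf_dataset(processed_docs)
print(dataset[0])                              # {"text": "...", "score": ...}
dataset.save_to_disk("sentiment_dataset")      # or to_pandas(), push_to_hub(), ...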

And that's it! Our sentiment analysis task is complete and ready to use.

3. Running our task

We can now use our sentiment analysis task like any built-in task:

from sieves import Doc, Pipeline
import outlines

# `small_outlines_model` is a placeholder for an Outlines model instance of your choice - see the Outlines
# documentation for how to construct one.
model = small_outlines_model

docs = [Doc(text="I'm feeling happy today."), Doc(text="I was sad yesterday.")]
pipe = Pipeline([SentimentAnalysis(model=model)])

for doc in pipe(docs):
    print(doc.text, doc.results["SentimentAnalysis"])

Troubleshooting

Common Issues

Results not appearing in doc.results

Symptom: After running your task, doc.results[task_id] is empty or missing.

Possible causes:

  1. integrate() not called correctly: Ensure your bridge's integrate() method stores results in doc.results[self._task_id]
  2. Incorrect task_id: Verify you're using the correct task ID (check task._task_id)
  3. Engine returning None: The engine may be returning None results (e.g., due to generation errors in permissive mode)

Debug steps:

# Add debug logging to your integrate() method
def integrate(self, results, docs):
    for doc, result in zip(docs, results):
        if result is None:
            print(f"WARNING: Got None result for doc: {doc.text[:50]}")
        else:
            print(f"Storing result: {result} for task {self._task_id}")
            doc.results[self._task_id] = result
    return docs

Type errors in consolidate()

Symptom: TypeError or AttributeError in the consolidate method.

Possible causes:

  1. Mismatched types: Results from integrate() don't match the type expected in consolidate()
  2. None results not handled: Some results may be None (from engine errors)
  3. Incorrect doc_offsets slicing: Check that you're using results[doc_offset[0]:doc_offset[1]] correctly

Solution:

def consolidate(self, results, docs_offsets):
    results = list(results)
    for doc_offset in docs_offsets:
        chunk_results = results[doc_offset[0]:doc_offset[1]]
        # Filter out None results
        valid_results = [r for r in chunk_results if r is not None]

        if not valid_results:
            # No valid results for this document
            yield None
            continue

        # Your consolidation logic here
        ...

"Engine type X is not supported" error

Cause: You're trying to use an engine that your bridge doesn't support.

Solution: Either:

  1. Implement a bridge for that engine type
  2. Use a supported engine (check task.supports)
  3. Update _init_bridge() to handle the engine type

# Check supported engines before creating task
from sieves.engines import EngineType
print(f"Supported engines: {task.supports}")  # e.g., {EngineType.outlines}

Prompt template not rendering correctly

Symptom: Model outputs are unexpected or malformed.

Debug steps:

  1. Check Jinja2 syntax: Ensure your template variables are correct
  2. Validate few-shot examples: Ensure examples match your template's expected structure
  3. Print the rendered prompt: Add debug logging to see what's actually sent to the model

# In your bridge, add this to inspect the prompt template before it is rendered:
@cached_property
def _prompt_template(self) -> str:
    template = """..."""
    print(f"Template: {template}")
    return template

Best Practices

  1. Start simple: Begin with a basic bridge, test it, then add complexity
  2. Test consolidate() separately: Write unit tests with mock data to verify consolidation logic (see the sketch after this list)
  3. Handle None results: Always check for None in integrate() and consolidate()
  4. Use type hints: Proper typing helps catch errors early
  5. Add assertions: Use assert isinstance(result, YourType) to catch type mismatches
  6. Log generously: Add debug logging during development to track data flow
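
As an example of point 2, here is a minimal sketch of a consolidation test for the bridge from this guide. The test file name is arbitrary, and bypassing __init__ is just a testing shortcut so we don't need a model or generation settings:

import pytest

from sentiment_analysis_bridges import OutlinesSentimentAnalysis, SentimentEstimate


def test_consolidate_averages_scores():
    # Skip __init__ on purpose: consolidate() only needs the chunk results and offsets.
    bridge = object.__new__(OutlinesSentimentAnalysis)

    chunk_results = [
        SentimentEstimate(reasoning="Positive wording.", score=0.8),
        SentimentEstimate(reasoning="Neutral tone.", score=0.4),
    ]

    # Both chunks belong to a single document.
    consolidated = list(bridge.consolidate(chunk_results, [(0, 2)]))

    assert len(consolidated) == 1
    assert consolidated[0].score == pytest.approx(0.6)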

Custom Task Serialization

When saving pipelines with custom tasks, you'll need to provide initialization parameters for any complex objects (models, tokenizers, etc.) in init_params during load.