Creating Custom Tasks
This guide explains how to create custom tasks. sieves distinguishes two types of tasks:
- Ordinary tasks inherit from Task. Pretty much their only requirement is to process a bunch of documents and output the same set of documents with their modifications.
- Predictive tasks inherit from PredictiveTask (which inherits from Task). Those are for tasks using predictive (i.e. zero-shot) models. They are more complex, as they need to implement the required interface to integrate with at least one model type.
While there are a bunch of pre-built tasks available for you to use, you might want to write your own to match your use-case. This guide describes how to do that.
If you feel like your task might be useful for others, we'd be happy to see you submit a PR!
Tasks
Inherit from Task whenever you want to implement something that doesn't require interacting with engines.
That can be document pre- or postprocessing, or something completely different - you could e.g. run an agent following
instructions provided in docs, and then follow this up with a subsequent task in your pipeline analyzing and
structuring those results.
To create a basic custom task, inherit from the Task class and implement the required abstract methods. In this case
we'll implement a dummy task that counts how many characters are in the document's text and stores that as a result.
from typing import Iterable

from sieves.tasks.core import Task
from sieves.data import Doc


class CharCountTask(Task):
    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Counts characters in doc.text.

        :param docs: Documents to process.
        :return Iterable[Doc]: Processed documents.
        """
        for doc in docs:
            doc.results[self.id] = len(doc.text)
            yield doc
That's it! You can customize this, of course. You might also want to extend __init__() to allow for initializing what
you need.
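For example, a configurable variant might look like the sketch below. The count_whitespace option is hypothetical and only illustrates the pattern; which keyword arguments you forward to super().__init__() depends on your sieves version, so check the Task API reference.

class ConfigurableCharCountTask(Task):
    def __init__(self, count_whitespace: bool = True, **kwargs):
        # Forward everything else (e.g. the task ID) to the base class.
        super().__init__(**kwargs)
        self._count_whitespace = count_whitespace

    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        for doc in docs:
            # Hypothetical option: optionally ignore whitespace when counting characters.
            text = doc.text if self._count_whitespace else "".join(doc.text.split())
            doc.results[self.id] = len(text)
            yield doc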
Predictive Tasks
Inherit from PredictiveTask whenever you want to make use of the structured generation capabilities in sieves.
PredictiveTask requires you to implement a few methods that define how your task expects results to be structured, what
few-shot examples should look like, which prompt to use, etc.
We'll break down how to create a predictive task step by step. For this example, let's implement a sentiment analysis
task using outlines.
Understanding the Architecture
Before diving into implementation, let's understand why sieves uses the Bridge pattern and how components fit together.
The Challenge: Model Diversity
Different NLP frameworks have vastly different APIs:
- Outlines: Uses Jinja2 templates + JSON schemas for structured generation
- DSPy: Uses signatures + optimizers with Python-native schemas
- Transformers: Uses the pipeline() API with classification/generation modes
- LangChain: Uses chains + prompts with custom parsers
- GliNER2: Uses structure chaining patterns to describe custom JSON structures
Without abstraction, each task would need a separate implementation for each model - infeasible to maintain.
The Solution: Bridge Pattern
The Bridge pattern decouples task logic (what to compute) from engine-specific implementation (how to compute):
┌─────────────────────────────────────────────────────┐
│ Pipeline │
│ (Orchestration: caching, batching, serialization) │
└──────────────────────┬──────────────────────────────┘
│
▼
┌──────────────────────┐
│ Task │ ◄── User-facing API
│ (What to compute) │ Classification, NER, etc.
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ Bridge │ ◄── Engine-specific adapter
│ (How to compute) │ DSPyClassification, OutlinesNER
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ ModelWrapper │ ◄── Internal inference handler
│ (Inference logic) │ Auto-detected from model type
└──────────┬───────────┘
│
▼
┌──────────────────────┐
│ Model │ ◄── User-provided.
│ (3rd party │ Any supported model
│ library instance) │
└──────────────────────┘
Responsibilities
Each layer has clear, focused responsibilities:
Task (User creates this for custom functionality):
- Defines what problem to solve (e.g., "classify text into sentiment categories")
- Provides task-specific configuration (labels, entity types, prompt instructions)
- Manages few-shot examples for in-context learning
- Handles optimization (prompt tuning) and distillation (model compression)
- Engine-agnostic: Same task API works with any backend (DSPy, Outlines, etc.)
Bridge (User implements this for custom tasks):
- Defines how to solve the problem for a specific engine
- Creates prompts (Jinja2 templates for Outlines, signatures for DSPy)
- Parses model outputs into structured results (Pydantic models)
- Integrates results into documents (integrate() method)
- Consolidates multi-chunk results into document-level results (consolidate() method)
- Engine-specific: Each engine needs its own Bridge implementation
Engine (Internal, automatically detected):
- Handles low-level inference mechanics (batching, generation, error handling)
- Manages model calls and streaming
- Applies generation settings (temperature, max_tokens, top_p)
- Users never interact with this directly - it's an implementation detail
Key Methods: integrate() vs consolidate()
These two methods serve different but complementary purposes in the processing pipeline:
integrate() - Stores raw results immediately after inference:
def integrate(self, results, docs):
    # Called once per batch of chunks
    # results = [Result1, Result2, Result3]  # One per chunk
    # docs = [Chunk1, Chunk2, Chunk3]        # Corresponding chunks
    for doc, result in zip(docs, results):
        doc.results[self._task_id] = result.score  # Store per-chunk
    return docs
consolidate() - Merges chunk-level results into document-level results:
def consolidate(self, results, docs_offsets):
    # Called after integrate(), once per original document
    # results = [Result1, Result2, Result3]  # All chunk results
    # docs_offsets = [(0, 2), (2, 3)]        # Chunk ranges per doc
    for start, end in docs_offsets:
        chunk_results = results[start:end]  # Get chunks for this doc
        avg_score = sum(r.score for r in chunk_results) / len(chunk_results)
        yield ConsolidatedResult(score=avg_score)  # One result per document
Why separate methods?
- Documents may exceed model context limits (e.g., 100-page PDFs vs 8K token limit)
- Sieves automatically splits long documents into chunks for processing
- integrate() handles per-chunk results (stores immediately, no processing)
- consolidate() aggregates chunks back into per-document results (averaging, voting, etc.)
When to Create Custom Tasks
Use built-in tasks (Classification, NER, InformationExtraction, etc.) whenever possible. Only create custom tasks when:
✅ Novel task type: Your task doesn't fit any existing task (e.g., custom scoring, specialized extraction)
✅ Specialized consolidation: Your task requires unique logic for merging multi-chunk results (e.g., weighted averaging, consensus voting)
Don't create custom tasks for:
❌ Different prompts: Use prompt_instructions parameter on built-in tasks instead
❌ Different labels: Use task configuration (e.g., labels parameter in Classification)
❌ Different models: Just pass a different model instance to the existing task
Example - When you DON'T need a custom task:
# ❌ Bad: Creating entire custom task just to change the prompt
class MyCustomClassifier(Classification):
    # 100+ lines of bridge implementation...
    pass

# ✅ Good: Use built-in task with custom prompt
task = Classification(
    labels=["positive", "negative", "neutral"],
    model=model,
    prompt_instructions="Analyze the text's sentiment carefully, "
    "considering both explicit and implicit cues...",  # Custom!
)
Now let's implement a custom task that actually needs custom logic: sentiment analysis with continuous scoring and reasoning.
1. Implement a Bridge
A Bridge defines how to solve a task for a certain engine. We decided to go with Outlines as our engine (you can
support multiple engines for a task by implementing corresponding bridges, but for simplicity's sake we'll stick with
Outlines only here).
A Bridge requires you to implement/specify the following:
- A prompt template (optional depending on the engine used).
- A prompt signature description (optional depending on the engine used).
- A prompt signature describing how results have to be structured.
- How to integrate results into docs.
- How to consolidate results from multiple doc chunks into one result per doc.
The inference mode (which defines how the engine queries the model and parses the results) is configured via GenerationSettings when creating the task, rather than in the Bridge.
We'll save this in sentiment_analysis_bridges.py.
Import Dependencies
First, import the required modules for building the bridge:
from collections.abc import Iterable
from functools import cached_property
import pydantic
from sieves.data import Doc
from sieves.engines import EngineInferenceMode, outlines_
from sieves.tasks.predictive.bridges import Bridge
Define the Output Schema
Define the structure of results using Pydantic. This specifies both the sentiment score and reasoning:
# This is how we require our response to look - we require not just the score, but also a reasoning/justification
# for why the model assigns this score. We also force the score to be between 0 and 1.
class SentimentEstimate(pydantic.BaseModel):
    reasoning: str
    score: pydantic.confloat(ge=0, le=1)
The schema requires a reasoning explanation and a score between 0 and 1.
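To see the constraint in action, you can instantiate the schema directly (a quick sanity check, independent of sieves):

# Valid: score lies within [0, 1].
estimate = SentimentEstimate(reasoning="Mostly positive wording.", score=0.9)

# Invalid: a score outside [0, 1] raises a validation error.
try:
    SentimentEstimate(reasoning="Out of range.", score=1.5)
except pydantic.ValidationError as err:
    print(err)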
Create the Bridge Class
Start by defining the bridge class that will handle sentiment analysis. In this case we start with a bridge that will employ Outlines for our sentiment analysis task.
# This is the bridge class.
class OutlinesSentimentAnalysis(Bridge[SentimentEstimate, SentimentEstimate, outlines_.InferenceMode]):
Define the Prompt Template
The prompt template uses Jinja2 syntax to support few-shot examples:
# This defines the default prompt instructions as a Jinja2 template string.
# We include an example block that lets us inject few-shot examples.
@property
def _default_prompt_instructions(self) -> str:
    return """
    Estimate the sentiment in this text as a float between 0 and 1. 0 is negative, 1 is positive. Provide your
    reasoning for why you estimate this score before you output the score.

    {% if examples|length > 0 -%}
    Examples:
    ----------
    {%- for example in examples %}
    Text: "{{ example.text }}":
    Output:
        Reasoning: "{{ example.reasoning }}":
        Sentiment: "{{ example.score }}"
    {% endfor -%}
    ----------
    {% endif -%}

    ========
    Text: {{ text }}
    Output:
    """
This template instructs the model on how to estimate sentiment and allows for optional few-shot examples.
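If you want to see how such a template renders, you can experiment with Jinja2 directly. The snippet below uses a trimmed-down copy of the instructions purely for illustration; at inference time sieves renders the actual template for you.

import jinja2

template = jinja2.Template(
    "Estimate the sentiment in this text as a float between 0 and 1.\n"
    "{% for example in examples %}"
    "Text: {{ example.text }} -> Reasoning: {{ example.reasoning }} -> Score: {{ example.score }}\n"
    "{% endfor %}"
    "Text: {{ text }}\nOutput:"
)

print(template.render(
    examples=[{"text": "What a great day!", "reasoning": "Cheerful tone.", "score": 0.9}],
    text="I'm feeling happy today.",
))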
Configure Bridge Properties
Define the required properties that configure how the bridge behaves:
@property
def _prompt_example_template(self) -> str | None:
    return None

@property
def _prompt_conclusion(self) -> str | None:
    return None

@property
def inference_mode(self) -> outlines_.InferenceMode:
    return self._generation_settings.inference_mode or outlines_.InferenceMode.json

# We return our SentimentEstimate as prompt signature.
@cached_property
def prompt_signature(self) -> type[pydantic.BaseModel]:
    return SentimentEstimate
These properties specify the prompt signature (output structure) and inference mode.
Implement Result Integration
The integrate() method stores results into documents:
# We copy the result score into our doc's results attribute.
def integrate(self, results: Iterable[SentimentEstimate], docs: Iterable[Doc]) -> Iterable[Doc]:
    for doc, result in zip(docs, results):
        assert isinstance(result, SentimentEstimate)
        # doc.results is a dict, with the task ID being the key to store our results under for the
        # corresponding task.
        doc.results[self._task_id] = result.score
    return docs
This method extracts the sentiment score from each result and stores it in the document's results dictionary.
Implement Result Consolidation
The consolidate() method aggregates results from multiple document chunks:
# Consolidating multiple chunks for sentiment analysis: we compute the average score over
# all chunks and assume this to be the sentiment score for the entire document.
def consolidate(
    self, results: Iterable[SentimentEstimate], docs_offsets: list[tuple[int, int]]
) -> Iterable[SentimentEstimate]:
    results = list(results)

    # docs_offsets contains (start, end) tuples indicating which result indices belong to which document.
    # Example: [(0, 3), (3, 5)] means doc1 uses results[0:3], doc2 uses results[3:5].
    # This mapping is necessary because long documents are split into multiple chunks for processing.
    for doc_offset in docs_offsets:
        # Accumulate reasonings and scores from all chunks of this document.
        reasonings: list[str] = []
        scores = 0.

        # Process each chunk's result for this document.
        for chunk_result in results[doc_offset[0] : doc_offset[1]]:
            # Engines may return None results if they encounter errors in permissive mode.
            # Skip None results to avoid crashes while still processing valid chunks.
            if chunk_result:
                assert isinstance(chunk_result, SentimentEstimate)
                reasonings.append(chunk_result.reasoning)
                scores += chunk_result.score

        # Calculate how many chunks this document has.
        num_chunks = doc_offset[1] - doc_offset[0]

        yield SentimentEstimate(
            # Average the sentiment score across all chunks of this document.
            score=scores / num_chunks,
            # Concatenate all chunk reasonings into a single string for the document
            # (in production, you might want more sophisticated reasoning aggregation).
            reasoning=str(reasonings),
        )
For sentiment analysis, we compute the average score across chunks and concatenate all reasoning strings.
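To make the averaging concrete: for a document split into two chunks with scores 0.9 and 0.5, consolidation yields one document-level estimate with score 0.7 (illustrative values only).

chunk_results = [
    SentimentEstimate(reasoning="Upbeat opening.", score=0.9),
    SentimentEstimate(reasoning="Neutral closing.", score=0.5),
]

# Both chunks belong to one document, i.e. docs_offsets would be [(0, 2)].
doc_score = sum(r.score for r in chunk_results) / len(chunk_results)
print(doc_score)  # 0.7 - the score consolidate() would yield for this document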
Our bridge now handles the complete workflow: prompting the model, parsing structured results, integrating them into documents, and consolidating multi-chunk results.
2. Build a SentimentAnalysisTask
The task class wraps the bridge from Section 1 and provides engine-agnostic functionality. It handles bridge instantiation, few-shot examples, and dataset export. We'll save this in sentiment_analysis_task.py.
Bridge Implementation
In a real project, you'd import the bridge from a separate module:
from sentiment_analysis_bridges import OutlinesSentimentAnalysis
For this guide's test to be self-contained, the complete code includes both the bridge and task implementation. Below, we show only the task-specific code that's new in this section.
Import Task Dependencies
Start with the core imports needed for the task wrapper:
from collections.abc import Iterable
from typing import Any
import datasets
import pydantic
from sieves.data import Doc
from sieves.engines import EngineType
from sieves.serialization import Config
from sieves.tasks.predictive.core import PredictiveTask
These imports provide the base classes and types needed to create a predictive task wrapper.
Define Few-Shot Example Schema
Define how few-shot examples should be structured:
# We define the class in which few-shot examples have to be provided. In our case we can just inherit from our
# prompt signature class and add a `text` property.
class FewshotExample(SentimentEstimate):
    text: str
This allows users to provide training examples with text and expected sentiment.
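Creating such examples is straightforward; they are typically passed to the task at construction time (check the PredictiveTask signature in your sieves version for the exact argument name, e.g. fewshot_examples):

examples = [
    FewshotExample(
        text="This was the best meal I've had in years.",
        reasoning="Strongly positive wording ('best ... in years').",
        score=0.95,
    ),
    FewshotExample(
        text="The service was slow and the food was cold.",
        reasoning="Multiple explicit complaints.",
        score=0.1,
    ),
]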
Create the Task Class
Now create the main task class that uses the bridge:
class SentimentAnalysis(PredictiveTask[SentimentEstimate, SentimentEstimate, OutlinesSentimentAnalysis]):
Implement Bridge Initialization
Define how to initialize the bridge for supported engines:
# Initializes the bridge. We raise an error if an engine type has been specified that we don't support
# (i.e. we haven't implemented a bridge for that engine type).
def _init_bridge(self, engine_type: EngineType) -> OutlinesSentimentAnalysis:
    if engine_type == EngineType.outlines:
        return OutlinesSentimentAnalysis(
            task_id=self._task_id,
            prompt_instructions=self._custom_prompt_instructions,
            overwrite=False,
            generation_settings=self._generation_settings,
        )
    else:
        raise KeyError(f"Engine type {engine_type} is not supported by {self.__class__.__name__}.")

# Represents the set of supported engine types.
@property
def supports(self) -> set[EngineType]:
    return {EngineType.outlines}
The task raises an error if an unsupported engine is specified.
Add Dataset Export (Optional)
Implement HuggingFace dataset export for analysis or distillation:
# This implements the conversion of a set of docs to a Hugging Face datasets.Dataset.
# You can implement this as `raise NotImplementedError` if you're not interested in generating a Hugging Face
# dataset from your result data.
def to_hf_dataset(self, docs: Iterable[Doc]) -> datasets.Dataset:
    # Define metadata.
    info = datasets.DatasetInfo(
        description=f"Sentiment estimation dataset. Generated with sieves "
        f"v{Config.get_version()}.",
        features=datasets.Features({"text": datasets.Value("string"), "score": datasets.Value("float32")}),
    )

    def generate_data() -> Iterable[dict[str, Any]]:
        """Yields results as dicts.

        :return Iterable[dict[str, Any]]: Results as dicts.
        """
        for doc in docs:
            yield {"text": doc.text, "score": doc.results[self._task_id]}

    # Create dataset.
    return datasets.Dataset.from_generator(generate_data, features=info.features, info=info)
And that's it! Our sentiment analysis task is complete and ready to use.
3. Run the Task
We can now use our sentiment analysis task like every built-in task:
from sieves import Doc, Pipeline
import outlines
model = small_outlines_model
docs = [Doc(text="I'm feeling happy today."), Doc(text="I was sad yesterday.")]
pipe = Pipeline([SentimentAnalysis(model=model)])
for doc in pipe(docs):
print(doc.text, doc.results["SentimentAnalysis"])
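Because we implemented to_hf_dataset(), we can also export the processed documents as a Hugging Face dataset, e.g. for analysis or distillation. Keep a reference to the task instance so you can call the export afterwards:

task = SentimentAnalysis(model=model)
pipe = Pipeline([task])
processed_docs = list(pipe(docs))

# Export results as a Hugging Face dataset (enabled by our to_hf_dataset() implementation).
dataset = task.to_hf_dataset(processed_docs)
print(dataset)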
Troubleshooting
Common Issues
Results not appearing in doc.results
Symptom: After running your task, doc.results[task_id] is empty or missing.
Possible causes:
- integrate() not called correctly: Ensure your bridge's integrate() method stores results in doc.results[self._task_id]
- Incorrect task_id: Verify you're using the correct task ID (check task._task_id)
- Engine returning None: The engine may be returning None results (e.g., due to generation errors in permissive mode)
Debug steps:
# Add debug logging to your integrate() method
def integrate(self, results, docs):
    for doc, result in zip(docs, results):
        if result is None:
            print(f"WARNING: Got None result for doc: {doc.text[:50]}")
        else:
            print(f"Storing result: {result} for task {self._task_id}")
            doc.results[self._task_id] = result
    return docs
Type errors in consolidate()
Symptom: TypeError or AttributeError in the consolidate method.
Possible causes:
- Mismatched types: Results from integrate() don't match the type expected in consolidate()
- None results not handled: Some results may be None (from engine errors)
- Incorrect doc_offsets slicing: Check that you're using results[doc_offset[0]:doc_offset[1]] correctly
Solution:
def consolidate(self, results, docs_offsets):
    results = list(results)
    for doc_offset in docs_offsets:
        chunk_results = results[doc_offset[0]:doc_offset[1]]
        # Filter out None results
        valid_results = [r for r in chunk_results if r is not None]
        if not valid_results:
            # No valid results for this document
            yield None
            continue
        # Your consolidation logic here
        ...
"Engine type X is not supported" error
Cause: You're trying to use an engine that your bridge doesn't support.
Solution: Either:
- Implement a bridge for that engine type
- Use a supported engine (check task.supports)
- Update _init_bridge() to handle the engine type
# Check which engines your task supports
from sieves.engines import EngineType
print(f"Supported engines: {task.supports}")  # e.g., {EngineType.outlines}
Prompt template not rendering correctly
Symptom: Model outputs are unexpected or malformed.
Debug steps:
1. Check Jinja2 syntax: Ensure your template variables are correct
2. Validate few-shot examples: Ensure examples match your template's expected structure
3. Print the rendered prompt: Add debug logging to see what's actually sent to the model
# In your bridge, add this to see the rendered prompt:
@cached_property
def _prompt_template(self) -> str:
    template = """..."""
    print(f"Template: {template}")
    return template
Best Practices
- Start simple: Begin with a basic bridge, test it, then add complexity
- Test consolidate() separately: Write unit tests with mock data to verify consolidation logic (see the sketch after this list)
- Handle None results: Always check for None in integrate() and consolidate()
- Use type hints: Proper typing helps catch errors early
- Add assertions: Use assert isinstance(result, YourType) to catch type mismatches
- Log generously: Add debug logging during development to track data flow
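To illustrate testing consolidate() separately, here is a minimal pytest-style sketch. Since our consolidate() implementation only reads its arguments and doesn't touch bridge state, we can invoke it through the class for a quick logic check without constructing a full bridge:

import pytest

def test_consolidate_averages_chunk_scores():
    chunk_results = [
        SentimentEstimate(reasoning="Positive opening.", score=0.8),
        SentimentEstimate(reasoning="Negative ending.", score=0.2),
    ]

    # One document owning both chunks.
    consolidated = list(
        OutlinesSentimentAnalysis.consolidate(None, chunk_results, docs_offsets=[(0, 2)])
    )

    assert len(consolidated) == 1
    assert consolidated[0].score == pytest.approx(0.5)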
Related Guides
- Task Optimization - Optimize your custom tasks for better performance
- Task Distillation - Distill custom tasks using to_hf_dataset()
- Serialization - Save custom tasks (requires providing init_params for complex objects)
Custom Task Serialization
When saving pipelines with custom tasks, you'll need to provide initialization parameters for any complex objects (models, tokenizers, etc.) in init_params during load.
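A rough sketch of what this can look like is shown below. The exact method names and the shape of the load arguments follow the Serialization guide and may differ between sieves versions, so treat them as assumptions.

# Save the pipeline configuration (complex objects such as the model are not serialized).
pipe.dump("sentiment_pipeline.yml")

# On load, re-supply the complex objects for each task - here, the model for our single task.
# Argument names and structure are assumptions; see the Serialization guide for details.
loaded_pipe = Pipeline.load(
    "sentiment_pipeline.yml",
    [{"model": model}],  # One init_params dict per task in the pipeline.
)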