Task
Conditional Execution
All tasks support optional conditional execution through the condition parameter. This feature allows you to skip processing certain documents based on custom criteria without materializing all documents upfront.
Overview
The condition parameter accepts an optional callable with signature Callable[[Doc], bool]:
def condition(doc: Doc) -> bool:
    # Return True to process the document
    # Return False to skip it
    return True
Implementation Details
When a task is executed with a condition:
- Per-Document Evaluation: Each document is evaluated against the condition individually
- Lazy Batching: Only documents that pass the condition are batched together and sent to the task's _call() method
- Order Preservation: Documents are returned in their original order, even if some were skipped
- Result Storage: Skipped documents have results[task_id] = None
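The four points above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the code in sieves/tasks/core.py: `Doc` is a hypothetical stand-in with only a `text` and a `results` field, `run_conditionally` and `process_batch` are invented names, and the sketch reads the input in fixed-size windows for simplicity, so a batch of passing documents may be smaller than `batch_size`.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass, field
from itertools import islice
from typing import Any


@dataclass
class Doc:
    """Hypothetical stand-in for a document; not the sieves Doc class."""

    text: str
    results: dict[str, Any] = field(default_factory=dict)


def run_conditionally(
    docs: Iterable[Doc],
    condition: Callable[[Doc], bool] | None,
    process_batch: Callable[[list[Doc]], list[Doc]],
    task_id: str,
    batch_size: int = 2,
) -> Iterator[Doc]:
    """Yield docs in input order, processing only those that pass `condition`."""
    it = iter(docs)
    while True:
        # Pull one window at a time so the whole input is never materialized.
        window = list(islice(it, batch_size))
        if not window:
            return
        # Per-document evaluation: remember the indices of passing docs.
        passing = [i for i, d in enumerate(window) if condition is None or condition(d)]
        passing_set = set(passing)
        # Result storage: skipped docs get an explicit None entry.
        for i, d in enumerate(window):
            if i not in passing_set:
                d.results[task_id] = None
        # Lazy batching: only passing docs are handed to the batch processor.
        if passing:
            processed = process_batch([window[i] for i in passing])
            for i, d in zip(passing, processed):
                window[i] = d
        # Order preservation: emit the window in its original order.
        yield from window


def upper_batch(batch: list[Doc]) -> list[Doc]:
    """Toy batch processor used only for this demonstration."""
    for d in batch:
        d.results["upper"] = d.text.upper()
    return batch


texts = ["short", "a much longer text", "tiny", "another long text"]
out = list(
    run_conditionally((Doc(text=t) for t in texts), lambda d: len(d.text) > 5, upper_batch, "upper")
)
```

Because the input is a generator and the function itself is a generator, nothing is read until the caller starts iterating.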
Examples
Skip Documents by Size
from sieves import tasks, Pipeline, Doc

# `model` is assumed to be a model instance created elsewhere.
# Only process documents longer than 100 characters
task = tasks.Classification(
    labels={
        "positive": "Positive sentiment or favorable opinion",
        "negative": "Negative sentiment or unfavorable opinion",
    },
    model=model,
    condition=lambda doc: len(doc.text or "") > 100,
)
pipe = Pipeline([task])
docs = [Doc(text="short"), Doc(text="a very long document " * 10)]
results = list(pipe(docs))
# First doc: results[0].results[task.id] is None (skipped)
# Second doc: results[1].results[task.id] contains classification results
Skip Documents Based on Metadata
# Only process documents from specific sources
def should_process(doc: Doc) -> bool:
    return doc.meta.get("source") in ["source_a", "source_b"]

task = tasks.NER(
    entities={
        "PERSON": "Names of people, including first and last names",
        "LOCATION": "Geographic locations like cities, countries, and landmarks",
    },
    model=model,
    condition=should_process,
)
Multiple Conditions in Pipeline
# Different conditions for different tasks
import_task = tasks.Ingestion(export_format="markdown")

# Only chunk long documents (`chunker` is assumed to be defined elsewhere)
chunking_task = tasks.Chunking(
    chunker,
    condition=lambda doc: len(doc.text or "") > 500,
)

# Only classify documents that met the same length threshold
classification_task = tasks.Classification(
    labels={
        "science": "Scientific content including research and facts",
        "fiction": "Fictional stories and creative writing",
    },
    model=model,
    condition=lambda doc: len(doc.text or "") > 500,
)
pipe = Pipeline([import_task, chunking_task, classification_task])
Technical Notes
- No Materialization: Documents are processed using iterators; passing documents are batched together without materializing the entire document collection upfront
- Index-Based Tracking: The implementation uses document indices for efficient filtering and reordering
- All Model Wrappers Supported: Conditional execution works with all supported model libraries (DSPy, LangChain, Outlines, HuggingFace, GLiNER2, etc.)
- Serialization: Non-callable condition values (like None) serialize naturally; callable conditions are serialized as placeholders
Confidence Values
All predictive tasks in sieves return confidence values alongside their predictions. These values represent the model's certainty in its output.
- transformers & gliner2: Confidence scores are always present as they are derived from model logits.
- LLMs (dspy, langchain, outlines): Confidence scores are self-reported by the model and may occasionally be None if the model fails to provide them in the requested format.
Calibration Note
LLM Confidence Calibration
When using an LLM (Large Language Model) as the underlying model (e.g., through DSPy, LangChain, or Outlines), the reported confidence scores should be interpreted with caution. LLMs are often poorly calibrated, meaning their self-reported confidence may not accurately reflect the actual probability of the prediction being correct. These scores are best used for relative ranking or as a heuristic rather than as absolute probabilities.
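One way to follow this advice is to use confidence only for ordering and to handle a missing score explicitly. The sketch below assumes a hypothetical `Prediction` record with a `label` and an optional `confidence`; it is not a sieves type, and the helper name `rank_by_confidence` is invented for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Prediction:
    """Hypothetical (label, confidence) record; not part of sieves."""

    label: str
    confidence: float | None


def rank_by_confidence(preds: list[Prediction]) -> list[Prediction]:
    """Sort most-confident first; predictions without a score go last."""
    # The first key element pushes None-confidence items to the end,
    # the second orders the rest by descending confidence.
    return sorted(preds, key=lambda p: (p.confidence is None, -(p.confidence or 0.0)))


ranked = rank_by_confidence(
    [Prediction("a", 0.4), Prediction("b", None), Prediction("c", 0.9)]
)
```

Ranking avoids treating a self-reported 0.9 as a 90% chance of being correct while still letting the most confident predictions be reviewed first.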
Bases: ABC
Abstract base class for tasks that can be executed on documents.
Source code in sieves/tasks/core.py
id
property
Return task ID.
Used by pipeline for results and dependency management.
Returns:
| Type | Description |
|---|---|
| str | Task ID. |
__add__(other)
Chain this task with another task or pipeline using the + operator.
This returns a new Pipeline that executes this task first, followed by the
task(s) in other. The original task(s)/pipeline are not mutated.
Cache semantics:
- If other is a Pipeline, the resulting pipeline adopts other's
use_cache setting (because the left-hand side is a single task).
- If other is a Task, the resulting pipeline defaults to use_cache=True.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | Task \| Pipeline | A | required |
Returns:
| Type | Description |
|---|---|
| Pipeline | A new |
Raises:
| Type | Description |
|---|---|
| TypeError | If |
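The chaining and cache rules described for the + operator can be illustrated with two minimal classes. These are hypothetical stand-ins that share only their names with the sieves classes, and the sketch assumes that the TypeError is raised for operands that are neither a task nor a pipeline; it gets there by returning NotImplemented, which makes Python raise the TypeError itself.

```python
from __future__ import annotations


class Pipeline:
    """Hypothetical minimal pipeline: an ordered task list plus a cache flag."""

    def __init__(self, tasks: list[Task], use_cache: bool = True) -> None:
        self.tasks = list(tasks)
        self.use_cache = use_cache


class Task:
    """Hypothetical minimal task that only carries an ID."""

    def __init__(self, task_id: str) -> None:
        self.id = task_id

    def __add__(self, other: Task | Pipeline) -> Pipeline:
        if isinstance(other, Pipeline):
            # Left-hand side is a single task, so adopt the pipeline's cache setting.
            return Pipeline([self, *other.tasks], use_cache=other.use_cache)
        if isinstance(other, Task):
            # Task + Task defaults to caching enabled.
            return Pipeline([self, other], use_cache=True)
        # Unsupported operand: Python turns this into a TypeError.
        return NotImplemented


a, b, c = Task("a"), Task("b"), Task("c")
two_tasks = a + b
with_pipeline = a + Pipeline([b, c], use_cache=False)
```

Both results are new Pipeline objects; neither the original tasks nor the right-hand pipeline is modified.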
Source code in sieves/tasks/core.py
__call__(docs)
Execute task with conditional logic.
Checks the condition for each document without materializing all docs upfront. Documents that satisfy the condition are passed to _call() together so they can be batched properly. Documents that fail the condition have results[task_id] set to None.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| docs | Iterable[Doc] | Docs to process. | required |
Returns:
| Type | Description |
|---|---|
| Iterable[Doc] | Processed docs (in original order). |
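Since skipped documents come back in the same stream with a None result, downstream code usually has to tell the two cases apart. The helper below is a small sketch of that check; `Doc` is again a hypothetical stand-in with a `results` dictionary, and `split_by_outcome` is an invented name.

```python
from __future__ import annotations

from collections.abc import Iterable
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Doc:
    """Hypothetical stand-in for a document; not the sieves Doc class."""

    text: str
    results: dict[str, Any] = field(default_factory=dict)


def split_by_outcome(docs: Iterable[Doc], task_id: str) -> tuple[list[Doc], list[Doc]]:
    """Separate docs with a result for `task_id` from docs that were skipped."""
    processed: list[Doc] = []
    skipped: list[Doc] = []
    for doc in docs:
        # A missing key and an explicit None are both treated as "skipped" here.
        if doc.results.get(task_id) is None:
            skipped.append(doc)
        else:
            processed.append(doc)
    return processed, skipped


docs = [
    Doc("short", {"clf": None}),
    Doc("a long document", {"clf": "positive"}),
]
processed, skipped = split_by_outcome(docs, "clf")
```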
Source code in sieves/tasks/core.py
__init__(task_id, include_meta, batch_size, condition=None)
Initialize a new Task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_id | str \| None | Task ID. | required |
| include_meta | bool | Whether to include meta information generated by the task. | required |
| batch_size | int | Batch size for processing documents. Use -1 to process all documents at once. | required |
| condition | Callable[[Doc], bool] \| None | Optional callable that determines whether to process each document. If provided, it is called with each Doc; if it returns False, the document is skipped and results[task_id] is set to None. | None |
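The batch_size convention, where -1 means "everything in one batch", can be sketched as a small generator. This is an illustration of the convention only, with an invented helper name `iter_batches`; how sieves batches internally is not shown on this page, and rejecting other non-positive values is an assumption of the sketch.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Yield lists of up to `batch_size` items; -1 yields one batch with everything."""
    if batch_size == -1:
        # Process all items at once: this is the only case that materializes the input.
        batch = list(items)
        if batch:
            yield batch
        return
    if batch_size < 1:
        raise ValueError("batch_size must be -1 or a positive integer")
    it = iter(items)
    # islice pulls at most `batch_size` items per step, keeping memory bounded.
    while batch := list(islice(it, batch_size)):
        yield batch


in_pairs = list(iter_batches(range(5), 2))
all_at_once = list(iter_batches(range(5), -1))
```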
Source code in sieves/tasks/core.py
deserialize(config, **kwargs)
classmethod
Generate Task instance from config.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | Config | Config to generate instance from. | required |
| kwargs | dict[str, Any] | Values to inject into loaded config. | {} |
Returns:
| Type | Description |
|---|---|
| Task | Deserialized Task instance. |
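The idea of injecting values at load time pairs naturally with placeholder serialization of callables: a function cannot be written to a config file, so it is replaced by a marker on save and supplied again on load. The round trip below is a sketch under loud assumptions. It uses plain dictionaries instead of the Config class, the marker string `CALLABLE_PLACEHOLDER` and both function names are invented, and raising an error for an unfilled placeholder is a design choice of this sketch rather than documented sieves behavior.

```python
from __future__ import annotations

from typing import Any

# Hypothetical marker; the placeholder format used by sieves is not shown here.
CALLABLE_PLACEHOLDER = "<callable>"


def serialize_settings(settings: dict[str, Any]) -> dict[str, Any]:
    """Replace callables with a placeholder; leave other values (including None) as-is."""
    return {k: CALLABLE_PLACEHOLDER if callable(v) else v for k, v in settings.items()}


def deserialize_settings(config: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
    """Merge injected values over the stored config and reject unfilled placeholders."""
    merged = {**config, **kwargs}
    missing = [k for k, v in merged.items() if v == CALLABLE_PLACEHOLDER]
    if missing:
        raise ValueError(f"Values must be injected for: {missing}")
    return merged


def long_docs_only(doc: Any) -> bool:
    """Example condition used for the round trip."""
    return len(getattr(doc, "text", "") or "") > 100


stored = serialize_settings({"task_id": "clf", "batch_size": -1, "condition": long_docs_only})
restored = deserialize_settings(stored, condition=long_docs_only)
```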
Source code in sieves/tasks/core.py
evaluate(docs, judge=None)
Evaluate task performance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| docs | Iterable[Doc] | Documents to evaluate. | required |
| judge | LM \| None | Optional judge model for evaluation. | None |
Returns:
| Type | Description |
|---|---|
| TaskEvaluationReport | Evaluation report. |
Source code in sieves/tasks/core.py
serialize()
Serialize task.
Returns:
| Type | Description |
|---|---|
| Config | Config instance. |
Source code in sieves/tasks/core.py