Task

Conditional Execution

All tasks support optional conditional execution through the condition parameter. This feature allows you to skip processing certain documents based on custom criteria without materializing all documents upfront.

Overview

The condition parameter accepts an optional callable with signature Callable[[Doc], bool]:

def condition(doc: Doc) -> bool:
    # Return True to process the document
    # Return False to skip it
    return True

Implementation Details

When a task is executed with a condition:

  1. Per-Document Evaluation: Each document is evaluated against the condition individually
  2. Lazy Batching: Only documents that pass the condition are batched together and sent to the task's _call() method
  3. Order Preservation: Documents are returned in their original order, even if some were skipped
  4. Result Storage: Skipped documents have results[task_id] = None
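
The four steps above can be sketched in isolation. This is a minimal illustration, not the library's code: `Doc` is a stripped-down stand-in for the sieves class, and `run_conditionally`, `process`, and `uppercase` are hypothetical names introduced only for this example.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Doc:
    """Hypothetical stand-in for sieves' Doc, reduced to the fields used here."""

    text: str
    results: dict[str, Any] = field(default_factory=dict)


def run_conditionally(
    docs: Iterable[Doc],
    task_id: str,
    process: Callable[[list[Doc]], Iterable[Doc]],
    condition: Callable[[Doc], bool] | None = None,
) -> Iterator[Doc]:
    batch = list(docs)
    # 1. Per-document evaluation: record the indices of passing documents.
    passing = {i for i, doc in enumerate(batch) if condition is None or condition(doc)}
    # 2. Only passing documents are handed to the processing function.
    processed = iter(process([doc for i, doc in enumerate(batch) if i in passing]))
    # 3. and 4. Yield in original order; skipped documents get a None result.
    for i, doc in enumerate(batch):
        if i in passing:
            yield next(processed)
        else:
            doc.results[task_id] = None
            yield doc


def uppercase(batch: list[Doc]) -> Iterable[Doc]:
    """Toy processing step standing in for a task's _call()."""
    for doc in batch:
        doc.results["upper"] = doc.text.upper()
        yield doc


docs = [Doc("short"), Doc("a longer document"), Doc("tiny"), Doc("another long one")]
out = list(run_conditionally(docs, "upper", uppercase, lambda d: len(d.text) > 5))
```

Here `out` keeps the input order, and the two short documents carry `results["upper"] = None`.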

Examples

Skip Documents by Size

from sieves import tasks, Pipeline, Doc

# Only process documents longer than 100 characters
task = tasks.Classification(
    labels={
        "positive": "Positive sentiment or favorable opinion",
        "negative": "Negative sentiment or unfavorable opinion"
    },
    model=model,
    condition=lambda doc: len(doc.text or "") > 100
)

pipe = Pipeline([task])
docs = [Doc(text="short"), Doc(text="a very long document " * 10)]
results = list(pipe(docs))

# results[0].results[task.id] is None (skipped)
# results[1].results[task.id] contains the classification result
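
Downstream code usually needs to tell skipped documents apart from processed ones. The sketch below assumes only that skipped documents carry `None` under the task ID; `Doc` is a simplified stand-in, `was_skipped` is a hypothetical helper, and the result payload is a placeholder rather than the real result type.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Doc:
    """Hypothetical stand-in for sieves' Doc."""

    text: str
    results: dict[str, Any] = field(default_factory=dict)


# Without an explicit task_id, a task's ID defaults to its class name.
TASK_ID = "Classification"

# Placeholder pipeline output: one skipped doc, one processed doc.
results = [
    Doc("short", {TASK_ID: None}),
    Doc("a very long document", {TASK_ID: "placeholder result"}),
]


def was_skipped(doc: Doc, task_id: str) -> bool:
    # Note: .get() also returns None for docs that never passed through the task.
    return doc.results.get(task_id) is None


processed = [doc for doc in results if not was_skipped(doc, TASK_ID)]
skipped = [doc for doc in results if was_skipped(doc, TASK_ID)]
```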

Skip Documents Based on Metadata

# Only process documents from specific sources
def should_process(doc: Doc) -> bool:
    return doc.meta.get("source") in ["source_a", "source_b"]

task = tasks.NER(
    entities={
        "PERSON": "Names of people, including first and last names",
        "LOCATION": "Geographic locations like cities, countries, and landmarks"
    },
    model=model,
    condition=should_process
)

Multiple Conditions in Pipeline

# Different conditions for different tasks
import_task = tasks.Ingestion(export_format="markdown")

# Only chunk long documents
chunking_task = tasks.Chunking(
    chunker,
    condition=lambda doc: len(doc.text or "") > 500
)

# Only classify the long documents that were chunked (same length threshold)
classification_task = tasks.Classification(
    labels={
        "science": "Scientific content including research and facts",
        "fiction": "Fictional stories and creative writing"
    },
    model=model,
    condition=lambda doc: len(doc.text or "") > 500
)

pipe = Pipeline([import_task, chunking_task, classification_task])

Technical Notes

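  • No Upfront Materialization: Documents are consumed from an iterator in batches of batch_size; passing documents within a batch are processed together, so the full collection is never held in memory at once (unless batch_size=-1, in which case the whole input forms a single batch)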
  • Index-Based Tracking: The implementation uses document indices for efficient filtering and reordering
  • All Model Wrappers Supported: Conditional execution works with all supported model libraries (DSPy, LangChain, Outlines, HuggingFace, GLiNER2, etc.)
  • Serialization: Non-callable condition values (like None) serialize naturally; callable conditions are serialized as placeholders
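
The batch-wise consumption mentioned in the notes above can be illustrated with `itertools.islice`, which is the same primitive the `__call__` source further down relies on. `iter_batches`, `source`, and `pulled` are hypothetical names for this sketch.

```python
import itertools
import sys
from collections.abc import Iterable, Iterator


def iter_batches(items: Iterable[int], batch_size: int) -> Iterator[list[int]]:
    """Yield lists of up to batch_size items; a non-positive size means one big batch."""
    it = iter(items)
    size = batch_size if batch_size > 0 else sys.maxsize
    while batch := list(itertools.islice(it, size)):
        yield batch


pulled: list[int] = []


def source() -> Iterator[int]:
    """Generator that records each item as it is consumed."""
    for i in range(5):
        pulled.append(i)
        yield i


batches = iter_batches(source(), 2)
first = next(batches)
# At this point only the first two items have been pulled from the source.
pulled_after_first = list(pulled)
rest = list(batches)
```

Only the items belonging to the current batch are pulled from the source, which is why a generator of documents is not exhausted upfront.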

Confidence Values

All predictive tasks in sieves return confidence values alongside their predictions. These values represent the model's certainty in its output.

  • transformers & gliner2: Confidence scores are always present as they are derived from model logits.
  • LLMs (dspy, langchain, outlines): Confidence scores are self-reported by the model and may occasionally be None if the model fails to provide them in the requested format.

Calibration Note

LLM Confidence Calibration

When using an LLM (Large Language Model) as the underlying model (e.g., through DSPy, LangChain, or Outlines), the reported confidence scores should be interpreted with caution. LLMs are often poorly calibrated, meaning their self-reported confidence may not accurately reflect the actual probability of the prediction being correct. These scores are best used for relative ranking or as a heuristic rather than as absolute probabilities.


Bases: ABC

Abstract base class for tasks that can be executed on documents.

Source code in sieves/tasks/core.py
class Task(abc.ABC):
    """Abstract base class for tasks that can be executed on documents."""

    def __init__(
        self,
        task_id: str | None,
        include_meta: bool,
        batch_size: int,
        condition: Callable[[Doc], bool] | None = None,
    ):
        """
        Initiate new Task.

        :param task_id: Task ID.
        :param include_meta: Whether to include meta information generated by the task.
        :param batch_size: Batch size for processing documents. Use -1 to process all documents at once.
        :param condition: Optional callable that determines whether to process each document.
                          If provided, called with each Doc; if returns False, document is skipped
                          and results[task_id] is set to None.
        """
        self._task_id = task_id if task_id else self.__class__.__name__
        self._include_meta = include_meta
        self._batch_size = batch_size
        self._condition = condition

    @property
    def id(self) -> str:
        """Return task ID.

        Used by pipeline for results and dependency management.

        :return: Task ID.
        """
        return self._task_id

    def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Execute task with conditional logic.

        Checks the condition for each document without materializing all docs upfront.
        Passes all documents that pass the condition to _call() for proper batching.
        Documents that fail the condition have results[task_id] set to None.

        :param docs: Docs to process.
        :return: Processed docs (in original order).
        """
        docs = iter(docs) if not isinstance(docs, Iterator) else docs

        # Materialize docs in batches. This doesn't incur additional memory overhead, as docs are materialized in
        # batches downstream anyway.
        batch_size = self._batch_size if self._batch_size > 0 else sys.maxsize
        while docs_batch := [doc for doc in itertools.islice(docs, batch_size)]:
            # First pass: determine which docs pass the condition by index.
            passing_indices: set[int] = {
                idx for idx, doc in enumerate(docs_batch) if self._condition is None or self._condition(doc)
            }

            # Process all passing docs in one batch.
            processed = self._call(d for i, d in enumerate(docs_batch) if i in passing_indices)
            processed_iter = iter(processed) if not isinstance(processed, Iterator) else processed

            # Iterate through original docs in order and yield results.
            for idx, doc in enumerate(docs_batch):
                if idx in passing_indices:
                    # Doc passed condition - use processed result.
                    yield next(processed_iter)
                else:
                    # Doc failed condition - set `None` result and yield original.
                    doc.results[self.id] = None
                    yield doc

    @abc.abstractmethod
    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Execute task logic (to be implemented by subclasses).

        :param docs: Docs to process.
        :return: Processed docs.
        """

    def evaluate(self, docs: Iterable[Doc], judge: dspy.LM | None = None) -> TaskEvaluationReport:
        """Evaluate task performance.

        :param docs: Documents to evaluate.
        :param judge: Optional judge model for evaluation.
        :return: Evaluation report.
        """
        raise NotImplementedError(f"Evaluation not implemented for task {self.__class__.__name__}")

    def __add__(self, other: Task | Pipeline) -> Pipeline:
        """Chain this task with another task or pipeline using the ``+`` operator.

        This returns a new ``Pipeline`` that executes this task first, followed by the
        task(s) in ``other``. The original task(s)/pipeline are not mutated.

        Cache semantics:
        - If ``other`` is a ``Pipeline``, the resulting pipeline adopts ``other``'s
          ``use_cache`` setting (because the left-hand side is a single task).
        - If ``other`` is a ``Task``, the resulting pipeline defaults to ``use_cache=True``.

        :param other: A ``Task`` or ``Pipeline`` to execute after this task.
        :return: A new ``Pipeline`` representing the chained execution.
        :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
        """
        # Lazy import to avoid circular dependency at module import time.
        from sieves.pipeline import Pipeline

        if isinstance(other, Pipeline):
            return Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)

        if isinstance(other, Task):
            return Pipeline(tasks=[self, other])

        raise TypeError(f"Cannot chain Task with {type(other).__name__}")

    @property
    def _state(self) -> dict[str, Any]:
        """Return attributes to serialize.

        :return: Dict of attributes to serialize.
        """
        return {
            "task_id": self._task_id,
            "include_meta": self._include_meta,
            "batch_size": self._batch_size,
            "condition": self._condition,
        }

    def serialize(self) -> Config:
        """Serialize task.

        :return: Config instance.
        """
        return Config.create(self.__class__, {k: Attribute(value=v) for k, v in self._state.items()})

    @classmethod
    def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Task:
        """Generate Task instance from config.

        :param config: Config to generate instance from.
        :param kwargs: Values to inject into loaded config.
        :return: Deserialized Task instance.
        """
        # Deserialize and inject model wrapper.
        return cls(**config.to_init_dict(cls, **kwargs))

id property

Return task ID.

Used by pipeline for results and dependency management.

Returns:

Type    Description
str     Task ID.

__add__(other)

Chain this task with another task or pipeline using the + operator.

This returns a new Pipeline that executes this task first, followed by the task(s) in other. The original task(s)/pipeline are not mutated.

Cache semantics:

  • If other is a Pipeline, the resulting pipeline adopts other's use_cache setting (because the left-hand side is a single task).
  • If other is a Task, the resulting pipeline defaults to use_cache=True.

Parameters:

Name     Type               Description                                        Default
other    Task | Pipeline    A Task or Pipeline to execute after this task.    required

Returns:

Type        Description
Pipeline    A new Pipeline representing the chained execution.

Raises:

Type         Description
TypeError    If other is not a Task or Pipeline.
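
The chaining and cache semantics described for `+` can be tried out with small stand-in classes. These are not the sieves classes: `Task` and `Pipeline` below are hypothetical dataclasses that mirror only the behaviour documented in this section.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Pipeline:
    """Hypothetical stand-in holding an ordered task list and a cache flag."""

    tasks: list[Task]
    use_cache: bool = True


@dataclass
class Task:
    """Hypothetical stand-in that supports chaining with +."""

    name: str

    def __add__(self, other: Task | Pipeline) -> Pipeline:
        if isinstance(other, Pipeline):
            # Adopt the right-hand pipeline's cache setting.
            return Pipeline([self, *other.tasks], use_cache=other.use_cache)
        if isinstance(other, Task):
            # Two single tasks: fall back to the default use_cache=True.
            return Pipeline([self, other])
        raise TypeError(f"Cannot chain Task with {type(other).__name__}")


a, b, c = Task("a"), Task("b"), Task("c")
task_plus_task = a + b
task_plus_pipeline = a + Pipeline([b, c], use_cache=False)
```

Neither operand is modified; each `+` returns a new pipeline object.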

Source code in sieves/tasks/core.py
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Chain this task with another task or pipeline using the ``+`` operator.

    This returns a new ``Pipeline`` that executes this task first, followed by the
    task(s) in ``other``. The original task(s)/pipeline are not mutated.

    Cache semantics:
    - If ``other`` is a ``Pipeline``, the resulting pipeline adopts ``other``'s
      ``use_cache`` setting (because the left-hand side is a single task).
    - If ``other`` is a ``Task``, the resulting pipeline defaults to ``use_cache=True``.

    :param other: A ``Task`` or ``Pipeline`` to execute after this task.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    # Lazy import to avoid circular dependency at module import time.
    from sieves.pipeline import Pipeline

    if isinstance(other, Pipeline):
        return Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)

    if isinstance(other, Task):
        return Pipeline(tasks=[self, other])

    raise TypeError(f"Cannot chain Task with {type(other).__name__}")

__call__(docs)

Execute task with conditional logic.

Checks the condition for each document without materializing all docs upfront. Passes all documents that pass the condition to _call() for proper batching. Documents that fail the condition have results[task_id] set to None.

Parameters:

Name    Type             Description         Default
docs    Iterable[Doc]    Docs to process.    required

Returns:

Type             Description
Iterable[Doc]    Processed docs (in original order).

Source code in sieves/tasks/core.py
def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
    """Execute task with conditional logic.

    Checks the condition for each document without materializing all docs upfront.
    Passes all documents that pass the condition to _call() for proper batching.
    Documents that fail the condition have results[task_id] set to None.

    :param docs: Docs to process.
    :return: Processed docs (in original order).
    """
    docs = iter(docs) if not isinstance(docs, Iterator) else docs

    # Materialize docs in batches. This doesn't incur additional memory overhead, as docs are materialized in
    # batches downstream anyway.
    batch_size = self._batch_size if self._batch_size > 0 else sys.maxsize
    while docs_batch := [doc for doc in itertools.islice(docs, batch_size)]:
        # First pass: determine which docs pass the condition by index.
        passing_indices: set[int] = {
            idx for idx, doc in enumerate(docs_batch) if self._condition is None or self._condition(doc)
        }

        # Process all passing docs in one batch.
        processed = self._call(d for i, d in enumerate(docs_batch) if i in passing_indices)
        processed_iter = iter(processed) if not isinstance(processed, Iterator) else processed

        # Iterate through original docs in order and yield results.
        for idx, doc in enumerate(docs_batch):
            if idx in passing_indices:
                # Doc passed condition - use processed result.
                yield next(processed_iter)
            else:
                # Doc failed condition - set `None` result and yield original.
                doc.results[self.id] = None
                yield doc

__init__(task_id, include_meta, batch_size, condition=None)

Initiate new Task.

Parameters:

Name            Type                            Description                                                                      Default
task_id         str | None                      Task ID.                                                                         required
include_meta    bool                            Whether to include meta information generated by the task.                       required
batch_size      int                             Batch size for processing documents. Use -1 to process all documents at once.    required
condition       Callable[[Doc], bool] | None    Optional callable that determines whether to process each document. If provided, it is called with each Doc; if it returns False, the document is skipped and results[task_id] is set to None.    None

Source code in sieves/tasks/core.py
def __init__(
    self,
    task_id: str | None,
    include_meta: bool,
    batch_size: int,
    condition: Callable[[Doc], bool] | None = None,
):
    """
    Initiate new Task.

    :param task_id: Task ID.
    :param include_meta: Whether to include meta information generated by the task.
    :param batch_size: Batch size for processing documents. Use -1 to process all documents at once.
    :param condition: Optional callable that determines whether to process each document.
                      If provided, called with each Doc; if returns False, document is skipped
                      and results[task_id] is set to None.
    """
    self._task_id = task_id if task_id else self.__class__.__name__
    self._include_meta = include_meta
    self._batch_size = batch_size
    self._condition = condition

deserialize(config, **kwargs) classmethod

Generate Task instance from config.

Parameters:

Name      Type              Description                             Default
config    Config            Config to generate instance from.       required
kwargs    dict[str, Any]    Values to inject into loaded config.    {}

Returns:

Type    Description
Task    Deserialized Task instance.

Source code in sieves/tasks/core.py
@classmethod
def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Task:
    """Generate Task instance from config.

    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config.
    :return: Deserialized Task instance.
    """
    # Deserialize and inject model wrapper.
    return cls(**config.to_init_dict(cls, **kwargs))

evaluate(docs, judge=None)

Evaluate task performance.

Parameters:

Name     Type             Description                             Default
docs     Iterable[Doc]    Documents to evaluate.                  required
judge    LM | None        Optional judge model for evaluation.    None

Returns:

Type                    Description
TaskEvaluationReport    Evaluation report.

Source code in sieves/tasks/core.py
def evaluate(self, docs: Iterable[Doc], judge: dspy.LM | None = None) -> TaskEvaluationReport:
    """Evaluate task performance.

    :param docs: Documents to evaluate.
    :param judge: Optional judge model for evaluation.
    :return: Evaluation report.
    """
    raise NotImplementedError(f"Evaluation not implemented for task {self.__class__.__name__}")

serialize()

Serialize task.

Returns:

Type      Description
Config    Config instance.

Source code in sieves/tasks/core.py
def serialize(self) -> Config:
    """Serialize task.

    :return: Config instance.
    """
    return Config.create(self.__class__, {k: Attribute(value=v) for k, v in self._state.items()})