
Pipeline

Pipelines orchestrate the sequential execution of tasks and support two ways of defining the task sequence:

  • Verbose initialization using Pipeline([...]) (allows setting parameters like use_cache)
  • Succinct chaining with + for readability

Examples

from sieves import Pipeline, tasks

# Verbose initialization (allows non-default configuration).
# chunker and engine are placeholders for a pre-configured chunker and model/engine.
t_ingest = tasks.preprocessing.Ingestion(export_format="markdown")
t_chunk = tasks.preprocessing.Chunking(chunker)
t_cls = tasks.predictive.Classification(labels=["science", "politics"], model=engine)
pipe = Pipeline([t_ingest, t_chunk, t_cls], use_cache=True)

# Succinct chaining (equivalent task order).
pipe2 = t_ingest + t_chunk + t_cls

# You can also chain pipelines and tasks.
pipe_left = Pipeline([t_ingest])
pipe_right = Pipeline([t_chunk, t_cls])
pipe3 = pipe_left + pipe_right  # results in [t_ingest, t_chunk, t_cls]

# In-place append (mutates the left pipeline).
pipe_left += t_chunk            # pipe_left now runs [t_ingest, t_chunk]
pipe_left += Pipeline([t_cls])  # appending a pipeline adds all of its tasks
# (appending pipe_right here would re-add t_chunk and fail the duplicate-ID validation)

# Note:
# - Additional Pipeline parameters (e.g., use_cache=False) are only settable via the verbose form
# - Chaining never mutates existing tasks or pipelines; it creates a new Pipeline
# - Using "+=" mutates the existing pipeline by appending tasks

Note: Ingestion libraries (e.g., docling) are optional and not installed by default. Install them manually or via the extra:

pip install "sieves[ingestion]"
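
To run a pipeline, call it with an iterable of documents. A minimal sketch continuing the example above, assuming Doc is exported from the top-level sieves package and accepts a text keyword argument:

from sieves import Doc

# Text-only documents can skip ingestion; chunking + classification suffices here.
text_pipe = t_chunk + t_cls
docs = [
    Doc(text="The LHC probes the Standard Model."),
    Doc(text="Parliament passed the bill."),
]

# Calling the pipeline returns a generator; iterate (or materialize) it to run all tasks.
for doc in text_pipe(docs):
    print(doc.text)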


Pipeline

Pipeline for executing tasks on documents.

Source code in sieves/pipeline/core.py
class Pipeline:
    """Pipeline for executing tasks on documents."""

    def __init__(
        self,
        tasks: Iterable[Task] | Task,
        use_cache: bool = True,
    ):
        """Initialize pipeline.

        :param tasks: List of tasks to execute.
        :param use_cache: If True, pipeline will build a cache over processed `Doc`s to ensure that no redundant
            requests will be sent to the model. If False, all `Doc`s will be processed from scratch, regardless of
            whether they have already been processed.
        """
        self._tasks = [tasks] if isinstance(tasks, Task) else list(tasks)
        self._use_cache = use_cache
        self._cache: dict[int, Doc] = {}
        self._cache_stats: dict[str, int] = {"total": 0, "unique": 0, "hits": 0, "misses": 0}
        self._validate_tasks()

    def add_tasks(self, tasks: Iterable[Task]) -> None:
        """Add tasks to pipeline. Revalidates pipeline.

        :param tasks: Tasks to be added.
        """
        self._tasks.extend(tasks)
        self._validate_tasks()

    @property
    def tasks(self) -> list[Task]:
        """Return tasks.

        :return: List of tasks.
        """
        return self._tasks

    @property
    def use_cache(self) -> bool:
        """Return whether pipeline uses cache.

        :return: Whether pipeline uses cache.
        """
        return self._use_cache

    def _validate_tasks(self) -> None:
        """Validate tasks.

        :raises ValueError: On pipeline component signature mismatch.
        """
        task_ids: set[str] = set()

        for i, task in enumerate(self._tasks):
            if task.id in task_ids:
                raise ValueError(f"Task with duplicate ID {task.id}. Ensure unique task IDs.")
            task_ids.add(task.id)

    def _get_unseen_unique_docs(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Yield unseen, unique docs.

        I.e. those docs that are not in cache and that are unique within the provided
        collection.

        :param docs: Documents to process.
        """
        doc_hashes: set[int] = set()

        for doc in docs:
            assert doc.text or doc.uri
            doc_cache_id = hash(doc.text or doc.uri)

            if doc_cache_id not in self._cache and doc_cache_id not in doc_hashes:
                doc_hashes.add(doc_cache_id)
                self._cache_stats["unique"] += 1
                yield doc

    def __call__(self, docs: Iterable[Doc], in_place: bool = False) -> Iterable[Doc]:
        """Process a list of documents through all tasks.

        :param docs: Documents to process.
        :param in_place: Whether to modify documents in-place or create copies.
        :return Iterable[Doc]: Processed documents.
        """
        n_docs: int | None = len(docs) if isinstance(docs, Sized) else None
        docs_iters = itertools.tee(docs if in_place else (copy.deepcopy(doc) for doc in docs), 2)
        processed_docs = self._get_unseen_unique_docs(docs_iters[0]) if self._use_cache else docs_iters[0]

        for i, task in enumerate(self._tasks):
            processed_docs = task(processed_docs)

        # If returned docs are not iterators (e.g. returned as lists), get corresponding iterators.
        if not isinstance(processed_docs, Iterator):
            processed_docs = iter(processed_docs)

        # Iterate over all docs. Retrieve doc from cache if available, otherwise add to cache.
        for i, doc in tqdm.tqdm(enumerate(docs_iters[1]), desc="Running pipeline", total=n_docs):
            assert doc.text or doc.uri
            self._cache_stats["total"] += 1
            # Docs must either all have URIs or texts. Either is a sufficient identifier. If first task is Ingestion
            # and not all docs have IDs, pipeline fails. If first task is predictive and not all docs have texts,
            # pipeline fails.
            doc_cache_id = hash(doc.text or doc.uri)

            if doc_cache_id not in self._cache:
                # Update cache.
                self._cache_stats["misses"] += 1
                processed_doc = next(processed_docs)

                if self._use_cache:
                    self._cache[doc_cache_id] = processed_doc
            else:
                self._cache_stats["hits"] += 1
                processed_doc = self._cache[doc_cache_id]

            yield processed_doc

    def dump(self, path: Path | str) -> None:
        """Save pipeline config to disk.

        :param path: Target path.
        """
        self.serialize().dump(path)

    def clear_cache(self) -> None:
        """Clear cache."""
        self._cache.clear()
        self._cache_stats = {k: 0 for k in self._cache_stats}

    @classmethod
    def load(cls, path: Path | str, task_kwargs: Iterable[dict[str, Any]]) -> Pipeline:
        """Generate pipeline from disk.

        :param path: Path to config file.
        :param task_kwargs: Values to inject into loaded config.
        :return: Pipeline instance.
        """
        return cls.deserialize(Config.load(path), task_kwargs)

    def serialize(self) -> Config:
        """Serialize pipeline object.

        :return: Serialized pipeline representation.
        """
        return Config.create(
            self.__class__,
            {
                "tasks": Attribute(value=[task.serialize() for task in self._tasks]),
                "use_cache": Attribute(value=self._use_cache),
            },
        )

    @classmethod
    def deserialize(cls, config: Config, tasks_kwargs: Iterable[dict[str, Any]]) -> Pipeline:
        """Generate pipeline from config.

        :param config: Config to generate pipeline from.
        :param tasks_kwargs: Values to inject into task configs. One dict per task (dict can be empty).
        :return: Deserialized pipeline instance.
        """
        config.validate_init_params(cls)
        tasks_kwargs = tuple(tasks_kwargs)

        assert hasattr(config, "tasks")
        assert len(config.tasks.value) == len(tasks_kwargs), ValueError(
            f"len(tasks_kwargs) has to match the number of tasks in this pipeline ({len(config.tasks.value)}."
        )
        assert config.tasks.is_placeholder is False

        # Deserialize tasks.
        tasks: list[Task] = []
        for task_attr, task_kwargs in zip(config.tasks.value, tasks_kwargs):
            # Restore task config, if provided as dict.
            match task_attr:
                case dict():
                    task_config, task_cls = Config.from_dict(task_attr)
                case Config():
                    task_config = task_attr
                    task_cls = task_attr.config_cls
                case _:
                    raise TypeError(f"Deserialization can't handle configs of type {type(task_attr)}.")

            # Deserialize task.
            assert issubclass(task_cls, Serializable)
            assert issubclass(task_cls, Task)
            task = task_cls.deserialize(task_config, **task_kwargs)
            tasks.append(task)

        return cls(tasks=tasks)

    def __getitem__(self, task_id: str) -> Task:
        """Get task with this ID.

        :param task_id: ID of task to fetch.
        :return: Task with specified ID.
        :raises KeyError: If no task with such ID exists.
        """
        for task in self._tasks:
            if task.id == task_id:
                return task

        raise KeyError(f"No task with ID {task_id} exists in this pipeline.")

    def __add__(self, other: Task | Pipeline) -> Pipeline:
        """Chain this pipeline with another task or pipeline using ``+``.

        Returns a new pipeline that executes all tasks of this pipeline first,
        followed by the task(s) provided via ``other``. The original pipeline(s)
        and task(s) are not mutated.

        Cache semantics:
        - The resulting pipeline preserves this pipeline's ``use_cache`` setting
          regardless of whether ``other`` is a task or pipeline.

        :param other: A ``Task`` or another ``Pipeline`` to execute after this pipeline.
        :return: A new ``Pipeline`` representing the chained execution.
        :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
        """
        if isinstance(other, Pipeline):
            return Pipeline(tasks=[*self._tasks, *other._tasks], use_cache=self._use_cache)

        if isinstance(other, Task):
            return Pipeline(tasks=[*self._tasks, other], use_cache=self._use_cache)

        raise TypeError(f"Cannot chain Pipeline with {type(other).__name__}")

    def __iadd__(self, other: Task | Pipeline) -> Pipeline:
        """Append a task or pipeline to this pipeline in-place using ``+=``.

        Extending with a pipeline appends all tasks from ``other``. Cache setting
        remains unchanged and follows this (left) pipeline.

        Revalidates the pipeline and updates distillation targets.

        :param other: Task or Pipeline to append.
        :return: This pipeline instance (mutated).
        :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
        """
        if isinstance(other, Task):
            self._tasks.append(other)
        elif isinstance(other, Pipeline):
            self._tasks.extend(other._tasks)
        else:
            raise TypeError(f"Can only add Task or Pipeline to Pipeline with +=, got {type(other).__name__}")
        self._validate_tasks()

        return self

tasks property

Return tasks.

Returns:

Type          Description
list[Task]    List of tasks.

use_cache property

Return whether pipeline uses cache.

Returns:

Type    Description
bool    Whether pipeline uses cache.

__add__(other)

Chain this pipeline with another task or pipeline using +.

Returns a new pipeline that executes all tasks of this pipeline first, followed by the task(s) provided via other. The original pipeline(s) and task(s) are not mutated.

Cache semantics: - The resulting pipeline preserves this pipeline's use_cache setting regardless of whether other is a task or pipeline.

Parameters:

Name     Type               Description                                                    Default
other    Task | Pipeline    A Task or another Pipeline to execute after this pipeline.    required

Returns:

Type        Description
Pipeline    A new Pipeline representing the chained execution.

Raises:

Type         Description
TypeError    If other is not a Task or Pipeline.

Source code in sieves/pipeline/core.py
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Chain this pipeline with another task or pipeline using ``+``.

    Returns a new pipeline that executes all tasks of this pipeline first,
    followed by the task(s) provided via ``other``. The original pipeline(s)
    and task(s) are not mutated.

    Cache semantics:
    - The resulting pipeline preserves this pipeline's ``use_cache`` setting
      regardless of whether ``other`` is a task or pipeline.

    :param other: A ``Task`` or another ``Pipeline`` to execute after this pipeline.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    if isinstance(other, Pipeline):
        return Pipeline(tasks=[*self._tasks, *other._tasks], use_cache=self._use_cache)

    if isinstance(other, Task):
        return Pipeline(tasks=[*self._tasks, other], use_cache=self._use_cache)

    raise TypeError(f"Cannot chain Pipeline with {type(other).__name__}")
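
For example, the left operand's use_cache setting carries over to the new pipeline, and neither operand is mutated. A small sketch reusing tasks from the examples above:

no_cache_pipe = Pipeline([t_ingest], use_cache=False)
chained = no_cache_pipe + t_chunk        # new Pipeline; no_cache_pipe is untouched
assert chained.use_cache is False        # cache setting follows the left pipeline
assert len(no_cache_pipe.tasks) == 1     # original pipeline unchanged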

__call__(docs, in_place=False)

Process a list of documents through all tasks.

Parameters:

Name        Type             Description                                                Default
docs        Iterable[Doc]    Documents to process.                                      required
in_place    bool             Whether to modify documents in-place or create copies.    False

Returns:

Type             Description
Iterable[Doc]    Processed documents.

Source code in sieves/pipeline/core.py
def __call__(self, docs: Iterable[Doc], in_place: bool = False) -> Iterable[Doc]:
    """Process a list of documents through all tasks.

    :param docs: Documents to process.
    :param in_place: Whether to modify documents in-place or create copies.
    :return Iterable[Doc]: Processed documents.
    """
    n_docs: int | None = len(docs) if isinstance(docs, Sized) else None
    docs_iters = itertools.tee(docs if in_place else (copy.deepcopy(doc) for doc in docs), 2)
    processed_docs = self._get_unseen_unique_docs(docs_iters[0]) if self._use_cache else docs_iters[0]

    for i, task in enumerate(self._tasks):
        processed_docs = task(processed_docs)

    # If returned docs are not iterators (e.g. returned as lists), get corresponding iterators.
    if not isinstance(processed_docs, Iterator):
        processed_docs = iter(processed_docs)

    # Iterate over all docs. Retrieve doc from cache if available, otherwise add to cache.
    for i, doc in tqdm.tqdm(enumerate(docs_iters[1]), desc="Running pipeline", total=n_docs):
        assert doc.text or doc.uri
        self._cache_stats["total"] += 1
        # Docs must either all have URIs or texts. Either is a sufficient identifier. If first task is Ingestion
        # and not all docs have IDs, pipeline fails. If first task is predictive and not all docs have texts,
        # pipeline fails.
        doc_cache_id = hash(doc.text or doc.uri)

        if doc_cache_id not in self._cache:
            # Update cache.
            self._cache_stats["misses"] += 1
            processed_doc = next(processed_docs)

            if self._use_cache:
                self._cache[doc_cache_id] = processed_doc
        else:
            self._cache_stats["hits"] += 1
            processed_doc = self._cache[doc_cache_id]

        yield processed_doc
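
With caching enabled, a document whose text (or URI) hash is already in the cache is returned from the cache instead of being reprocessed; only unseen, unique documents flow through the tasks. A small sketch, reusing the classification task and Doc from the examples above:

cls_pipe = Pipeline([t_cls], use_cache=True)
docs = [Doc(text="Some article."), Doc(text="Some article."), Doc(text="Another article.")]

first_pass = list(cls_pipe(docs))    # two unique texts are processed; the duplicate is a cache hit
second_pass = list(cls_pipe(docs))   # all three documents are now served from the cache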

__getitem__(task_id)

Get task with this ID.

Parameters:

Name       Type    Description             Default
task_id    str     ID of task to fetch.    required

Returns:

Type    Description
Task    Task with specified ID.

Raises:

Type        Description
KeyError    If no task with such ID exists.

Source code in sieves/pipeline/core.py
def __getitem__(self, task_id: str) -> Task:
    """Get task with this ID.

    :param task_id: ID of task to fetch.
    :return: Task with specified ID.
    :raises KeyError: If no task with such ID exists.
    """
    for task in self._tasks:
        if task.id == task_id:
            return task

    raise KeyError(f"No task with ID {task_id} exists in this pipeline.")
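
A short sketch, reusing the tasks from the examples above:

pipe = Pipeline([t_ingest, t_chunk, t_cls])
task = pipe[t_cls.id]  # fetch the classification task by its ID

try:
    pipe["nonexistent-task-id"]
except KeyError:
    pass  # no task with this ID exists in the pipeline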

__iadd__(other)

Append a task or pipeline to this pipeline in-place using +=.

Extending with a pipeline appends all tasks from other. Cache setting remains unchanged and follows this (left) pipeline.

Revalidates the pipeline and updates distillation targets.

Parameters:

Name     Type               Description                    Default
other    Task | Pipeline    Task or Pipeline to append.    required

Returns:

Type        Description
Pipeline    This pipeline instance (mutated).

Raises:

Type         Description
TypeError    If other is not a Task or Pipeline.

Source code in sieves/pipeline/core.py
def __iadd__(self, other: Task | Pipeline) -> Pipeline:
    """Append a task or pipeline to this pipeline in-place using ``+=``.

    Extending with a pipeline appends all tasks from ``other``. Cache setting
    remains unchanged and follows this (left) pipeline.

    Revalidates the pipeline and updates distillation targets.

    :param other: Task or Pipeline to append.
    :return: This pipeline instance (mutated).
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    if isinstance(other, Task):
        self._tasks.append(other)
    elif isinstance(other, Pipeline):
        self._tasks.extend(other._tasks)
    else:
        raise TypeError(f"Can only add Task or Pipeline to Pipeline with +=, got {type(other).__name__}")
    self._validate_tasks()

    return self
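
A short sketch; appending anything other than a Task or Pipeline raises TypeError:

pipe = Pipeline([t_ingest])
pipe += t_chunk  # in-place append; pipe now runs [t_ingest, t_chunk]

try:
    pipe += "not a task"
except TypeError:
    pass  # only Task or Pipeline instances may be appended with +=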

__init__(tasks, use_cache=True)

Initialize pipeline.

Parameters:

Name         Type                     Description                                                     Default
tasks        Iterable[Task] | Task    List of tasks to execute.                                       required
use_cache    bool                     If True, pipeline will build a cache over processed Docs to
                                      ensure that no redundant requests will be sent to the model.
                                      If False, all Docs will be processed from scratch, regardless
                                      of whether they have already been processed.                    True
Source code in sieves/pipeline/core.py
def __init__(
    self,
    tasks: Iterable[Task] | Task,
    use_cache: bool = True,
):
    """Initialize pipeline.

    :param tasks: List of tasks to execute.
    :param use_cache: If True, pipeline will build a cache over processed `Doc`s to ensure that no redundant
        requests will be sent to the model. If False, all `Doc`s will be processed from scratch, regardless of
        whether they have already been processed.
    """
    self._tasks = [tasks] if isinstance(tasks, Task) else list(tasks)
    self._use_cache = use_cache
    self._cache: dict[int, Doc] = {}
    self._cache_stats: dict[str, int] = {"total": 0, "unique": 0, "hits": 0, "misses": 0}
    self._validate_tasks()
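
Since tasks may be a single Task or an iterable of tasks, both forms below are valid (sketch with the classification task from the examples above):

pipe_a = Pipeline(t_cls)                      # single task
pipe_b = Pipeline([t_cls], use_cache=False)   # iterable of tasks, caching disabled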

add_tasks(tasks)

Add tasks to pipeline. Revalidates pipeline.

Parameters:

Name     Type              Description           Default
tasks    Iterable[Task]    Tasks to be added.    required
Source code in sieves/pipeline/core.py
def add_tasks(self, tasks: Iterable[Task]) -> None:
    """Add tasks to pipeline. Revalidates pipeline.

    :param tasks: Tasks to be added.
    """
    self._tasks.extend(tasks)
    self._validate_tasks()
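
A short sketch; adding a task whose ID is already present fails validation:

pipe = Pipeline([t_ingest])
pipe.add_tasks([t_chunk, t_cls])  # pipeline now runs [t_ingest, t_chunk, t_cls]

try:
    pipe.add_tasks([t_chunk])     # same task (and thus same ID) again
except ValueError:
    pass  # duplicate task IDs are rejected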

clear_cache()

Clear cache.

Source code in sieves/pipeline/core.py
def clear_cache(self) -> None:
    """Clear cache."""
    self._cache.clear()
    self._cache_stats = {k: 0 for k in self._cache_stats}
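
Clearing the cache forces previously seen documents to be reprocessed on the next call. A short sketch, reusing cls_pipe and docs from the caching example above:

_ = list(cls_pipe(docs))     # results are cached
cls_pipe.clear_cache()       # drop cached Docs and reset cache statistics
_ = list(cls_pipe(docs))     # documents are processed from scratch again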

deserialize(config, tasks_kwargs) classmethod

Generate pipeline from config.

Parameters:

Name            Type                        Description                                                                    Default
config          Config                      Config to generate pipeline from.                                              required
tasks_kwargs    Iterable[dict[str, Any]]    Values to inject into task configs. One dict per task (dict can be empty).    required

Returns:

Type        Description
Pipeline    Deserialized pipeline instance.

Source code in sieves/pipeline/core.py
@classmethod
def deserialize(cls, config: Config, tasks_kwargs: Iterable[dict[str, Any]]) -> Pipeline:
    """Generate pipeline from config.

    :param config: Config to generate pipeline from.
    :param tasks_kwargs: Values to inject into task configs. One dict per task (dict can be empty).
    :return: Deserialized pipeline instance.
    """
    config.validate_init_params(cls)
    tasks_kwargs = tuple(tasks_kwargs)

    assert hasattr(config, "tasks")
    assert len(config.tasks.value) == len(tasks_kwargs), ValueError(
        f"len(tasks_kwargs) has to match the number of tasks in this pipeline ({len(config.tasks.value)}."
    )
    assert config.tasks.is_placeholder is False

    # Deserialize tasks.
    tasks: list[Task] = []
    for task_attr, task_kwargs in zip(config.tasks.value, tasks_kwargs):
        # Restore task config, if provided as dict.
        match task_attr:
            case dict():
                task_config, task_cls = Config.from_dict(task_attr)
            case Config():
                task_config = task_attr
                task_cls = task_attr.config_cls
            case _:
                raise TypeError(f"Deserialization can't handle configs of type {type(task_attr)}.")

        # Deserialize task.
        assert issubclass(task_cls, Serializable)
        assert issubclass(task_cls, Task)
        task = task_cls.deserialize(task_config, **task_kwargs)
        tasks.append(task)

    return cls(tasks=tasks)
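
tasks_kwargs must contain exactly one dict per task, in pipeline order; non-serializable values (such as a model) can be re-injected this way. A hedged sketch, reusing the pipeline and engine from the examples above and assuming only the classification task needs its model restored:

config = pipe.serialize()
restored = Pipeline.deserialize(config, tasks_kwargs=[{}, {}, {"model": engine}])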

dump(path)

Save pipeline config to disk.

Parameters:

Name    Type          Description     Default
path    Path | str    Target path.    required
Source code in sieves/pipeline/core.py
def dump(self, path: Path | str) -> None:
    """Save pipeline config to disk.

    :param path: Target path.
    """
    self.serialize().dump(path)

load(path, task_kwargs) classmethod

Generate pipeline from disk.

Parameters:

Name           Type                        Description                             Default
path           Path | str                  Path to config file.                    required
task_kwargs    Iterable[dict[str, Any]]    Values to inject into loaded config.    required

Returns:

Type        Description
Pipeline    Pipeline instance.

Source code in sieves/pipeline/core.py
@classmethod
def load(cls, path: Path | str, task_kwargs: Iterable[dict[str, Any]]) -> Pipeline:
    """Generate pipeline from disk.

    :param path: Path to config file.
    :param task_kwargs: Values to inject into loaded config.
    :return: Pipeline instance.
    """
    return cls.deserialize(Config.load(path), task_kwargs)
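
A dump/load roundtrip might look as follows (the file name and the re-injected values are illustrative, reusing the pipeline and engine from the examples above):

pipe.dump("pipeline_config.yml")
restored = Pipeline.load("pipeline_config.yml", task_kwargs=[{}, {}, {"model": engine}])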

serialize()

Serialize pipeline object.

Returns:

Type      Description
Config    Serialized pipeline representation.

Source code in sieves/pipeline/core.py
def serialize(self) -> Config:
    """Serialize pipeline object.

    :return: Serialized pipeline representation.
    """
    return Config.create(
        self.__class__,
        {
            "tasks": Attribute(value=[task.serialize() for task in self._tasks]),
            "use_cache": Attribute(value=self._use_cache),
        },
    )
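
The returned Config can be written to disk directly, which is what dump does internally (file name illustrative):

config = pipe.serialize()
config.dump("pipeline_config.yml")  # equivalent to pipe.dump("pipeline_config.yml")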