Skip to content

Ingestion

The Ingestion task loads documents from various sources (files, URLs) and formats.

Usage

from sieves import tasks

task = tasks.Ingestion(
    export_format="markdown",
)

Ingestion task implementation.

Ingestion

Bases: Task

Base class for Ingestion tasks that extract text from documents.

This unified interface allows different Ingestion converters to be used interchangeably.

Source code in sieves/tasks/preprocessing/ingestion/core.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class Ingestion(Task):
    """Base class for Ingestion tasks that extract text from documents.

    This unified interface allows different Ingestion converters to be used interchangeably.
    """

    def __init__(
        self,
        converter: Converter | None = None,
        export_format: Literal["markdown", "html"] = "markdown",
        task_id: str | None = None,
        include_meta: bool = False,
        batch_size: int = -1,
        condition: Callable[[Doc], bool] | None = None,
        **kwargs: Any,
    ):
        """Initialize the Ingestion task.

        :param converter: The Ingestion converter to use. If None, tries to initialize in this order:
            - A `docling` converter with default values
            - A `marker` converter with default values
            If neither is possible due to a lack of installed requirements, raises a `ModuleNotFoundError`.
        :param task_id: Task ID.
        :param include_meta: Whether to include meta information generated by the task.
        :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
        :param condition: Optional callable that determines whether to process each document.
        :param kwargs: Additional arguments for specific `Ingestion` implementations.
        """
        super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)
        self._export_format = export_format
        self._converter = converter
        self._kwargs = kwargs
        self._task = self._init_ingestion_task()

    def _init_ingestion_task(self) -> Task:
        """Initialize the bridge for the specific Ingestion implementation.

        :raises ValueError: On unsupported converter type.
        :return: Ingestion bridge implementation.
        """
        converter_type = type(self._converter)

        # Set converter type manually in predefined order, depending on which dependencies are installed.
        if self._converter is None:
            if docling_ is not None:
                converter_type = docling_.Converter
            elif marker_ is not None:
                converter_type = marker_.Converter.__args__[0]
            else:
                raise ModuleNotFoundError(
                    "None of the supported ingestion libraries (`docling`, `marker`) are installed. Install at least "
                    "one of them to run ingestion tasks."
                )
        assert converter_type is not NoneType

        # Identify the ingestion task that maps to the specified converter's type.
        converter_module_map = {
            docling_: getattr(docling_, "Docling", None),
            marker_: getattr(marker_, "Marker", None),
        }

        for module, ingestion_task_type in converter_module_map.items():
            if ingestion_task_type is None:
                continue

            assert hasattr(module, "Converter")
            try:
                module_converter_types = module.Converter.__args__
            except AttributeError:
                module_converter_types = (module.Converter,)

            if any(issubclass(converter_type, module_model_type) for module_model_type in module_converter_types):
                ingestion_task = ingestion_task_type(
                    converter=self._converter,
                    export_format=self._export_format,
                    task_id=self.id,
                    include_meta=self._include_meta,
                    batch_size=self._batch_size,
                    **self._kwargs,
                )
                assert isinstance(ingestion_task, Task)

                return ingestion_task

        raise ValueError(
            f"converter type {self._converter} is not supported. Please check the documentation and ensure that (1) "
            f"you're providing a supported converter type and that (2) the corresponding library is installed in your "
            f"environment."
        )

    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Process documents with Ingestion to extract text.

        :param docs: Documents to process.
        :return: Processed documents with extracted text.
        """
        docs = list(docs)
        assert all(doc.uri for doc in docs), ValueError("Documents have to have a value for .uri.")
        result = self._task(docs)

        yield from result

    @property
    def _state(self) -> dict[str, Any]:
        """Returns attributes to serialize.

        :return: Dict of attributes to serialize.
        """
        return {
            **super()._state,
            "converter": self._converter,
            "export_format": self._export_format,
            **self._kwargs,
        }

    @classmethod
    def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Ingestion:
        """
        Generate Ingestion instance from config.

        :param config: Config to generate instance from.
        :param kwargs: Values to inject into loaded config.
        :return: Deserialized Ingestion instance.
        """
        return cls(**config.to_init_dict(cls, **kwargs))

id property

Return task ID.

Used by pipeline for results and dependency management.

Returns:

Type Description
str

Task ID.

__add__(other)

Chain this task with another task or pipeline using the + operator.

This returns a new Pipeline that executes this task first, followed by the task(s) in other. The original task(s)/pipeline are not mutated.

Cache semantics: - If other is a Pipeline, the resulting pipeline adopts other's use_cache setting (because the left-hand side is a single task). - If other is a Task, the resulting pipeline defaults to use_cache=True.

Parameters:

Name Type Description Default
other Task | Pipeline

A Task or Pipeline to execute after this task.

required

Returns:

Type Description
Pipeline

A new Pipeline representing the chained execution.

Raises:

Type Description
TypeError

If other is not a Task or Pipeline.

Source code in sieves/tasks/core.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Chain this task with another task or pipeline using the ``+`` operator.

    This returns a new ``Pipeline`` that executes this task first, followed by the
    task(s) in ``other``. The original task(s)/pipeline are not mutated.

    Cache semantics:
    - If ``other`` is a ``Pipeline``, the resulting pipeline adopts ``other``'s
      ``use_cache`` setting (because the left-hand side is a single task).
    - If ``other`` is a ``Task``, the resulting pipeline defaults to ``use_cache=True``.

    :param other: A ``Task`` or ``Pipeline`` to execute after this task.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    # Lazy import to avoid circular dependency at module import time.
    from sieves.pipeline import Pipeline

    if isinstance(other, Pipeline):
        return Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)

    if isinstance(other, Task):
        return Pipeline(tasks=[self, other])

    raise TypeError(f"Cannot chain Task with {type(other).__name__}")

__call__(docs)

Execute task with conditional logic.

Checks the condition for each document without materializing all docs upfront. Passes all documents that pass the condition to _call() for proper batching. Documents that fail the condition have results[task_id] set to None.

Parameters:

Name Type Description Default
docs Iterable[Doc]

Docs to process.

required

Returns:

Type Description
Iterable[Doc]

Processed docs (in original order).

Source code in sieves/tasks/core.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
    """Execute task with conditional logic.

    Checks the condition for each document without materializing all docs upfront.
    Passes all documents that pass the condition to _call() for proper batching.
    Documents that fail the condition have results[task_id] set to None.

    :param docs: Docs to process.
    :return: Processed docs (in original order).
    """
    docs = iter(docs) if not isinstance(docs, Iterator) else docs

    # Materialize docs in batches. This doesn't incur additional memory overhead, as docs are materialized in
    # batches downstream anyway.
    batch_size = self._batch_size if self._batch_size > 0 else sys.maxsize
    while docs_batch := [doc for doc in itertools.islice(docs, batch_size)]:
        # First pass: determine which docs pass the condition by index.
        passing_indices: set[int] = {
            idx for idx, doc in enumerate(docs_batch) if self._condition is None or self._condition(doc)
        }

        # Process all passing docs in one batch.
        processed = self._call(d for i, d in enumerate(docs_batch) if i in passing_indices)
        processed_iter = iter(processed) if not isinstance(processed, Iterator) else processed

        # Iterate through original docs in order and yield results.
        for idx, doc in enumerate(docs_batch):
            if idx in passing_indices:
                # Doc passed condition - use processed result.
                yield next(processed_iter)
            else:
                # Doc failed condition - set `None` result and yield original.
                doc.results[self.id] = None
                yield doc

__init__(converter=None, export_format='markdown', task_id=None, include_meta=False, batch_size=-1, condition=None, **kwargs)

Initialize the Ingestion task.

Parameters:

Name Type Description Default
converter Converter | None

The Ingestion converter to use. If None, tries to initialize in this order: - A docling converter with default values - A marker converter with default values If neither is possible due to a lack of installed requirements, raises a ModuleNotFoundError.

None
task_id str | None

Task ID.

None
include_meta bool

Whether to include meta information generated by the task.

False
batch_size int

Batch size to use for processing. Use -1 to process all documents at once.

-1
condition Callable[[Doc], bool] | None

Optional callable that determines whether to process each document.

None
kwargs Any

Additional arguments for specific Ingestion implementations.

{}
Source code in sieves/tasks/preprocessing/ingestion/core.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(
    self,
    converter: Converter | None = None,
    export_format: Literal["markdown", "html"] = "markdown",
    task_id: str | None = None,
    include_meta: bool = False,
    batch_size: int = -1,
    condition: Callable[[Doc], bool] | None = None,
    **kwargs: Any,
):
    """Initialize the Ingestion task.

    :param converter: The Ingestion converter to use. If None, tries to initialize in this order:
        - A `docling` converter with default values
        - A `marker` converter with default values
        If neither is possible due to a lack of installed requirements, raises a `ModuleNotFoundError`.
    :param task_id: Task ID.
    :param include_meta: Whether to include meta information generated by the task.
    :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
    :param condition: Optional callable that determines whether to process each document.
    :param kwargs: Additional arguments for specific `Ingestion` implementations.
    """
    super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)
    self._export_format = export_format
    self._converter = converter
    self._kwargs = kwargs
    self._task = self._init_ingestion_task()

deserialize(config, **kwargs) classmethod

Generate Ingestion instance from config.

Parameters:

Name Type Description Default
config Config

Config to generate instance from.

required
kwargs dict[str, Any]

Values to inject into loaded config.

{}

Returns:

Type Description
Ingestion

Deserialized Ingestion instance.

Source code in sieves/tasks/preprocessing/ingestion/core.py
132
133
134
135
136
137
138
139
140
141
@classmethod
def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Ingestion:
    """
    Generate Ingestion instance from config.

    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config.
    :return: Deserialized Ingestion instance.
    """
    return cls(**config.to_init_dict(cls, **kwargs))

evaluate(docs, judge=None)

Evaluate task performance.

Parameters:

Name Type Description Default
docs Iterable[Doc]

Documents to evaluate.

required
judge LM | None

Optional judge model for evaluation.

None

Returns:

Type Description
TaskEvaluationReport

Evaluation report.

Source code in sieves/tasks/core.py
 99
100
101
102
103
104
105
106
def evaluate(self, docs: Iterable[Doc], judge: dspy.LM | None = None) -> TaskEvaluationReport:
    """Evaluate task performance.

    :param docs: Documents to evaluate.
    :param judge: Optional judge model for evaluation.
    :return: Evaluation report.
    """
    raise NotImplementedError(f"Evaluation not implemented for task {self.__class__.__name__}")

serialize()

Serialize task.

Returns:

Type Description
Config

Config instance.

Source code in sieves/tasks/core.py
147
148
149
150
151
152
def serialize(self) -> Config:
    """Serialize task.

    :return: Config instance.
    """
    return Config.create(self.__class__, {k: Attribute(value=v) for k, v in self._state.items()})