Docling

Docling is a tool for document parsing and ingestion.

Note: This task depends on optional ingestion libraries, which are not installed by default. Install them via the ingestion extra, or install the library directly.

Examples:

pip install "sieves[ingestion]"   # installs ingestion deps via extra
# or install the library directly
pip install docling

Usage

from sieves.tasks.preprocessing.ingestion.docling_ import Docling

# Basic usage
task = Docling(export_format="markdown")
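
A fuller sketch of typical use. The Doc import path and the sample file name are assumptions, not taken from this page:

from sieves import Doc  # assumed import path
from sieves.tasks.preprocessing.ingestion.docling_ import Docling

task = Docling(export_format="markdown")
docs = [Doc(uri="paper.pdf")]  # hypothetical local file

# The task reads each doc's .uri, parses the file with Docling, and
# writes the exported Markdown to .text.
for doc in task(docs):
    print(doc.text[:200])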

Wraps the Docling library to convert complex files into Markdown or HTML.

Docling

Bases: Task

Parser wrapping the docling library to convert files into documents.

Source code in sieves/tasks/preprocessing/ingestion/docling_.py
class Docling(Task):
    """Parser wrapping the docling library to convert files into documents."""

    def __init__(
        self,
        converter: Converter | None = None,
        export_format: Literal["markdown", "html"] = "markdown",
        task_id: str | None = None,
        include_meta: bool = False,
        batch_size: int = -1,
        condition: Callable[[Doc], bool] | None = None,
    ):
        """Initialize the docling parser.

        :param converter: Docling parser instance.
        :param export_format: Format to export parsed documents to ("markdown" or "html").
        :param task_id: Task ID.
        :param include_meta: Whether to include meta information generated by the task.
        :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
        :param condition: Optional callable that determines whether to process each document.
        """
        super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)
        self._converter = converter if converter else docling.document_converter.DocumentConverter()
        self._export_format = export_format

        # Set batch size in Docling performance settings.
        docling.datamodel.settings.settings.perf.doc_batch_size = (
            self._batch_size if self._batch_size > 0 else sys.maxsize
        )

    def _validate_docs(self, docs: Iterable[Doc]) -> None:
        """Validate documents.

        Raises a warning if docs already have text values, as they will be overwritten.

        :param docs: Docs to validate.
        :raises ValueError: If documents do not have a URI.
        """
        have_text = False
        for doc in docs:
            if not doc.uri:
                raise ValueError("Documents must have a value for .uri.")
            if doc.text:
                have_text = True
        if have_text:
            warnings.warn(f"Task {self._task_id} is about to overwrite existing .text values.")

    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Parse resources using docling.

        :param docs: Resources to process.
        :return: Parsed documents.
        :raises ValueError: If documents failed to parse.
        """
        docs_iters = itertools.tee(docs, 3)
        self._validate_docs(docs_iters[0])

        for doc, parsed_resource in zip(
            docs_iters[2], self._converter.convert_all([resource.uri for resource in docs_iters[1]])
        ):
            try:
                if self._include_meta:
                    doc.meta |= {self.id: parsed_resource}
                if self._export_format == "html":
                    doc.text = parsed_resource.document.export_to_html()
                else:
                    doc.text = parsed_resource.document.export_to_markdown()

            except Exception as e:
                logger.error(f"Failed to parse file {doc.uri}: {str(e)}")
                raise e

            yield doc

    @property
    def _state(self) -> dict[str, Any]:
        return {
            **super()._state,
            "converter": self._converter,
            "export_format": self._export_format,
        }

id property

Return task ID.

Used by the pipeline for results and dependency management.

Returns:

- str: Task ID.

__add__(other)

Chain this task with another task or pipeline using the + operator.

This returns a new Pipeline that executes this task first, followed by the task(s) in other. The original task(s)/pipeline are not mutated.

Cache semantics:

- If other is a Pipeline, the resulting pipeline adopts other's use_cache setting (because the left-hand side is a single task).
- If other is a Task, the resulting pipeline defaults to use_cache=True.

Parameters:

- other (Task | Pipeline, required): A Task or Pipeline to execute after this task.

Returns:

- Pipeline: A new Pipeline representing the chained execution.

Raises:

- TypeError: If other is not a Task or Pipeline.

Source code in sieves/tasks/core.py
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Chain this task with another task or pipeline using the ``+`` operator.

    This returns a new ``Pipeline`` that executes this task first, followed by the
    task(s) in ``other``. The original task(s)/pipeline are not mutated.

    Cache semantics:
    - If ``other`` is a ``Pipeline``, the resulting pipeline adopts ``other``'s
      ``use_cache`` setting (because the left-hand side is a single task).
    - If ``other`` is a ``Task``, the resulting pipeline defaults to ``use_cache=True``.

    :param other: A ``Task`` or ``Pipeline`` to execute after this task.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    # Lazy import to avoid circular dependency at module import time.
    from sieves.pipeline import Pipeline

    if isinstance(other, Pipeline):
        return Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)

    if isinstance(other, Task):
        return Pipeline(tasks=[self, other])

    raise TypeError(f"Cannot chain Task with {type(other).__name__}")
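
A short sketch of the semantics above. Chaining two Docling instances is purely illustrative; the Pipeline constructor and use_cache attribute are used exactly as in the source listing:

from sieves.pipeline import Pipeline

parse_md = Docling(task_id="parse_md")
parse_html = Docling(export_format="html", task_id="parse_html")

# Task + Task: the resulting pipeline defaults to use_cache=True.
pipeline = parse_md + parse_html

# Task + Pipeline: the resulting pipeline adopts the right-hand side's setting.
pipeline = parse_md + Pipeline(tasks=[parse_html], use_cache=False)
assert pipeline.use_cache is False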

__call__(docs)

Execute task with conditional logic.

Checks the condition for each document without materializing all docs upfront. Passes all documents that pass the condition to _call() for proper batching. Documents that fail the condition have results[task_id] set to None.

Parameters:

- docs (Iterable[Doc], required): Docs to process.

Returns:

- Iterable[Doc]: Processed docs (in original order).

Source code in sieves/tasks/core.py
def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
    """Execute task with conditional logic.

    Checks the condition for each document without materializing all docs upfront.
    Passes all documents that pass the condition to _call() for proper batching.
    Documents that fail the condition have results[task_id] set to None.

    :param docs: Docs to process.
    :return: Processed docs (in original order).
    """
    docs = iter(docs) if not isinstance(docs, Iterator) else docs

    # Materialize docs in batches. This doesn't incur additional memory overhead, as docs are materialized in
    # batches downstream anyway.
    batch_size = self._batch_size if self._batch_size > 0 else sys.maxsize
    while docs_batch := [doc for doc in itertools.islice(docs, batch_size)]:
        # First pass: determine which docs pass the condition by index.
        passing_indices: set[int] = {
            idx for idx, doc in enumerate(docs_batch) if self._condition is None or self._condition(doc)
        }

        # Process all passing docs in one batch.
        processed = self._call(d for i, d in enumerate(docs_batch) if i in passing_indices)
        processed_iter = iter(processed) if not isinstance(processed, Iterator) else processed

        # Iterate through original docs in order and yield results.
        for idx, doc in enumerate(docs_batch):
            if idx in passing_indices:
                # Doc passed condition - use processed result.
                yield next(processed_iter)
            else:
                # Doc failed condition - set `None` result and yield original.
                doc.results[self.id] = None
                yield doc
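
To illustrate the condition handling, a small sketch (the file suffix check is hypothetical; docs is any iterable of Doc objects with .uri set):

# Only parse PDFs. All other docs pass through unchanged, in their
# original order, with doc.results[task.id] set to None.
task = Docling(condition=lambda doc: str(doc.uri).endswith(".pdf"))
processed = list(task(docs))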

__init__(converter=None, export_format='markdown', task_id=None, include_meta=False, batch_size=-1, condition=None)

Initialize the docling parser.

Parameters:

- converter (Converter | None, default None): Docling parser instance. If None, a default docling DocumentConverter is created.
- export_format (Literal["markdown", "html"], default "markdown"): Format to export parsed documents to.
- task_id (str | None, default None): Task ID.
- include_meta (bool, default False): Whether to include meta information generated by the task.
- batch_size (int, default -1): Batch size to use for processing. Use -1 to process all documents at once.
- condition (Callable[[Doc], bool] | None, default None): Optional callable that determines whether to process each document.

Source code in sieves/tasks/preprocessing/ingestion/docling_.py
def __init__(
    self,
    converter: Converter | None = None,
    export_format: Literal["markdown", "html"] = "markdown",
    task_id: str | None = None,
    include_meta: bool = False,
    batch_size: int = -1,
    condition: Callable[[Doc], bool] | None = None,
):
    """Initialize the docling parser.

    :param converter: Docling parser instance.
    :param export_format: Format to export parsed documents to ("markdown" or "html").
    :param task_id: Task ID.
    :param include_meta: Whether to include meta information generated by the task.
    :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
    :param condition: Optional callable that determines whether to process each document.
    """
    super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)
    self._converter = converter if converter else docling.document_converter.DocumentConverter()
    self._export_format = export_format

    # Set batch size in Docling performance settings.
    docling.datamodel.settings.settings.perf.doc_batch_size = (
        self._batch_size if self._batch_size > 0 else sys.maxsize
    )
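
A sketch of non-default construction. DocumentConverter is Docling's stock converter, as used internally by this task; the remaining values are illustrative:

import docling.document_converter

task = Docling(
    converter=docling.document_converter.DocumentConverter(),
    export_format="html",   # export HTML instead of Markdown
    include_meta=True,      # keep the raw Docling result under doc.meta[task.id]
    batch_size=8,           # also propagated to Docling's perf settings
)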

deserialize(config, **kwargs) classmethod

Generate Task instance from config.

Parameters:

- config (Config, required): Config to generate instance from.
- kwargs (dict[str, Any], default {}): Values to inject into loaded config.

Returns:

- Task: Deserialized Task instance.

Source code in sieves/tasks/core.py
@classmethod
def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Task:
    """Generate Task instance from config.

    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config.
    :return: Deserialized Task instance.
    """
    # Deserialize and inject model wrapper.
    return cls(**config.to_init_dict(cls, **kwargs))

evaluate(docs, judge=None)

Evaluate task performance.

Parameters:

- docs (Iterable[Doc], required): Documents to evaluate.
- judge (dspy.LM | None, default None): Optional judge model for evaluation.

Returns:

- TaskEvaluationReport: Evaluation report.

Raises:

- NotImplementedError: Evaluation is not implemented for this task (see the source below).

Source code in sieves/tasks/core.py
def evaluate(self, docs: Iterable[Doc], judge: dspy.LM | None = None) -> TaskEvaluationReport:
    """Evaluate task performance.

    :param docs: Documents to evaluate.
    :param judge: Optional judge model for evaluation.
    :return: Evaluation report.
    """
    raise NotImplementedError(f"Evaluation not implemented for task {self.__class__.__name__}")

serialize()

Serialize task.

Returns:

- Config: Config instance.

Source code in sieves/tasks/core.py
def serialize(self) -> Config:
    """Serialize task.

    :return: Config instance.
    """
    return Config.create(self.__class__, {k: Attribute(value=v) for k, v in self._state.items()})
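
Together, serialize and deserialize give a config round trip. A minimal sketch, assuming the serialized state (including the converter) restores cleanly; otherwise a replacement value can be injected via deserialize's kwargs:

task = Docling(export_format="html")
config = task.serialize()

restored = Docling.deserialize(config)
assert isinstance(restored, Docling)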