Skip to content

Marker

Note: This task depends on optional ingestion libraries that are not installed by default. You can install them via the ingestion extra, or install the library directly.

Examples:

pip install "sieves[ingestion]"   # installs ingestion deps via extra
# or install the library directly (e.g., the Marker PDF package)
pip install marker                 # or the appropriate marker package variant

Marker task for converting PDF documents to text.

Marker

Bases: Task

Marker task for converting PDF documents to text.

Source code in sieves/tasks/preprocessing/ingestion/marker_.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class Marker(Task):
    """Marker task for converting PDF documents to text."""

    def __init__(
        self,
        converter: Converter | None = None,
        export_format: str = "markdown",
        task_id: str | None = None,
        include_meta: bool = False,
        batch_size: int = -1,
        extract_images: bool = False,
        condition: Callable[[Doc], bool] | None = None,
    ):
        """Initialize the Marker task.

        :param converter: Custom PdfConverter or TableConverter instance. If None, a default one will be created.
        :param export_format: Format to export the document in ("markdown", "html", or "json").
        :param task_id: Task ID.
        :param include_meta: Whether to include meta information generated by the task.
        :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
        :param extract_images: Whether to extract images from the PDF.
        :param condition: Optional callable that determines whether to process each document.
        """
        super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)

        self._export_format = export_format
        self._converter = self._setup_converter(converter, self._export_format)
        self._extract_images = extract_images

    def _setup_converter(
        self, converter: PdfConverter | TableConverter | None, export_format: str
    ) -> PdfConverter | TableConverter:
        """Set up the converter with the specified renderer.

        :param converter: Custom converter instance or None.
        :param export_format: Format to export the document in.
        :return: Configured converter instance.
        """
        renderer: str = self._get_renderer(export_format)
        if converter is None:
            return PdfConverter(artifact_dict=create_model_dict(), renderer=renderer)

        # If a converter is provided, use its type but update the renderer
        if isinstance(converter, TableConverter):
            return TableConverter(artifact_dict=create_model_dict(), renderer=renderer)
        elif isinstance(converter, PdfConverter):
            return PdfConverter(artifact_dict=create_model_dict(), renderer=renderer)
        else:
            raise ValueError(f"Invalid converter type: {type(converter)}")

    def _get_renderer(self, export_format: str) -> str:
        """Get the renderer string based on the export format.

        :param export_format: Format to export the document in.
        :return: The renderer string.
        :raises ValueError: If the export format is invalid.
        """
        if export_format == "markdown":
            return "marker.renderers.markdown.MarkdownRenderer"
        elif export_format == "html":
            return "marker.renderers.html.HTMLRenderer"
        elif export_format == "json":
            return "marker.renderers.json.JSONRenderer"
        else:
            raise ValueError(f"Invalid export format: {export_format}")

    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Process documents using Marker.

        :param docs: Documents to process.
        :return: Processed documents.
        """
        docs = list(docs)

        for doc in docs:
            # Convert URI to string if it's a Path
            uri = str(doc.uri) if isinstance(doc.uri, Path) else doc.uri
            # Process the document
            rendered = self._converter(uri)

            # Extract text and optionally images
            text, _, images = text_from_rendered(rendered)
            if self._extract_images:
                doc.images = images

            # Update document text
            doc.text = text

        for doc in docs:
            yield doc

    @property
    def _state(self) -> dict[str, Any]:
        """Get state for serialization.

        :return: State dictionary.
        """
        return {
            **super()._state,
            "converter": self._converter,
            "export_format": self._export_format,
            "extract_images": self._extract_images,
        }

id property

Return task ID.

Used by pipeline for results and dependency management.

Returns:

Type Description
str

Task ID.

__add__(other)

Chain this task with another task or pipeline using the + operator.

This returns a new Pipeline that executes this task first, followed by the task(s) in other. The original task(s)/pipeline are not mutated.

Cache semantics: - If other is a Pipeline, the resulting pipeline adopts other's use_cache setting (because the left-hand side is a single task). - If other is a Task, the resulting pipeline defaults to use_cache=True.

Parameters:

Name Type Description Default
other Task | Pipeline

A Task or Pipeline to execute after this task.

required

Returns:

Type Description
Pipeline

A new Pipeline representing the chained execution.

Raises:

Type Description
TypeError

If other is not a Task or Pipeline.

Source code in sieves/tasks/core.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Chain this task with another task or pipeline using the ``+`` operator.

    This returns a new ``Pipeline`` that executes this task first, followed by the
    task(s) in ``other``. The original task(s)/pipeline are not mutated.

    Cache semantics:
    - If ``other`` is a ``Pipeline``, the resulting pipeline adopts ``other``'s
      ``use_cache`` setting (because the left-hand side is a single task).
    - If ``other`` is a ``Task``, the resulting pipeline defaults to ``use_cache=True``.

    :param other: A ``Task`` or ``Pipeline`` to execute after this task.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    # Lazy import to avoid circular dependency at module import time.
    from sieves.pipeline import Pipeline

    if isinstance(other, Pipeline):
        return Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)

    if isinstance(other, Task):
        return Pipeline(tasks=[self, other])

    raise TypeError(f"Cannot chain Task with {type(other).__name__}")

__call__(docs)

Execute task with conditional logic.

Checks the condition for each document without materializing all docs upfront. Passes all documents that pass the condition to _call() for proper batching. Documents that fail the condition have results[task_id] set to None.

Parameters:

Name Type Description Default
docs Iterable[Doc]

Docs to process.

required

Returns:

Type Description
Iterable[Doc]

Processed docs (in original order).

Source code in sieves/tasks/core.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
    """Execute task with conditional logic.

    Checks the condition for each document without materializing all docs upfront.
    Passes all documents that pass the condition to _call() for proper batching.
    Documents that fail the condition have results[task_id] set to None.

    :param docs: Docs to process.
    :return: Processed docs (in original order).
    """
    # Create three independent iterators:
    #   1. Check which docs pass condition.
    #   2. Yield only passing docs to _call().
    #   3. Iterate and yield results in order.
    docs_iters = itertools.tee(docs, 3)

    # First pass: determine which docs pass the condition by index
    passing_indices: set[int] = set()

    for idx, doc in enumerate(docs_iters[0]):
        if self._condition is None or self._condition(doc):
            passing_indices.add(idx)

    # Process all passing docs together.
    processed = self._call(d for i, d in enumerate(docs_iters[1]) if i in passing_indices)
    processed_iter = iter(processed) if not isinstance(processed, Iterator) else processed

    # Iterate through original docs in order and yield results
    for idx, doc in enumerate(docs_iters[2]):
        if idx in passing_indices:
            # Doc passed condition - use processed result.
            yield next(processed_iter)
        else:
            # Doc failed condition - set None result and yield original.
            doc.results[self.id] = None
            yield doc

__init__(converter=None, export_format='markdown', task_id=None, include_meta=False, batch_size=-1, extract_images=False, condition=None)

Initialize the Marker task.

Parameters:

Name Type Description Default
converter Converter | None

Custom PdfConverter or TableConverter instance. If None, a default one will be created.

None
export_format str

Format to export the document in ("markdown", "html", or "json").

'markdown'
task_id str | None

Task ID.

None
include_meta bool

Whether to include meta information generated by the task.

False
batch_size int

Batch size to use for processing. Use -1 to process all documents at once.

-1
extract_images bool

Whether to extract images from the PDF.

False
condition Callable[[Doc], bool] | None

Optional callable that determines whether to process each document.

None
Source code in sieves/tasks/preprocessing/ingestion/marker_.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    converter: Converter | None = None,
    export_format: str = "markdown",
    task_id: str | None = None,
    include_meta: bool = False,
    batch_size: int = -1,
    extract_images: bool = False,
    condition: Callable[[Doc], bool] | None = None,
):
    """Initialize the Marker task.

    :param converter: Custom PdfConverter or TableConverter instance. If None, a default one will be created.
    :param export_format: Format to export the document in ("markdown", "html", or "json").
    :param task_id: Task ID.
    :param include_meta: Whether to include meta information generated by the task.
    :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
    :param extract_images: Whether to extract images from the PDF.
    :param condition: Optional callable that determines whether to process each document.
    """
    super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)

    self._export_format = export_format
    self._converter = self._setup_converter(converter, self._export_format)
    self._extract_images = extract_images

deserialize(config, **kwargs) classmethod

Generate Task instance from config.

Parameters:

Name Type Description Default
config Config

Config to generate instance from.

required
kwargs dict[str, Any]

Values to inject into loaded config.

{}

Returns:

Type Description
Task

Deserialized Task instance.

Source code in sieves/tasks/core.py
144
145
146
147
148
149
150
151
152
153
@classmethod
def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Task:
    """Generate Task instance from config.

    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config.
    :return: Deserialized Task instance.
    """
    # Deserialize and inject engine.
    return cls(**config.to_init_dict(cls, **kwargs))

serialize()

Serialize task.

Returns:

Type Description
Config

Config instance.

Source code in sieves/tasks/core.py
137
138
139
140
141
142
def serialize(self) -> Config:
    """Serialize task.

    :return: Config instance.
    """
    return Config.create(self.__class__, {k: Attribute(value=v) for k, v in self._state.items()})