Skip to content

Marker

Note: This task depends on optional ingestion libraries that are not installed by default. You can install them via the ingestion extra, or install the library directly.

Examples:

pip install "sieves[ingestion]"   # installs ingestion deps via extra
# or install the library directly (e.g., the Marker PDF package)
pip install marker                 # or the appropriate marker package variant

Marker task for converting PDF documents to text.

Marker

Bases: Task

Marker task for converting PDF documents to text.

Source code in sieves/tasks/preprocessing/ingestion/marker_.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class Marker(Task):
    """Marker task for converting PDF documents to text."""

    def __init__(
        self,
        converter: Converter | None = None,
        export_format: str = "markdown",
        task_id: str | None = None,
        include_meta: bool = False,
        batch_size: int = -1,
        extract_images: bool = False,
    ):
        """Initialize the Marker task.

        :param converter: Custom PdfConverter or TableConverter instance. If None, a default one will be created.
        :param export_format: Format to export the document in ("markdown", "html", or "json").
        :param task_id: Task ID.
        :param include_meta: Whether to include meta information generated by the task.
        :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
        :param extract_images: Whether to extract images from the PDF.
        """
        super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size)

        self._export_format = export_format
        self._converter = self._setup_converter(converter, self._export_format)
        self._extract_images = extract_images

    def _setup_converter(
        self, converter: PdfConverter | TableConverter | None, export_format: str
    ) -> PdfConverter | TableConverter:
        """Set up the converter with the specified renderer.

        :param converter: Custom converter instance or None.
        :param export_format: Format to export the document in.
        :return: Configured converter instance.
        """
        renderer: str = self._get_renderer(export_format)
        if converter is None:
            return PdfConverter(artifact_dict=create_model_dict(), renderer=renderer)

        # If a converter is provided, use its type but update the renderer
        if isinstance(converter, TableConverter):
            return TableConverter(artifact_dict=create_model_dict(), renderer=renderer)
        elif isinstance(converter, PdfConverter):
            return PdfConverter(artifact_dict=create_model_dict(), renderer=renderer)
        else:
            raise ValueError(f"Invalid converter type: {type(converter)}")

    def _get_renderer(self, export_format: str) -> str:
        """Get the renderer string based on the export format.

        :param export_format: Format to export the document in.
        :return: The renderer string.
        :raises ValueError: If the export format is invalid.
        """
        if export_format == "markdown":
            return "marker.renderers.markdown.MarkdownRenderer"
        elif export_format == "html":
            return "marker.renderers.html.HTMLRenderer"
        elif export_format == "json":
            return "marker.renderers.json.JSONRenderer"
        else:
            raise ValueError(f"Invalid export format: {export_format}")

    def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Process documents using Marker.

        :param docs: Documents to process.
        :return: Processed documents.
        """
        docs = list(docs)

        for doc in docs:
            # Convert URI to string if it's a Path
            uri = str(doc.uri) if isinstance(doc.uri, Path) else doc.uri
            # Process the document
            rendered = self._converter(uri)

            # Extract text and optionally images
            text, _, images = text_from_rendered(rendered)
            if self._extract_images:
                doc.images = images

            # Update document text
            doc.text = text

        for doc in docs:
            yield doc

    @property
    def _state(self) -> dict[str, Any]:
        """Get state for serialization.

        :return: State dictionary.
        """
        return {
            **super()._state,
            "converter": self._converter,
            "export_format": self._export_format,
            "extract_images": self._extract_images,
        }

id property

Return task ID.

Used by pipeline for results and dependency management.

Returns:

Type Description
str

Task ID.

__add__(other)

Chain this task with another task or pipeline using the + operator.

This returns a new Pipeline that executes this task first, followed by the task(s) in other. The original task(s)/pipeline are not mutated.

Cache semantics: - If other is a Pipeline, the resulting pipeline adopts other's use_cache setting (because the left-hand side is a single task). - If other is a Task, the resulting pipeline defaults to use_cache=True.

Parameters:

Name Type Description Default
other Task | Pipeline

A Task or Pipeline to execute after this task.

required

Returns:

Type Description
Pipeline

A new Pipeline representing the chained execution.

Raises:

Type Description
TypeError

If other is not a Task or Pipeline.

Source code in sieves/tasks/core.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Chain this task with another task or pipeline using the ``+`` operator.

    This returns a new ``Pipeline`` that executes this task first, followed by the
    task(s) in ``other``. The original task(s)/pipeline are not mutated.

    Cache semantics:
    - If ``other`` is a ``Pipeline``, the resulting pipeline adopts ``other``'s
      ``use_cache`` setting (because the left-hand side is a single task).
    - If ``other`` is a ``Task``, the resulting pipeline defaults to ``use_cache=True``.

    :param other: A ``Task`` or ``Pipeline`` to execute after this task.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    # Lazy import to avoid circular dependency at module import time.
    from sieves.pipeline import Pipeline

    if isinstance(other, Pipeline):
        return Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)

    if isinstance(other, Task):
        return Pipeline(tasks=[self, other])

    raise TypeError(f"Cannot chain Task with {type(other).__name__}")

__call__(docs)

Process documents using Marker.

Parameters:

Name Type Description Default
docs Iterable[Doc]

Documents to process.

required

Returns:

Type Description
Iterable[Doc]

Processed documents.

Source code in sieves/tasks/preprocessing/ingestion/marker_.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
    """Process documents using Marker.

    :param docs: Documents to process.
    :return: Processed documents.
    """
    docs = list(docs)

    for doc in docs:
        # Convert URI to string if it's a Path
        uri = str(doc.uri) if isinstance(doc.uri, Path) else doc.uri
        # Process the document
        rendered = self._converter(uri)

        # Extract text and optionally images
        text, _, images = text_from_rendered(rendered)
        if self._extract_images:
            doc.images = images

        # Update document text
        doc.text = text

    for doc in docs:
        yield doc

__init__(converter=None, export_format='markdown', task_id=None, include_meta=False, batch_size=-1, extract_images=False)

Initialize the Marker task.

Parameters:

Name Type Description Default
converter Converter | None

Custom PdfConverter or TableConverter instance. If None, a default one will be created.

None
export_format str

Format to export the document in ("markdown", "html", or "json").

'markdown'
task_id str | None

Task ID.

None
include_meta bool

Whether to include meta information generated by the task.

False
batch_size int

Batch size to use for processing. Use -1 to process all documents at once.

-1
extract_images bool

Whether to extract images from the PDF.

False
Source code in sieves/tasks/preprocessing/ingestion/marker_.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def __init__(
    self,
    converter: Converter | None = None,
    export_format: str = "markdown",
    task_id: str | None = None,
    include_meta: bool = False,
    batch_size: int = -1,
    extract_images: bool = False,
):
    """Initialize the Marker task.

    :param converter: Custom PdfConverter or TableConverter instance. If None, a default one will be created.
    :param export_format: Format to export the document in ("markdown", "html", or "json").
    :param task_id: Task ID.
    :param include_meta: Whether to include meta information generated by the task.
    :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
    :param extract_images: Whether to extract images from the PDF.
    """
    super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size)

    self._export_format = export_format
    self._converter = self._setup_converter(converter, self._export_format)
    self._extract_images = extract_images

deserialize(config, **kwargs) classmethod

Generate Task instance from config.

Parameters:

Name Type Description Default
config Config

Config to generate instance from.

required
kwargs dict[str, Any]

Values to inject into loaded config.

{}

Returns:

Type Description
Task

Deserialized Task instance.

Source code in sieves/tasks/core.py
 95
 96
 97
 98
 99
100
101
102
103
104
@classmethod
def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Task:
    """Generate Task instance from config.

    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config.
    :return: Deserialized Task instance.
    """
    # Deserialize and inject engine.
    return cls(**config.to_init_dict(cls, **kwargs))

serialize()

Serialize task.

Returns:

Type Description
Config

Config instance.

Source code in sieves/tasks/core.py
88
89
90
91
92
93
def serialize(self) -> Config:
    """Serialize task.

    :return: Config instance.
    """
    return Config.create(self.__class__, {k: Attribute(value=v) for k, v in self._state.items()})