unstructured

File preprocessing for converting raw files into documents.

Unstructured

Bases: Task

Parser wrapping the unstructured library to convert files into documents.

Source code in sieves/tasks/preprocessing/unstructured_.py
class Unstructured(Task):
    """Parser wrapping the unstructured library to convert files into documents."""

    def __init__(
        self,
        partition: PartitionType = unstructured.partition.auto.partition,
        cleaners: tuple[CleanerType, ...] = (),
        task_id: str | None = None,
        show_progress: bool = True,
        include_meta: bool = False,
        **kwargs: Any,
    ):
        """Initialize the docling parser.
        :param partition: Function to use for partitioning.
        :param cleaners: Cleaning functions to apply.
        :param task_id: Task ID.
        :param show_progress: Whether to show progress bar for processed documents
        :param include_meta: Whether to include meta information generated by the task.
        :param kwargs: Kwargs to be supplied to partitioning call.
        """
        super().__init__(task_id=task_id, show_progress=show_progress, include_meta=include_meta)
        self._partition = partition
        self._partition_args = kwargs or {}
        self._cleaners = cleaners

        Unstructured._require()

    @staticmethod
    def _require() -> None:
        """Download all necessary resources that have to be installed from within Python."""
        # Some nltk resources seem necessary for basic functionality.
        for nltk_resource in ("punkt_tab", "averaged_perceptron_tagger_eng"):
            # Don't install if already available.
            try:
                nltk.data.find(nltk_resource)
            except LookupError:
                nltk.download(nltk_resource)

    def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Parse resources using docling.
        :param docs: Resources to process.
        :return: Parsed documents.
        """
        docs = list(docs)

        # Validate docs.
        have_text = False
        for doc in docs:
            if not doc.uri:
                raise ValueError("Documents have to have a value for .uri.")
            if doc.text:
                have_text = True
        if have_text:
            warnings.warn(f"Task {self._task_id} is about to overwrite existing .text values.")

        # Wrap conversion in TQDM if progress should be shown.
        iterable = tqdm(docs, total=len(docs)) if self._show_progress else docs
        does_chunking = "chunking_strategy" in self._partition_args

        for doc in iterable:
            try:
                # Parse and process document.
                parsed_resources: list[unstructured.documents.elements.Text] = self._partition(
                    doc.uri, **self._partition_args
                )

                # Apply specified cleaners.
                for cleaner in self._cleaners:
                    for pr in parsed_resources:
                        pr.apply(cleaner)

                # Integrate into Doc instances.
                if self._include_meta:
                    doc.meta |= {self.id: parsed_resources}

                # Use chunks.
                if does_chunking:
                    doc.chunks = [pr.text for pr in parsed_resources]

                # Merge texts from all elements into single string for the entire document.
                doc.text = "\n".join(resource.text for resource in parsed_resources)

            except FileNotFoundError as err:
                raise FileNotFoundError(
                    f"File at {doc.uri} not found. Ensure that this is a local file path - unstructured doesn't support"
                    f" loading files via network URIs."
                ) from err

        return docs

    @property
    def _state(self) -> dict[str, Any]:
        return {
            **super()._state,
            "partition": self._partition,
            "cleaners": self._cleaners,
            **self._partition_args,
        }
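
A minimal usage sketch (the import paths, the Doc constructor, and report.pdf are illustrative assumptions; unstructured requires local file paths):

from sieves import Doc
from sieves.tasks.preprocessing import Unstructured

# Partition a local file with the default auto-partitioner.
task = Unstructured()

for doc in task([Doc(uri="report.pdf")]):
    # .text now holds the merged text of all partitioned elements.
    print(doc.text)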

id property

Returns the task ID. Used by the pipeline for results and dependency management.

Returns:

    str: Task ID.

__call__(docs)

Parse resources using unstructured.

Parameters:

    docs (Iterable[Doc]): Resources to process. Required.

Returns:

    Iterable[Doc]: Parsed documents.
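
Because the constructor's keyword arguments are forwarded to the partitioning call, passing unstructured's chunking_strategy argument also populates .chunks with one entry per partitioned element. A sketch, under the same import assumptions as above:

# "by_title" is one of unstructured's built-in chunking strategies.
task = Unstructured(chunking_strategy="by_title")

for doc in task([Doc(uri="report.pdf")]):
    print(len(doc.chunks), "chunks")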

__init__(partition=unstructured.partition.auto.partition, cleaners=(), task_id=None, show_progress=True, include_meta=False, **kwargs)

Initialize the unstructured parser.

Parameters:

    partition (PartitionType): Function to use for partitioning. Default: unstructured.partition.auto.partition.
    cleaners (tuple[CleanerType, ...]): Cleaning functions to apply. Default: ().
    task_id (str | None): Task ID. Default: None.
    show_progress (bool): Whether to show a progress bar for processed documents. Default: True.
    include_meta (bool): Whether to include meta information generated by the task. Default: False.
    kwargs (Any): Keyword arguments to supply to the partitioning call. Default: {}.
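
A custom partitioner and cleaners can be supplied explicitly. clean_extra_whitespace below is a cleaner shipped with unstructured; the Unstructured import path is again an assumption:

from unstructured.cleaners.core import clean_extra_whitespace
from unstructured.partition.auto import partition

# Strip redundant whitespace from every partitioned element.
task = Unstructured(
    partition=partition,
    cleaners=(clean_extra_whitespace,),
)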

_require() staticmethod

Download all necessary resources that have to be installed from within Python.

deserialize(config, **kwargs) classmethod

Generate Task instance from config.

Parameters:

    config (Config): Config to generate instance from. Required.
    kwargs (dict[str, Any]): Values to inject into loaded config. Default: {}.

Returns:

    Task: Deserialized Task instance.

Source code in sieves/tasks/core.py
@classmethod
def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Task:
    """Generate Task instance from config.
    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config.
    :return: Deserialized Task instance.
    """
    # Deserialize and inject engine.
    return cls(**config.to_init_dict(cls, **kwargs))

serialize()

Serializes the task.

Returns:

    Config: Config instance.

Source code in sieves/tasks/core.py
def serialize(self) -> Config:
    """Serializes task.
    :return: Config instance.
    """
    return Config.create(self.__class__, {k: Attribute(value=v) for k, v in self._state.items()})
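
serialize and deserialize together allow a task to be persisted and rebuilt from its config. A minimal round-trip sketch, assuming the task ID survives the round-trip:

task = Unstructured(task_id="parse_files")
config = task.serialize()

# Rebuild an equivalent task from the stored config.
restored = Unstructured.deserialize(config)
assert restored.id == "parse_files"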