Skip to content

NaiveChunker

Allows chunking of documents into segments.

NaiveChunker

Bases: Task

Chunks by sentence counts. Only for test purposes.

Source code in sieves/tasks/preprocessing/chunking/naive.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class NaiveChunker(Task):
    """Chunks by sentence counts. Only for test purposes."""

    def __init__(
        self,
        interval: int,
        task_id: str | None = None,
        include_meta: bool = False,
        batch_size: int = -1,
    ):
        """Initialize chunker.

        :param interval: Sentence count interval for chunks (each chunk holds up to this
            many consecutive sentences).
        :param task_id: Task ID.
        :param include_meta: Whether to include meta information generated by the task.
        :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
        """
        super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size)
        self._interval = interval

    def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Split documents into chunks.

        :param docs: Documents to split.
        :return: Split documents.
        """
        batch_size = self._batch_size if self._batch_size > 0 else sys.maxsize
        # Consume from a single shared iterator: calling islice() repeatedly on a
        # plain sequence (e.g. a list) would restart from the beginning each pass
        # and loop forever over the same leading batch.
        docs_iter = iter(docs)
        while docs_batch := list(itertools.islice(docs_iter, batch_size)):
            for doc in docs_batch:
                assert doc.text
                # Naive split on sentence-terminating punctuation; blank fragments dropped.
                sentences = [sent for sent in re.split("[?!.]", doc.text) if len(sent.strip())]
                # Group consecutive sentences into chunks of `interval` sentences each.
                # Note: original ?/!/. terminators are normalized to "." when rejoining.
                doc.chunks = [
                    ".".join(sentences[i : i + self._interval]) for i in range(0, len(sentences), self._interval)
                ]

                yield doc

    @property
    def _state(self) -> dict[str, Any]:
        # Extend base task state with the chunking interval for serialization.
        return {
            **super()._state,
            "interval": self._interval,
        }

id property

Return task ID.

Used by pipeline for results and dependency management.

Returns:

Type Description
str

Task ID.

__add__(other)

Chain this task with another task or pipeline using the + operator.

This returns a new Pipeline that executes this task first, followed by the task(s) in other. The original task(s)/pipeline are not mutated.

Cache semantics: - If other is a Pipeline, the resulting pipeline adopts other's use_cache setting (because the left-hand side is a single task). - If other is a Task, the resulting pipeline defaults to use_cache=True.

Parameters:

Name Type Description Default
other Task | Pipeline

A Task or Pipeline to execute after this task.

required

Returns:

Type Description
Pipeline

A new Pipeline representing the chained execution.

Raises:

Type Description
TypeError

If other is not a Task or Pipeline.

Source code in sieves/tasks/core.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Chain this task with another task or pipeline via the ``+`` operator.

    Produces a fresh ``Pipeline`` that runs this task first, then everything in
    ``other``. Neither operand is mutated.

    Cache semantics:
    - Chaining with a ``Pipeline`` adopts that pipeline's ``use_cache`` flag
      (the left-hand side is only a single task).
    - Chaining with a ``Task`` yields a pipeline with the default ``use_cache=True``.

    :param other: A ``Task`` or ``Pipeline`` to execute after this task.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    # Imported locally to break a circular dependency at module import time.
    from sieves.pipeline import Pipeline

    # Guard clause: reject anything that is neither chainable kind.
    if not isinstance(other, (Pipeline, Task)):
        raise TypeError(f"Cannot chain Task with {type(other).__name__}")

    # A pipeline's tasks are spliced in and its cache setting preserved;
    # Pipeline must be tested before the plain-task fallback.
    if isinstance(other, Pipeline):
        return Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)

    return Pipeline(tasks=[self, other])

__call__(docs)

Split documents into chunks.

Parameters:

Name Type Description Default
docs Iterable[Doc]

Documents to split.

required

Returns:

Type Description
Iterable[Doc]

Split documents.

Source code in sieves/tasks/preprocessing/chunking/naive.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
    """Split documents into chunks.

    :param docs: Documents to split.
    :return: Split documents.
    """
    batch_size = self._batch_size if self._batch_size > 0 else sys.maxsize
    # Consume through one shared iterator: islice() applied repeatedly to a
    # plain sequence (e.g. a list) would re-read the same leading batch on
    # every pass and never terminate.
    docs_iter = iter(docs)
    while docs_batch := list(itertools.islice(docs_iter, batch_size)):
        for doc in docs_batch:
            assert doc.text
            # Naive split on sentence-terminating punctuation; blank fragments dropped.
            sentences = [sent for sent in re.split("[?!.]", doc.text) if len(sent.strip())]
            # Group consecutive sentences into chunks of `self._interval` sentences.
            doc.chunks = [
                ".".join(sentences[i : i + self._interval]) for i in range(0, len(sentences), self._interval)
            ]

            yield doc

__init__(interval, task_id=None, include_meta=False, batch_size=-1)

Initialize chunker.

Parameters:

Name Type Description Default
interval int

Sentence count interval for chunks.

required
task_id str | None

Task ID.

None
include_meta bool

Whether to include meta information generated by the task.

False
batch_size int

Batch size to use for processing. Use -1 to process all documents at once.

-1
Source code in sieves/tasks/preprocessing/chunking/naive.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def __init__(
    self,
    interval: int,
    task_id: str | None = None,
    include_meta: bool = False,
    batch_size: int = -1,
):
    """Initialize chunker.

    :param interval: Sentence count interval for chunks (each chunk holds up to this
        many consecutive sentences).
    :param task_id: Task ID.
    :param include_meta: Whether to include meta information generated by the task.
    :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
    """
    super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size)
    self._interval = interval

deserialize(config, **kwargs) classmethod

Generate Task instance from config.

Parameters:

Name Type Description Default
config Config

Config to generate instance from.

required
kwargs dict[str, Any]

Values to inject into loaded config.

{}

Returns:

Type Description
Task

Deserialized Task instance.

Source code in sieves/tasks/core.py
 95
 96
 97
 98
 99
100
101
102
103
104
@classmethod
def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Task:
    """Generate Task instance from config.

    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config.
    :return: Deserialized Task instance.
    """
    # Resolve constructor arguments from the config, letting injected kwargs override.
    init_args = config.to_init_dict(cls, **kwargs)
    return cls(**init_args)

serialize()

Serialize task.

Returns:

Type Description
Config

Config instance.

Source code in sieves/tasks/core.py
88
89
90
91
92
93
def serialize(self) -> Config:
    """Serialize task.

    :return: Config instance.
    """
    # Wrap each state entry in an Attribute so Config can track values uniformly.
    attributes = {name: Attribute(value=value) for name, value in self._state.items()}
    return Config.create(self.__class__, attributes)