Skip to content

Chunking

The Chunking task splits long documents into smaller chunks, facilitating processing by models with limited context windows.

Usage

import chonkie
from sieves import tasks

# Define a chunker (e.g., using Chonkie)
chunker_model = chonkie.RecursiveChunker()

task = tasks.Chunking(
    chunker=chunker_model,
)

Chunking task.

Chunking

Bases: Task

Task for chunking documents using different strategies.

This task acts as a wrapper around specific chunker implementations, allowing for flexible configuration based on the provided chunker object or interval.

Source code in sieves/tasks/preprocessing/chunking/core.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class Chunking(Task):
    """Task for chunking documents using different strategies.

    This task acts as a wrapper around specific chunker implementations,
    allowing for flexible configuration based on the provided chunker object or interval.
    """

    def __init__(
        self,
        chunker: _ChunkerArgType,
        task_id: str | None = None,
        include_meta: bool = False,
        batch_size: int = -1,
        condition: Callable[[Doc], bool] | None = None,
    ):
        """Initialize the Chunker task.

        :param chunker: The chunker instance (chonkie.BaseChunker) or the interval (int) for NaiveChunker.
        :param task_id: Task ID.
        :param include_meta: Whether to include meta information generated by the task.
        :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
        :param condition: Optional callable that determines whether to process each document.
        """
        super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)
        self._chunker_arg = chunker
        self._task = self._init_chunker_task()

    def _init_chunker_task(self) -> _ChunkerType:
        """Initialize the specific chunker task based on the type of _chunker_arg.

        :return: Initialized chunker task instance.
        :raises TypeError: If the type of _chunker_arg is not supported.
        """
        chunker_task: _ChunkerType

        match self._chunker_arg:
            case chunker if isinstance(chunker, chonkie.BaseChunker):
                chunker_task = chunking.chonkie_.Chonkie(
                    chunker=chunker,
                    task_id=self.id,
                    include_meta=self._include_meta,
                    batch_size=self._batch_size,
                )
            case interval if isinstance(interval, int):
                chunker_task = chunking.naive.NaiveChunker(
                    interval=interval,
                    task_id=self.id,
                    include_meta=self._include_meta,
                    batch_size=self._batch_size,
                )
            case _:
                raise TypeError(
                    f"Unsupported type for 'chunker' argument: {type(self._chunker_arg)}. "
                    f"Expected chonkie.BaseChunker or int."
                )

        return chunker_task

    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Process documents by chunking their text.

        :param docs: Documents to process.
        :return: Processed documents with chunks added.
        """
        docs_iters = itertools.tee(docs, 2)
        assert all(doc.text for doc in docs_iters[0]), ValueError("Documents have to have a value for .text.")
        yield from self._task(docs_iters[1])

    @property
    def _state(self) -> dict[str, Any]:
        """Return attributes to serialize.

        :return: Dict of attributes to serialize.
        """
        return {
            **super()._state,
            "chunker": self._chunker_arg,
        }

    @classmethod
    def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Chunking:
        """Generate Chunker instance from config.

        :param config: Config to generate instance from.
        :param kwargs: Values to inject into loaded config.
        :return: Deserialized Chunker instance.
        """
        return cls(**config.to_init_dict(cls, **kwargs))

id property

Return task ID.

Used by pipeline for results and dependency management.

Returns:

Type Description
str

Task ID.

__add__(other)

Chain this task with another task or pipeline using the + operator.

This returns a new Pipeline that executes this task first, followed by the task(s) in other. The original task(s)/pipeline are not mutated.

Cache semantics: - If other is a Pipeline, the resulting pipeline adopts other's use_cache setting (because the left-hand side is a single task). - If other is a Task, the resulting pipeline defaults to use_cache=True.

Parameters:

Name Type Description Default
other Task | Pipeline

A Task or Pipeline to execute after this task.

required

Returns:

Type Description
Pipeline

A new Pipeline representing the chained execution.

Raises:

Type Description
TypeError

If other is not a Task or Pipeline.

Source code in sieves/tasks/core.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Chain this task with another task or pipeline via the ``+`` operator.

    A new ``Pipeline`` is returned that runs this task first, then whatever
    ``other`` contains. Neither operand is mutated.

    Cache semantics:
    - Chaining with a ``Pipeline`` adopts that pipeline's ``use_cache`` setting
      (the left-hand side is a single task and carries no cache preference).
    - Chaining with a plain ``Task`` produces a pipeline with the default
      ``use_cache=True``.

    :param other: A ``Task`` or ``Pipeline`` to execute after this task.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    # Imported here rather than at module top to break a circular dependency.
    from sieves.pipeline import Pipeline

    if isinstance(other, Pipeline):
        # Splice this task in front of the existing pipeline's tasks and
        # carry over its caching behaviour.
        return Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)
    if isinstance(other, Task):
        return Pipeline(tasks=[self, other])
    raise TypeError(f"Cannot chain Task with {type(other).__name__}")

__call__(docs)

Execute task with conditional logic.

Checks the condition for each document without materializing all docs upfront. Passes all documents that pass the condition to _call() for proper batching. Documents that fail the condition have results[task_id] set to None.

Parameters:

Name Type Description Default
docs Iterable[Doc]

Docs to process.

required

Returns:

Type Description
Iterable[Doc]

Processed docs (in original order).

Source code in sieves/tasks/core.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
    """Execute task with conditional logic.

    Evaluates the condition per document without materializing the full input
    stream, hands every passing document of a batch to ``_call()`` in one go
    (so downstream batching works), and sets ``results[task_id]`` to ``None``
    on documents that fail the condition.

    :param docs: Docs to process.
    :return: Processed docs (in original order).
    """
    doc_stream = docs if isinstance(docs, Iterator) else iter(docs)

    # Materialize docs batch by batch; downstream consumers materialize in
    # batches anyway, so this adds no extra memory overhead. A non-positive
    # batch size means "everything at once".
    limit = self._batch_size if self._batch_size > 0 else sys.maxsize
    while True:
        batch = list(itertools.islice(doc_stream, limit))
        if not batch:
            break

        # First pass: evaluate the condition once per doc, preserving order.
        passes = [self._condition is None or self._condition(doc) for doc in batch]

        # Second pass: run all passing docs through _call() as a single batch.
        selected = (doc for doc, ok in zip(batch, passes) if ok)
        processed = self._call(selected)
        processed_iter = processed if isinstance(processed, Iterator) else iter(processed)

        # Emit results in the original document order.
        for doc, ok in zip(batch, passes):
            if ok:
                # Condition held - substitute the processed result.
                yield next(processed_iter)
            else:
                # Condition failed - record a `None` result, pass doc through.
                doc.results[self.id] = None
                yield doc

__init__(chunker, task_id=None, include_meta=False, batch_size=-1, condition=None)

Initialize the Chunking task.

Parameters:

Name Type Description Default
chunker _ChunkerArgType

The chunker instance (chonkie.BaseChunker) or the interval (int) for NaiveChunker.

required
task_id str | None

Task ID.

None
include_meta bool

Whether to include meta information generated by the task.

False
batch_size int

Batch size to use for processing. Use -1 to process all documents at once.

-1
condition Callable[[Doc], bool] | None

Optional callable that determines whether to process each document.

None
Source code in sieves/tasks/preprocessing/chunking/core.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def __init__(
    self,
    chunker: _ChunkerArgType,
    task_id: str | None = None,
    include_meta: bool = False,
    batch_size: int = -1,
    condition: Callable[[Doc], bool] | None = None,
):
    """Set up the Chunking task around the given chunker configuration.

    :param chunker: Either a ready-made chonkie.BaseChunker instance, or an int interval for the NaiveChunker.
    :param task_id: Task ID.
    :param include_meta: Whether to include meta information generated by the task.
    :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
    :param condition: Optional callable that determines whether to process each document.
    """
    super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)
    # Store the raw argument first - _init_chunker_task() dispatches on it.
    self._chunker_arg = chunker
    self._task = self._init_chunker_task()

deserialize(config, **kwargs) classmethod

Generate a Chunking instance from a config.

Parameters:

Name Type Description Default
config Config

Config to generate instance from.

required
kwargs dict[str, Any]

Values to inject into loaded config.

{}

Returns:

Type Description
Chunking

Deserialized Chunker instance.

Source code in sieves/tasks/preprocessing/chunking/core.py
100
101
102
103
104
105
106
107
108
@classmethod
def deserialize(cls, config: Config, **kwargs: dict[str, Any]) -> Chunking:
    """Reconstruct a Chunking task from a serialized config.

    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config.
    :return: Deserialized Chunking instance.
    """
    # Let the config translate itself into keyword arguments for __init__,
    # with any caller-supplied overrides injected.
    init_args = config.to_init_dict(cls, **kwargs)
    return cls(**init_args)

evaluate(docs, judge=None)

Evaluate task performance.

Parameters:

Name Type Description Default
docs Iterable[Doc]

Documents to evaluate.

required
judge LM | None

Optional judge model for evaluation.

None

Returns:

Type Description
TaskEvaluationReport

Evaluation report.

Source code in sieves/tasks/core.py
 99
100
101
102
103
104
105
106
def evaluate(self, docs: Iterable[Doc], judge: dspy.LM | None = None) -> TaskEvaluationReport:
    """Evaluate task performance.

    No evaluation procedure is defined for this task type, so this always raises.

    :param docs: Documents to evaluate.
    :param judge: Optional judge model for evaluation.
    :return: Evaluation report.
    :raises NotImplementedError: Always, since evaluation is not supported here.
    """
    task_name = self.__class__.__name__
    raise NotImplementedError(f"Evaluation not implemented for task {task_name}")

serialize()

Serialize task.

Returns:

Type Description
Config

Config instance.

Source code in sieves/tasks/core.py
147
148
149
150
151
152
def serialize(self) -> Config:
    """Serialize this task into a Config object.

    :return: Config instance.
    """
    # Wrap each state entry in an Attribute so Config can track it explicitly.
    attributes = {name: Attribute(value=value) for name, value in self._state.items()}
    return Config.create(self.__class__, attributes)