Skip to content

NaiveChunker

Allows chunking of documents into segments.

NaiveChunker

Bases: Task

Chunks by sentence counts. Only for test purposes.

Source code in sieves/tasks/preprocessing/chunking/naive.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class NaiveChunker(Task):
    """Chunks by sentence counts. Only for test purposes.

    Text is split on ``?``, ``!`` and ``.``. Every ``interval`` consecutive non-blank sentences are joined with ``.``
    into one chunk, so the original sentence terminators are not preserved.
    """

    def __init__(
        self,
        interval: int,
        task_id: str | None = None,
        include_meta: bool = False,
        batch_size: int = -1,
        condition: Callable[[Doc], bool] | None = None,
    ):
        """Initialize chunker.

        :param interval: Number of sentences per chunk. Must be at least 1.
        :param task_id: Task ID.
        :param include_meta: Whether to include meta information generated by the task.
        :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
        :param condition: Optional callable that determines whether to process each document.
        :raises ValueError: If ``interval`` is smaller than 1.
        """
        # Fail fast: 0 would only blow up later inside range(), and a negative step silently produces no chunks.
        if interval < 1:
            raise ValueError(f"interval must be a positive integer, got {interval}.")

        super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)
        self._interval = interval

    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Split documents into chunks.

        Sets ``doc.chunks`` on every document and yields the documents in input order.

        :param docs: Documents to split.
        :return: Split documents.
        """
        batch_size = self._batch_size if self._batch_size > 0 else sys.maxsize
        # Bind one iterator up front: islice() on a re-iterable input (e.g. a list) would otherwise restart at the
        # first element on every pass and the loop would never terminate.
        docs_iter = iter(docs)

        while docs_batch := list(itertools.islice(docs_iter, batch_size)):
            for doc in docs_batch:
                assert doc.text
                # Drop fragments that are empty or whitespace-only (e.g. after a trailing terminator).
                sentences = [sent for sent in re.split(r"[?!.]", doc.text) if sent.strip()]
                doc.chunks = [
                    ".".join(sentences[i : i + self._interval]) for i in range(0, len(sentences), self._interval)
                ]

                yield doc

    @property
    def _state(self) -> dict[str, Any]:
        """Return the parent state extended with this chunker's ``interval``."""
        return {
            **super()._state,
            "interval": self._interval,
        }

id property

Return task ID.

Used by pipeline for results and dependency management.

Returns:

Type Description
str

Task ID.

__add__(other)

Chain this task with another task or pipeline using the + operator.

This returns a new Pipeline that executes this task first, followed by the task(s) in other. The original task(s)/pipeline are not mutated.

Cache semantics: - If other is a Pipeline, the resulting pipeline adopts other's use_cache setting (because the left-hand side is a single task). - If other is a Task, the resulting pipeline defaults to use_cache=True.

Parameters:

Name Type Description Default
other Task | Pipeline

A Task or Pipeline to execute after this task.

required

Returns:

Type Description
Pipeline

A new Pipeline representing the chained execution.

Raises:

Type Description
TypeError

If other is not a Task or Pipeline.

Source code in sieves/tasks/core.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Build a new ``Pipeline`` that runs this task first and ``other`` afterwards (``+`` operator).

    Neither operand is mutated; a fresh ``Pipeline`` is always returned.

    Cache semantics:
    - ``other`` is a ``Pipeline``: the result takes over ``other``'s ``use_cache`` setting, since the
      left-hand side is only a single task.
    - ``other`` is a ``Task``: the result is created without an explicit ``use_cache`` argument and
      therefore uses the ``Pipeline`` default (``use_cache=True``).

    :param other: A ``Task`` or ``Pipeline`` to execute after this task.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    # Imported here rather than at module level to avoid a circular import.
    from sieves.pipeline import Pipeline

    if isinstance(other, Pipeline):
        chained = Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)
    elif isinstance(other, Task):
        chained = Pipeline(tasks=[self, other])
    else:
        raise TypeError(f"Cannot chain Task with {type(other).__name__}")

    return chained

__call__(docs)

Execute task with conditional logic.

Checks the condition for each document without materializing all docs upfront. Passes all documents that pass the condition to _call() for proper batching. Documents that fail the condition have results[task_id] set to None.

Parameters:

Name Type Description Default
docs Iterable[Doc]

Docs to process.

required

Returns:

Type Description
Iterable[Doc]

Processed docs (in original order).

Source code in sieves/tasks/core.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
    """Execute task with conditional logic.

    Without a condition, documents are streamed straight into ``_call()`` and nothing is buffered.

    With a condition, the condition is evaluated for every document in a first pass (the input is buffered
    via ``itertools.tee``). All passing documents are then handed to ``_call()`` together so batching works
    as intended, and documents that fail the condition get ``results[task_id]`` set to None.

    :param docs: Docs to process.
    :return: Processed docs (in original order).
    :raises RuntimeError: If a condition is set and ``_call()`` yields fewer docs than passed the condition.
    """
    # Fast path: every doc passes, so there is no need to tee and pre-scan the whole input.
    # iter() guarantees _call() receives a one-shot iterator, as it does on the conditional path.
    if self._condition is None:
        yield from self._call(iter(docs))
        return

    # Three independent iterators over the same input:
    #   1. Check which docs pass the condition.
    #   2. Feed only passing docs to _call().
    #   3. Walk the original order and emit results.
    check_iter, call_iter, order_iter = itertools.tee(docs, 3)

    # First pass: record the positions of docs that pass the condition.
    passing_indices: set[int] = {idx for idx, doc in enumerate(check_iter) if self._condition(doc)}

    # Process all passing docs together.
    processed = self._call(d for i, d in enumerate(call_iter) if i in passing_indices)
    processed_iter = processed if isinstance(processed, Iterator) else iter(processed)

    for idx, doc in enumerate(order_iter):
        if idx not in passing_indices:
            # Doc failed condition - set None result and yield original.
            doc.results[self.id] = None
            yield doc
            continue

        # Doc passed condition - use processed result. A bare StopIteration inside a generator would be
        # re-raised as an opaque RuntimeError, so report the mismatch explicitly instead.
        try:
            result = next(processed_iter)
        except StopIteration:
            raise RuntimeError(f"Task '{self.id}' yielded fewer documents than it received.") from None
        yield result

__init__(interval, task_id=None, include_meta=False, batch_size=-1, condition=None)

Initialize chunker.

Parameters:

Name Type Description Default
interval int

Number of sentences per chunk.

required
task_id str | None

Task ID.

None
include_meta bool

Whether to include meta information generated by the task.

False
batch_size int

Batch size to use for processing. Use -1 to process all documents at once.

-1
condition Callable[[Doc], bool] | None

Optional callable that determines whether to process each document.

None
Source code in sieves/tasks/preprocessing/chunking/naive.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(
    self,
    interval: int,
    task_id: str | None = None,
    include_meta: bool = False,
    batch_size: int = -1,
    condition: Callable[[Doc], bool] | None = None,
):
    """Initialize chunker.

    :param interval: Number of sentences per chunk. Must be at least 1.
    :param task_id: Task ID.
    :param include_meta: Whether to include meta information generated by the task.
    :param batch_size: Batch size to use for processing. Use -1 to process all documents at once.
    :param condition: Optional callable that determines whether to process each document.
    :raises ValueError: If ``interval`` is smaller than 1.
    """
    # Fail fast: a non-positive interval cannot produce meaningful chunks.
    if interval < 1:
        raise ValueError(f"interval must be a positive integer, got {interval}.")

    super().__init__(task_id=task_id, include_meta=include_meta, batch_size=batch_size, condition=condition)
    self._interval = interval

deserialize(config, **kwargs) classmethod

Generate Task instance from config.

Parameters:

Name Type Description Default
config Config

Config to generate instance from.

required
kwargs dict[str, Any]

Values to inject into loaded config.

{}

Returns:

Type Description
Task

Deserialized Task instance.

Source code in sieves/tasks/core.py
144
145
146
147
148
149
150
151
152
153
@classmethod
def deserialize(cls, config: Config, **kwargs: Any) -> Task:
    """Generate Task instance from config.

    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config.
    :return: Deserialized Task instance.
    """
    # Let the config resolve the constructor arguments for this class, with kwargs injected, then instantiate.
    init_kwargs = config.to_init_dict(cls, **kwargs)
    return cls(**init_kwargs)

serialize()

Serialize task.

Returns:

Type Description
Config

Config instance.

Source code in sieves/tasks/core.py
137
138
139
140
141
142
def serialize(self) -> Config:
    """Serialize task.

    Wraps every entry of ``self._state`` in an ``Attribute`` and passes the result to ``Config.create``.

    :return: Config instance.
    """
    attributes = {}
    for name, value in self._state.items():
        attributes[name] = Attribute(value=value)

    return Config.create(self.__class__, attributes)