Skip to content

Task

Bases: ABC

Abstract base class for tasks that can be executed on documents.

Source code in sieves/tasks/core.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class Task(abc.ABC):
    """Abstract base class for tasks that can be executed on documents."""

    def __init__(self, task_id: str | None, include_meta: bool, batch_size: int):
        """
        Initialize a new Task.

        :param task_id: Task ID. If None (or empty), the concrete class name is used instead.
        :param include_meta: Whether to include meta information generated by the task.
        :param batch_size: Batch size for processing documents. Use -1 to process all documents at once.
        """
        # Fall back to the concrete class name when no explicit ID was supplied.
        self._task_id = task_id if task_id else self.__class__.__name__
        self._include_meta = include_meta
        self._batch_size = batch_size

    @property
    def id(self) -> str:
        """Return task ID.

        Used by pipeline for results and dependency management.

        :return: Task ID.
        """
        return self._task_id

    @abc.abstractmethod
    def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Execute task.

        :param docs: Docs to process.
        :return: Processed docs.
        """

    def __add__(self, other: Task | Pipeline) -> Pipeline:
        """Chain this task with another task or pipeline using the ``+`` operator.

        This returns a new ``Pipeline`` that executes this task first, followed by the
        task(s) in ``other``. The original task(s)/pipeline are not mutated.

        Cache semantics:
        - If ``other`` is a ``Pipeline``, the resulting pipeline adopts ``other``'s
          ``use_cache`` setting (because the left-hand side is a single task).
        - If ``other`` is a ``Task``, the resulting pipeline defaults to ``use_cache=True``.

        :param other: A ``Task`` or ``Pipeline`` to execute after this task.
        :return: A new ``Pipeline`` representing the chained execution.
        :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
        """
        # Lazy import to avoid circular dependency at module import time.
        from sieves.pipeline import Pipeline

        if isinstance(other, Pipeline):
            return Pipeline(tasks=[self, *other.tasks], use_cache=other.use_cache)

        if isinstance(other, Task):
            return Pipeline(tasks=[self, other])

        raise TypeError(f"Cannot chain Task with {type(other).__name__}")

    @property
    def _state(self) -> dict[str, Any]:
        """Return attributes to serialize.

        :return: Dict of attributes to serialize.
        """
        return {
            "task_id": self._task_id,
            "include_meta": self._include_meta,
            "batch_size": self._batch_size,
        }

    def serialize(self) -> Config:
        """Serialize task.

        :return: Config instance.
        """
        return Config.create(self.__class__, {k: Attribute(value=v) for k, v in self._state.items()})

    @classmethod
    def deserialize(cls, config: Config, **kwargs: Any) -> Task:
        """Generate Task instance from config.

        :param config: Config to generate instance from.
        :param kwargs: Values to inject into loaded config; these override serialized attributes.
        :return: Deserialized Task instance.
        """
        # Rebuild init arguments from the config; kwargs override serialized values.
        return cls(**config.to_init_dict(cls, **kwargs))

id property

Return task ID.

Used by pipeline for results and dependency management.

Returns:

Type Description
str

Task ID.

__add__(other)

Chain this task with another task or pipeline using the + operator.

This returns a new Pipeline that executes this task first, followed by the task(s) in other. The original task(s)/pipeline are not mutated.

Cache semantics: if other is a Pipeline, the resulting pipeline adopts other's use_cache setting (because the left-hand side is a single task). If other is a Task, the resulting pipeline defaults to use_cache=True.

Parameters:

Name Type Description Default
other Task | Pipeline

A Task or Pipeline to execute after this task.

required

Returns:

Type Description
Pipeline

A new Pipeline representing the chained execution.

Raises:

Type Description
TypeError

If other is not a Task or Pipeline.

Source code in sieves/tasks/core.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def __add__(self, other: Task | Pipeline) -> Pipeline:
    """Build a new pipeline that runs this task before ``other``.

    Neither operand is mutated; a fresh ``Pipeline`` is returned that executes this
    task first, then whatever ``other`` contributes.

    Cache behavior: when ``other`` is a ``Pipeline``, its ``use_cache`` flag carries
    over to the result (the left-hand side is only a single task). When ``other`` is
    a plain ``Task``, the new pipeline keeps the default ``use_cache=True``.

    :param other: A ``Task`` or ``Pipeline`` to execute after this task.
    :return: A new ``Pipeline`` representing the chained execution.
    :raises TypeError: If ``other`` is not a ``Task`` or ``Pipeline``.
    """
    # Imported here rather than at module level to break the import cycle.
    from sieves.pipeline import Pipeline

    if isinstance(other, Pipeline):
        # Prepend this task to the existing pipeline's tasks, keeping its cache flag.
        chained_tasks = [self, *other.tasks]
        return Pipeline(tasks=chained_tasks, use_cache=other.use_cache)

    if isinstance(other, Task):
        return Pipeline(tasks=[self, other])

    raise TypeError(f"Cannot chain Task with {type(other).__name__}")

__call__(docs) abstractmethod

Execute task.

Parameters:

Name Type Description Default
docs Iterable[Doc]

Docs to process.

required

Returns:

Type Description
Iterable[Doc]

Processed docs.

Source code in sieves/tasks/core.py
42
43
44
45
46
47
48
@abc.abstractmethod
def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
    """Execute task.

    Subclasses implement the actual per-document processing here; the base class
    provides no default behavior.

    :param docs: Docs to process.
    :return: Processed docs.
    """

__init__(task_id, include_meta, batch_size)

Initialize a new Task.

Parameters:

Name Type Description Default
task_id str | None

Task ID.

required
include_meta bool

Whether to include meta information generated by the task.

required
batch_size int

Batch size for processing documents. Use -1 to process all documents at once.

required
Source code in sieves/tasks/core.py
20
21
22
23
24
25
26
27
28
29
30
def __init__(self, task_id: str | None, include_meta: bool, batch_size: int):
    """
    Initialize a new Task.

    :param task_id: Task ID. A falsy value (None or empty string) falls back to the class name.
    :param include_meta: Whether to include meta information generated by the task.
    :param batch_size: Batch size for processing documents. Use -1 to process all documents at once.
    """
    # ``or`` falls through to the class name for None and for the empty string alike.
    self._task_id = task_id or self.__class__.__name__
    self._include_meta = include_meta
    self._batch_size = batch_size

deserialize(config, **kwargs) classmethod

Generate Task instance from config.

Parameters:

Name Type Description Default
config Config

Config to generate instance from.

required
kwargs dict[str, Any]

Values to inject into loaded config.

{}

Returns:

Type Description
Task

Deserialized Task instance.

Source code in sieves/tasks/core.py
 95
 96
 97
 98
 99
100
101
102
103
104
@classmethod
def deserialize(cls, config: Config, **kwargs: Any) -> Task:
    """Generate Task instance from config.

    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config; these override serialized attributes.
    :return: Deserialized Task instance.
    """
    # Rebuild init arguments from the config; kwargs override serialized values.
    return cls(**config.to_init_dict(cls, **kwargs))

serialize()

Serialize task.

Returns:

Type Description
Config

Config instance.

Source code in sieves/tasks/core.py
88
89
90
91
92
93
def serialize(self) -> Config:
    """Serialize task.

    :return: Config instance.
    """
    # Wrap each serializable attribute in an Attribute before handing off to Config.
    wrapped = {name: Attribute(value=value) for name, value in self._state.items()}
    return Config.create(self.__class__, wrapped)