Pipeline

Pipeline for executing tasks on documents.

Source code in sieves/pipeline/core.py
class Pipeline:
    """Pipeline for executing tasks on documents."""

    def __init__(
        self,
        tasks: Iterable[Task] | Task,
    ):
        """Initialize pipeline.
        :param tasks: List of tasks to execute.
        """
        self._tasks = [tasks] if isinstance(tasks, Task) else list(tasks)
        self._validate_tasks()

    def add_tasks(self, tasks: Iterable[Task]) -> None:
        """Adds tasks to pipeline. Revalidates pipeline.
        :param tasks: Tasks to be added.
        """
        self._tasks.extend(tasks)
        self._validate_tasks()

    def _validate_tasks(self) -> None:
        """Validate tasks.
        :raises ValueError: If multiple tasks share the same ID.
        """
        task_ids: set[str] = set()

        for task in self._tasks:
            if task.id in task_ids:
                raise ValueError(f"Task with duplicate ID {task.id}. Ensure unique task IDs.")
            task_ids.add(task.id)

    def __call__(self, docs: Iterable[Doc], in_place: bool = False) -> Iterable[Doc]:
        """Process a list of documents through all tasks.

        :param docs: Documents to process.
        :param in_place: Whether to modify documents in-place or create copies.
        :return: Processed documents.
        """
        processed_docs = docs if in_place else [copy.deepcopy(doc) for doc in docs]

        for i, task in enumerate(self._tasks):
            logger.info(f"Running task {task.id} ({i + 1}/{len(self._tasks)} tasks).")
            processed_docs = task(processed_docs)

        return processed_docs

    def dump(self, path: Path | str) -> None:
        """Save pipeline config to disk.
        :param path: Target path.
        """
        self.serialize().dump(path)

    @classmethod
    def load(cls, path: Path | str, task_kwargs: Iterable[dict[str, Any]]) -> Pipeline:
        """Generate pipeline from disk.
        :param path: Path to config file.
        :param task_kwargs: Values to inject into loaded config.
        :return: Pipeline instance.
        """
        return cls.deserialize(Config.load(path), task_kwargs)

    def serialize(self) -> Config:
        """Serializes pipeline object.
        :return: Serialized pipeline representation.
        """
        return Config.create(
            self.__class__,
            {"tasks": Attribute(value=[task.serialize() for task in self._tasks])},
        )

    @classmethod
    def deserialize(cls, config: Config, tasks_kwargs: Iterable[dict[str, Any]]) -> Pipeline:
        """Generates pipeline from config.
        :param config: Config to generate pipeline from.
        :param tasks_kwargs: Values to inject into task configs. One dict per task (dict can be empty).
        :return: Deserialized pipeline instance.
        """
        config.validate_init_params(cls)
        tasks_kwargs = tuple(tasks_kwargs)

        assert hasattr(config, "tasks")
        assert len(config.tasks.value) == len(tasks_kwargs), ValueError(
            f"len(tasks_kwargs) has to match the number of tasks in this pipeline ({len(config.tasks.value)}."
        )
        assert config.tasks.is_placeholder is False

        # Deserialize tasks.
        tasks: list[Task] = []
        for task_attr, task_kwargs in zip(config.tasks.value, tasks_kwargs):
            # Restore engine config for PredictiveTask config.
            if "engine" in task_attr:
                task_attr["engine"]["value"], engine_cls = Config.from_dict(task_attr["engine"]["value"])
            # Restore task config, if provided as dict.
            match task_attr:
                case dict():
                    task_config, task_cls = Config.from_dict(task_attr)
                case Config():
                    task_config = task_attr
                    task_cls = task_attr.config_cls
                case _:
                    raise TypeError(f"Deserialization can't handle configs of type {type(task_attr)}.")

            # Deserialize task.
            assert issubclass(task_cls, Serializable)
            assert issubclass(task_cls, Task)
            task = task_cls.deserialize(task_config, **task_kwargs)
            tasks.append(task)

        return cls(tasks=tasks)

    def __getitem__(self, task_id: str) -> Task:
        """Gets task with this ID.
        :param task_id: ID of task to fetch.
        :return: Task with specified ID.
        :raises: KeyError if no task with such ID exists.
        """
        for task in self._tasks:
            if task.id == task_id:
                return task

        raise KeyError(f"No task with ID {task_id} exists in this pipeline.")
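
A minimal end-to-end sketch of the class. It assumes `Pipeline` and `Doc` are importable from the package root and that `my_task` stands in for an already-configured Task instance; the import path, the task, and the `Doc(text=...)` constructor are assumptions, not taken from this page:

from sieves import Doc, Pipeline  # assumed import path

# my_task is a hypothetical placeholder for a configured Task instance.
pipeline = Pipeline(tasks=[my_task])

docs = [Doc(text="This is an example document.")]  # Doc(text=...) is assumed
processed = list(pipeline(docs))  # runs every task in order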

__call__(docs, in_place=False)

Process a list of documents through all tasks.

Parameters:

Name      Type           Description                                              Default
docs      Iterable[Doc]  Documents to process.                                    required
in_place  bool           Whether to modify documents in-place or create copies.   False

Returns:

Type           Description
Iterable[Doc]  Processed documents.

Source code in sieves/pipeline/core.py
def __call__(self, docs: Iterable[Doc], in_place: bool = False) -> Iterable[Doc]:
    """Process a list of documents through all tasks.

    :param docs: Documents to process.
    :param in_place: Whether to modify documents in-place or create copies.
    :return: Processed documents.
    """
    processed_docs = docs if in_place else [copy.deepcopy(doc) for doc in docs]

    for i, task in enumerate(self._tasks):
        logger.info(f"Running task {task.id} ({i + 1}/{len(self._tasks)} tasks).")
        processed_docs = task(processed_docs)

    return processed_docs
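
Continuing the sketch above, the in_place flag decides whether tasks see deep copies or the original documents:

# Default (in_place=False): docs are deep-copied, so the originals stay untouched.
copies = pipeline(docs)

# in_place=True skips the copy, so tasks mutate the passed-in docs directly.
mutated = pipeline(docs, in_place=True)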

__getitem__(task_id)

Gets the task with the given ID.

Parameters:

Name     Type  Description           Default
task_id  str   ID of task to fetch.  required

Returns:

Type  Description
Task  Task with specified ID.

Source code in sieves/pipeline/core.py
def __getitem__(self, task_id: str) -> Task:
    """Gets task with this ID.
    :param task_id: ID of task to fetch.
    :return: Task with specified ID.
    :raises: KeyError if no task with such ID exists.
    """
    for task in self._tasks:
        if task.id == task_id:
            return task

    raise KeyError(f"No task with ID {task_id} exists in this pipeline.")
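
A lookup sketch with a hypothetical task ID; unknown IDs raise KeyError:

task = pipeline["my_task_id"]  # "my_task_id" is a hypothetical ID

try:
    pipeline["unknown_id"]
except KeyError as err:
    print(err)  # no task with this ID exists in the pipeline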

__init__(tasks)

Initialize pipeline.

Parameters:

Name   Type                   Description                Default
tasks  Iterable[Task] | Task  List of tasks to execute.  required
Source code in sieves/pipeline/core.py
def __init__(
    self,
    tasks: Iterable[Task] | Task,
):
    """Initialize pipeline.
    :param tasks: List of tasks to execute.
    """
    self._tasks = [tasks] if isinstance(tasks, Task) else list(tasks)
    self._validate_tasks()
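
Both call forms below are valid, since a lone Task is wrapped into a one-element list internally (my_task and my_other_task are hypothetical):

pipeline_single = Pipeline(tasks=my_task)                   # single task
pipeline_multi = Pipeline(tasks=[my_task, my_other_task])   # iterable of tasks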

_validate_tasks()

Validate tasks.

Source code in sieves/pipeline/core.py
def _validate_tasks(self) -> None:
    """Validate tasks.
    :raises ValueError: If multiple tasks share the same ID.
    """
    task_ids: set[str] = set()

    for task in self._tasks:
        if task.id in task_ids:
            raise ValueError(f"Task with duplicate ID {task.id}. Ensure unique task IDs.")
        task_ids.add(task.id)
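
Validation runs automatically from __init__() and add_tasks(); a sketch of the failure mode, with two hypothetical tasks that share an ID:

try:
    Pipeline(tasks=[task_a, task_b])  # hypothetical tasks with the same .id
except ValueError as err:
    print(err)  # "Task with duplicate ID ... Ensure unique task IDs."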

add_tasks(tasks)

Adds tasks to pipeline. Revalidates pipeline.

Parameters:

Name   Type            Description         Default
tasks  Iterable[Task]  Tasks to be added.  required
Source code in sieves/pipeline/core.py
def add_tasks(self, tasks: Iterable[Task]) -> None:
    """Adds tasks to pipeline. Revalidates pipeline.
    :param tasks: Tasks to be added.
    """
    self._tasks.extend(tasks)
    self._validate_tasks()
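
A sketch of growing an existing pipeline (task names hypothetical); the combined task list is revalidated immediately:

pipeline = Pipeline(tasks=[first_task])
pipeline.add_tasks([second_task, third_task])  # task IDs are re-checked here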

deserialize(config, tasks_kwargs) classmethod

Generates pipeline from config.

Parameters:

Name          Type                      Description                                                                  Default
config        Config                    Config to generate pipeline from.                                            required
tasks_kwargs  Iterable[dict[str, Any]]  Values to inject into task configs. One dict per task (dict can be empty).   required

Returns:

Type      Description
Pipeline  Deserialized pipeline instance.

Source code in sieves/pipeline/core.py
@classmethod
def deserialize(cls, config: Config, tasks_kwargs: Iterable[dict[str, Any]]) -> Pipeline:
    """Generates pipeline from config.
    :param config: Config to generate pipeline from.
    :param tasks_kwargs: Values to inject into task configs. One dict per task (dict can be empty).
    :return: Deserialized pipeline instance.
    """
    config.validate_init_params(cls)
    tasks_kwargs = tuple(tasks_kwargs)

    assert hasattr(config, "tasks")
    assert len(config.tasks.value) == len(tasks_kwargs), ValueError(
        f"len(tasks_kwargs) has to match the number of tasks in this pipeline ({len(config.tasks.value)}."
    )
    assert config.tasks.is_placeholder is False

    # Deserialize tasks.
    tasks: list[Task] = []
    for task_attr, task_kwargs in zip(config.tasks.value, tasks_kwargs):
        # Restore engine config for PredictiveTask config.
        if "engine" in task_attr:
            task_attr["engine"]["value"], engine_cls = Config.from_dict(task_attr["engine"]["value"])
        # Restore task config, if provided as dict.
        match task_attr:
            case dict():
                task_config, task_cls = Config.from_dict(task_attr)
            case Config():
                task_config = task_attr
                task_cls = task_attr.config_cls
            case _:
                raise TypeError(f"Deserialization can't handle configs of type {type(task_attr)}.")

        # Deserialize task.
        assert issubclass(task_cls, Serializable)
        assert issubclass(task_cls, Task)
        task = task_cls.deserialize(task_config, **task_kwargs)
        tasks.append(task)

    return cls(tasks=tasks)
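
A sketch, assuming Config is importable (its module path is not shown on this page) and that the file path and injected values are placeholders. tasks_kwargs needs exactly one dict per task, in pipeline order:

config = Config.load("pipeline_config.yml")  # hypothetical path

# Two-task pipeline: inject values into the first task, nothing into the second.
# Which keys a task accepts depends on the task; "engine" is only illustrative.
pipeline = Pipeline.deserialize(config, tasks_kwargs=({"engine": engine}, {}))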

dump(path)

Save pipeline config to disk.

Parameters:

Name  Type        Description   Default
path  Path | str  Target path.  required
Source code in sieves/pipeline/core.py
def dump(self, path: Path | str) -> None:
    """Save pipeline config to disk.
    :param path: Target path.
    """
    self.serialize().dump(path)
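
A sketch of persisting a pipeline; both str and pathlib.Path targets are accepted, and the file name is hypothetical:

from pathlib import Path

pipeline.dump(Path("pipeline_config.yml"))  # writes the serialized config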

load(path, task_kwargs) classmethod

Loads pipeline from a config file on disk.

Parameters:

Name         Type                      Description                            Default
path         Path | str                Path to config file.                   required
task_kwargs  Iterable[dict[str, Any]]  Values to inject into loaded config.   required

Returns:

Type      Description
Pipeline  Pipeline instance.

Source code in sieves/pipeline/core.py
@classmethod
def load(cls, path: Path | str, task_kwargs: Iterable[dict[str, Any]]) -> Pipeline:
    """Generate pipeline from disk.
    :param path: Path to config file.
    :param task_kwargs: Values to inject into loaded config.
    :return: Pipeline instance.
    """
    return cls.deserialize(Config.load(path), task_kwargs)
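
A sketch restoring the pipeline dumped above; as with deserialize(), pass one kwargs dict per task (empty dicts for tasks that need nothing injected):

# Assumes a two-task pipeline was dumped to this (hypothetical) path.
pipeline = Pipeline.load("pipeline_config.yml", task_kwargs=({}, {}))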

serialize()

Serializes pipeline object.

Returns:

Type    Description
Config  Serialized pipeline representation.

Source code in sieves/pipeline/core.py
def serialize(self) -> Config:
    """Serializes pipeline object.
    :return: Serialized pipeline representation.
    """
    return Config.create(
        self.__class__,
        {"tasks": Attribute(value=[task.serialize() for task in self._tasks])},
    )
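
A sketch showing the relationship to dump(): serialize() returns the Config, which can then be written to disk (path hypothetical):

config = pipeline.serialize()
config.dump("pipeline_config.yml")  # equivalent to pipeline.dump(...)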