Creating Custom Tasks

This guide explains how to create custom tasks. sieves distinguishes between two types of tasks:

1. Ordinary tasks inherit from Task. Pretty much their only requirement is to process a collection of documents and return those same documents with their modifications.
2. Predictive tasks inherit from PredictiveTask (which in turn inherits from Task). These are tasks that use engines (i.e. zero-shot models). They are more complex, as they need to implement the interface required to integrate with at least one engine.

While there are a bunch of pre-built tasks available for you to use, you might want to write your own to match your use case. This guide describes how to do that.

If you feel like your task might be useful for others, we'd be happy to see you submit a PR!

Tasks

Inherit from Task whenever you want to implement something that doesn't require interacting with engines. That can be document pre- or postprocessing, or something completely different - you could e.g. run an agent following instructions provided in docs, and then follow this up with a subsequent task in your pipeline analyzing and structuring those results.

To create a basic custom task, inherit from the Task class and implement the required abstract methods. In this case we'll implement a dummy task that counts how many characters are in the document's text and stores that as a result.

from collections.abc import Iterable

from sieves.data import Doc
from sieves.tasks.core import Task


class CharCountTask(Task):
    def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
        """Counts characters in doc.text.

        :param docs: Documents to process.
        :return Iterable[Doc]: Processed documents.
        """
        for doc in docs:
            doc.results[self.id] = len(doc.text)
            yield doc

That's it! You can customize this, of course. You might also want to extend __init__() to allow for initializing what you need.
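
To run it, drop the task into a pipeline like any built-in task. Here's a minimal sketch - it assumes Task can be instantiated without arguments and that the task's ID (under which results are stored) defaults to the class name:

from sieves import Doc, Pipeline

docs = [Doc(text="This is a short text."), Doc(text="Another document.")]
pipe = Pipeline([CharCountTask()])

for doc in pipe(docs):
    # Character counts are stored under the task's ID, which we assume defaults to the class name.
    print(doc.text, doc.results["CharCountTask"])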

Predictive Tasks

Inherit from PredictiveTask whenever you want to make use of the structured generation capabilities in sieves. PredictiveTask requires you to implement a few methods that define how your task expects results to be structured, what few-shot examples should look like, which prompt to use, etc.

We'll break down how to create a predictive task step by step. For this example, let's implement a sentiment analysis task using outlines.

1. Implement a Bridge

A Bridge defines how to solve a task for a certain engine. We decided to go with outlines as our engine (you can allow multiple engines for a task by implementing corresponding bridges, but for simplicity's sake we'll stick with outlines only here).

A Bridge requires you to implement/specify the following:
- A prompt template (optional depending on the engine used).
- A prompt signature description (optional depending on the engine used).
- A prompt signature describing how results have to be structured.
- Which inference mode to use. This defines how the engine queries the model and parses the results.
- How to integrate results into docs.
- How to consolidate results from multiple doc chunks into one result per doc.

We'll save this in sentiment_analysis_bridges.py.

from collections.abc import Iterable
from functools import cached_property

import pydantic

from sieves.data import Doc
from sieves.engines import EngineInferenceMode, outlines_
from sieves.tasks.predictive.bridges import Bridge


# This is what we require our response to look like - we require not just the score, but also a reasoning/justification
# for why the model assigns this score. We also force the score to be between 0 and 1.
class SentimentEstimate(pydantic.BaseModel):
    reasoning: str
    score: pydantic.confloat(ge=0, le=1)


# This is the bridge class.
class OutlinesSentimentAnalysis(Bridge[SentimentEstimate, SentimentEstimate, EngineInferenceMode]):
    # This defines the default prompt template as Jinja2 template string.
    # We include an example block allowing us to include fewshot examples.
    @property
    def _prompt_template(self) -> str | None:
        return """
        Estimate the sentiment in this text as a float between 0 and 1. 0 is negative, 1 is positive. Provide your 
        reasoning for why you estimate this score before you output the score.

        {% if examples|length > 0 -%}
            Examples:
            ----------
            {%- for example in examples %}
                Text: "{{ example.text }}":
                Output:
                    Reasoning: "{{ example.reasoning }}":
                    Sentiment: "{{ example.sentiment }}"
            {% endfor -%}
            ----------
        {% endif -%}

        ========
        Text: {{ text }}
        Output: 
        """

    # Outlines doesn't make use of a prompt signature description, hence we return None here. 
    @property
    def _prompt_signature_description(self) -> str | None:
        return None

    # We return our SentimentEstimate as prompt signature. 
    @cached_property
    def prompt_signature(self) -> type[pydantic.BaseModel]:
        return SentimentEstimate

    # Outlines has a JSON mode, which allows complex return objects to be specified and parsed as Pydantic objects.
    @property
    def inference_mode(self) -> outlines_.InferenceMode:
        return outlines_.InferenceMode.json

    # We copy the result score into our doc's results attribute.
    def integrate(self, results: Iterable[SentimentEstimate], docs: Iterable[Doc]) -> Iterable[Doc]:
        for doc, result in zip(docs, results):
            assert isinstance(result, SentimentEstimate)
            # doc.results is a dict, with the task ID being the key to store our results under for the corresponding 
            # task.
            doc.results[self._task_id] = result.score
        return docs

    # Consolidating multiple chunks for sentiment analysis can be pretty straightforward: we compute the average over
    # all valid chunk results and assume this to be the sentiment score for the doc.
    def consolidate(
        self, results: Iterable[SentimentEstimate], docs_offsets: list[tuple[int, int]]
    ) -> Iterable[SentimentEstimate]:
        results = list(results)

        # Iterate over indices that determine which chunks belong to which documents.
        for doc_offset in docs_offsets:
            # Keep track of all reasonings, the total score, and the number of valid chunk results.
            reasonings: list[str] = []
            scores = 0.
            num_results = 0

            # Iterate over chunks' results.
            for chunk_result in results[doc_offset[0] : doc_offset[1]]:
                # Engines may return None results if they encounter errors and run in permissive mode. We ignore such
                # results.
                if chunk_result:
                    assert isinstance(chunk_result, SentimentEstimate)
                    reasonings.append(chunk_result.reasoning)
                    scores += chunk_result.score
                    num_results += 1

            yield SentimentEstimate(
                # Average the score over the valid chunk results.
                score=scores / max(num_results, 1),
                # Concatenate all reasonings.
                reasoning=" ".join(reasonings),
            )

Our bridge takes care of most of the heavy lifting: it defines how we expect our results to look, it consolidates the results we're getting back from the engine, and it integrates them into our docs.
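
As a quick sanity check you can call consolidate() directly, e.g. with two chunk results that belong to a single doc. The sketch below assumes the bridge can be constructed with the same keyword arguments we'll pass to it from the task class in the next step (we pass None for the custom template and signature description):

from sentiment_analysis_bridges import OutlinesSentimentAnalysis, SentimentEstimate

# Construct the bridge directly - normally the task does this for us (see the next step).
bridge = OutlinesSentimentAnalysis(task_id="SentimentAnalysis", prompt_template=None, prompt_signature_desc=None)

# Two chunk results belonging to the same doc, i.e. doc offsets (0, 2).
chunk_results = [
    SentimentEstimate(reasoning="Upbeat, positive wording.", score=0.9),
    SentimentEstimate(reasoning="More neutral towards the end.", score=0.5),
]

consolidated = list(bridge.consolidate(chunk_results, docs_offsets=[(0, 2)]))
print(consolidated[0].score)  # 0.7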

2. Build a SentimentAnalysisTask

The task class itself is mostly glue code: we instantiate our bridge(s) and provide other auxiliary, engine-agnostic functionality. We'll save this in sentiment_analysis_task.py.

from __future__ import annotations

from collections.abc import Iterable
from typing import Any

import datasets

from sieves.data import Doc
from sieves.engines import EngineType
from sieves.serialization import Config
from .sentiment_analysis_bridges import OutlinesSentimentAnalysis, SentimentEstimate
from sieves.tasks.predictive.core import PredictiveTask


# We'll define the class that we require few-shot examples to be provided in. In our case we can just inherit from our
# prompt signature class and add a `text` property.
class FewshotExample(SentimentEstimate):
    text: str


class SentimentAnalysis(PredictiveTask[SentimentEstimate, SentimentEstimate, OutlinesSentimentAnalysis]):
    # For the initialization of the bridge. We raise an error if an engine has been specified that we don't support (due 
    # to us not having a bridge implemented that would support this engine type).
    def _init_bridge(self, engine_type: EngineType) -> OutlinesSentimentAnalysis:
        if engine_type == EngineType.outlines:
            return OutlinesSentimentAnalysis(
                task_id=self._task_id,
                prompt_template=self._custom_prompt_template,
                prompt_signature_desc=self._custom_prompt_signature_desc,
            )
        else:
            raise KeyError(f"Engine type {engine_type} is not supported by {self.__class__.__name__}.")

    # The set of supported engine types.
    @property
    def supports(self) -> set[EngineType]:
        return {EngineType.outlines}

    # This implements the conversion of a set of docs to a Hugging Face datasets.Dataset.
    # You can implement this as `raise NotImplementedError` if you're not interested in generating a Hugging Face 
    # dataset from your result data. 
    def to_dataset(self, docs: Iterable[Doc]) -> datasets.Dataset:
        # Define metadata.
        info = datasets.DatasetInfo(
            description=f"Sentiment estimation dataset. Generated with sieves"
            f"v{Config.get_version()}.",
            features=datasets.Features({"text": datasets.Value("string"), "score": datasets.Value("float32")}),
        )

        def generate_data() -> Iterable[dict[str, Any]]:
            """Yields results as dicts.
            :return Iterable[dict[str, Any]]: Results as dicts.
            """
            for doc in docs:
                yield {"text": doc.text, "score": doc.results[self._task_id]}

        # Create dataset.
        return datasets.Dataset.from_generator(generate_data, features=info.features, info=info)

And that's it! Our sentiment analysis task is finished.
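
If you want to provide few-shot examples, you construct instances of FewshotExample and pass them to the task on instantiation. The sketch below assumes PredictiveTask accepts them via a fewshot_examples argument (check the PredictiveTask signature of your sieves version) and uses the engine we'll construct in the next step:

from sentiment_analysis_task import FewshotExample, SentimentAnalysis

examples = [
    FewshotExample(
        text="This was the best meal I've had in years.",
        reasoning="Strongly positive wording without any caveats.",
        score=0.95,
    ),
    FewshotExample(
        text="The room was dirty and the staff was rude.",
        reasoning="Exclusively negative statements about the experience.",
        score=0.05,
    ),
]

# Assumption: PredictiveTask exposes a `fewshot_examples` parameter - adjust if your version differs.
task = SentimentAnalysis(engine=engine, fewshot_examples=examples)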

3. Run Our Task

We can now use our sentiment analysis task like every built-in task:

import outlines

from sieves import Doc, Pipeline, engines
from sentiment_analysis_task import SentimentAnalysis

model_name = "HuggingFaceTB/SmolLM-135M-Instruct"
engine = engines.outlines_.Outlines(model=outlines.models.transformers(model_name))

docs = [Doc(text="I'm feeling happy today."), Doc(text="I was sad yesterday.")]
pipe = Pipeline([SentimentAnalysis(engine=engine)])

for doc in pipe(docs):
    print(doc.text, doc.results["SentimentAnalysis"])
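
Since we implemented to_dataset(), we can also export the processed docs as a Hugging Face dataset. Keep a reference to the task instance so you can call to_dataset() on it afterwards:

task = SentimentAnalysis(engine=engine)
pipe = Pipeline([task])
docs = list(pipe(docs))

dataset = task.to_dataset(docs)
print(dataset)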