Skip to content

DSPy

Bases: Engine[PromptSignature, Result, Model, InferenceMode]

Engine for DSPy.

Source code in sieves/engines/dspy_.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class DSPy(Engine[PromptSignature, Result, Model, InferenceMode]):
    """Engine for DSPy."""

    def __init__(
        self,
        model: Model,
        config_kwargs: dict[str, Any] | None = None,
        init_kwargs: dict[str, Any] | None = None,
        inference_kwargs: dict[str, Any] | None = None,
        batch_size: int = -1,
    ):
        """
        Initialize the engine and register the model with DSPy via `dspy.configure()`.

        :param model: Model to run. Note: DSPy only runs with APIs. If you want to run a model locally from v2.5
            onwards, serve it with Ollama - see https://dspy.ai/learn/programming/language_models/?h=models#__tabbed_1_5.
            In a nutshell:
            > curl -fsSL https://ollama.ai/install.sh | sh
            > ollama run MODEL_ID
            > `model = dspy.LM(MODEL_ID, api_base='http://localhost:11434', api_key='')`
        :param config_kwargs: Optional kwargs supplied to dspy.configure(). `max_tokens` defaults to 100000 unless
            overridden here.
        :param init_kwargs: Optional kwargs to supply to engine executable at init time.
        :param inference_kwargs: Optional kwargs to supply to engine executable at inference time.
        :param batch_size: Batch size for processing prompts. -1 will batch all documents in one go. Not all engines
            support batching.
        """
        super().__init__(model, init_kwargs, inference_kwargs, batch_size=batch_size)
        # Caller-supplied settings take precedence over the default token limit.
        config_kwargs = {"max_tokens": 100000} | (config_kwargs or {})
        dspy.configure(lm=model, **config_kwargs)

    @property
    def inference_modes(self) -> type[InferenceMode]:
        """Return the `InferenceMode` type used by this engine."""
        return InferenceMode

    @property
    def supports_few_shotting(self) -> bool:
        """Return True: few-shot examples are passed to DSPy when building the executable."""
        return True

    def build_executable(
        self,
        inference_mode: InferenceMode,
        prompt_template: str | None,  # noqa: UP007
        prompt_signature: type[PromptSignature] | PromptSignature,
        fewshot_examples: Iterable[pydantic.BaseModel] = tuple(),
    ) -> Executable[Result | None]:
        """
        Build the callable that runs batches of inputs through DSPy.

        :param inference_mode: Inference mode; its value is called to create the DSPy generator.
        :param prompt_template: Ignored, as DSPy doesn't use it directly (only the prompt signature).
        :param prompt_signature: Prompt signature. Must be a class.
        :param fewshot_examples: Few-shot examples, converted to `dspy.Example` instances and used as the trainset
            for `dspy.LabeledFewShot`.
        :return: Function that takes an iterable of keyword-argument dicts (one per document) and yields one result
            per dict. In non-strict mode, `None` is yielded for every document of a batch that failed with a
            ValueError.
        """
        assert isinstance(prompt_signature, type)

        # Note: prompt_template is ignored here, as DSPy doesn't use it directly (only prompt_signature_description).
        def execute(values: Iterable[dict[str, Any]]) -> Iterable[Result | None]:
            # Handled differently than the other supported modules: dspy.Module serves as both the signature as well as
            # the inference generator.
            if inference_mode == InferenceMode.module:
                # NOTE(review): prompt_signature was asserted to be a class above, while this check expects a
                # dspy.Module instance - the two assertions look contradictory; confirm the intended contract.
                assert isinstance(prompt_signature, dspy.Module), ValueError(
                    "In inference mode 'module' the provided prompt signature has to be of type dspy.Module."
                )
                generator = inference_mode.value(**self._init_kwargs)
            else:
                assert issubclass(prompt_signature, dspy.Signature)
                generator = inference_mode.value(signature=prompt_signature, **self._init_kwargs)

            # Compile predictor with few-shot examples.
            fewshot_examples_dict = DSPy._convert_fewshot_examples(fewshot_examples)
            examples = [dspy.Example(**fs_example) for fs_example in fewshot_examples_dict]
            generator = dspy.asyncify(dspy.LabeledFewShot(k=5).compile(student=generator, trainset=examples))

            batch_size = self._batch_size if self._batch_size != -1 else sys.maxsize
            # Read values through a single iterator so that lists/tuples and generators are batched the same way:
            # each islice() call below continues where the previous batch stopped.
            values_iter = iter(values)

            # The loop ends as soon as islice() returns an empty batch, i.e. when the input is exhausted.
            while batch := list(itertools.islice(values_iter, batch_size)):
                try:
                    calls = [generator(**doc_values, **self._inference_kwargs) for doc_values in batch]
                    results = asyncio.run(self._execute_async_calls(calls))
                except ValueError as err:
                    if self._strict_mode:
                        raise ValueError(
                            "Encountered problem when executing prompt. Ensure your few-shot examples and document "
                            "chunks contain sensible information."
                        ) from err
                    # Emit one None per document (not a single list) so results stay aligned with inputs.
                    results = [None] * len(batch)

                yield from results

        return execute

_attributes property

Returns attributes to serialize.

Returns:

Type Description
dict[str, Attribute]

Dict of attributes to serialize.

model property

Return model instance.

Returns:

Type Description
EngineModel

Model instance.

__init__(model, config_kwargs=None, init_kwargs=None, inference_kwargs=None, batch_size=-1)

Parameters:

Name Type Description Default
model Model

Model to run. Note: DSPy only runs with APIs. If you want to run a model locally from v2.5 onwards, serve it with Ollama - see https://dspy.ai/learn/programming/language_models/?h=models#__tabbed_1_5. In a nutshell: (1) run `curl -fsSL https://ollama.ai/install.sh | sh`; (2) run `ollama run MODEL_ID`; (3) create the model with `model = dspy.LM(MODEL_ID, api_base='http://localhost:11434', api_key='')`.

required
config_kwargs dict[str, Any] | None

Optional kwargs supplied to dspy.configure().

None
init_kwargs dict[str, Any] | None

Optional kwargs to supply to engine executable at init time.

None
inference_kwargs dict[str, Any] | None

Optional kwargs to supply to engine executable at inference time.

None
batch_size int

Batch size for processing prompts. -1 will batch all documents in one go. Not all engines support batching.

-1
Source code in sieves/engines/dspy_.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    model: Model,
    config_kwargs: dict[str, Any] | None = None,
    init_kwargs: dict[str, Any] | None = None,
    inference_kwargs: dict[str, Any] | None = None,
    batch_size: int = -1,
):
    """
    Set up the engine and register the model with DSPy through `dspy.configure()`.

    :param model: Model to run. Note: DSPy only runs with APIs. If you want to run a model locally from v2.5
        onwards, serve it with Ollama - see https://dspy.ai/learn/programming/language_models/?h=models#__tabbed_1_5.
        In a nutshell:
        > curl -fsSL https://ollama.ai/install.sh | sh
        > ollama run MODEL_ID
        > `model = dspy.LM(MODEL_ID, api_base='http://localhost:11434', api_key='')`
    :param config_kwargs: Optional kwargs supplied to dspy.configure(). `max_tokens` is set to 100000 unless
        given here.
    :param init_kwargs: Optional kwargs to supply to engine executable at init time.
    :param inference_kwargs: Optional kwargs to supply to engine executable at inference time.
    :param batch_size: Batch size for processing prompts. -1 will batch all documents in one go. Not all engines
        support batching.
    """
    super().__init__(model, init_kwargs, inference_kwargs, batch_size=batch_size)

    # Start from the default token limit; anything the caller passes overrides it.
    dspy_settings: dict[str, Any] = {"max_tokens": 100000}
    if config_kwargs:
        dspy_settings.update(config_kwargs)

    dspy.configure(lm=model, **dspy_settings)

_convert_fewshot_examples(fewshot_examples) staticmethod

Convert fewshot examples from pydantic.BaseModel instance to dicts.

Parameters:

Name Type Description Default
fewshot_examples Iterable[BaseModel]

Fewshot examples to convert.

required

Returns:

Type Description
list[dict[str, Any]]

Fewshot examples as dicts.

Source code in sieves/engines/core.py
 96
 97
 98
 99
100
101
102
103
@staticmethod
def _convert_fewshot_examples(fewshot_examples: Iterable[pydantic.BaseModel]) -> list[dict[str, Any]]:
    """
    Dump every few-shot example into a plain dict.
    :param fewshot_examples: Pydantic model instances to convert.
    :return: One dict per example, in input order, as produced by `model_dump(serialize_as_any=True)`.
    """
    example_dicts: list[dict[str, Any]] = []
    for example in fewshot_examples:
        example_dicts.append(example.model_dump(serialize_as_any=True))
    return example_dicts

_execute_async_calls(calls) async staticmethod

Executes batch of async functions.

Parameters:

Name Type Description Default
calls list[Coroutine[Any, Any, Any]] | list[Awaitable[Any]]

Async calls to execute.

required

Returns:

Type Description
Any

Parsed response objects.

Source code in sieves/engines/core.py
135
136
137
138
139
140
141
@staticmethod
async def _execute_async_calls(calls: list[Coroutine[Any, Any, Any]] | list[Awaitable[Any]]) -> Any:
    """Await a batch of async calls together via `asyncio.gather`.
    :param calls: Async calls to execute.
    :return: Results of the calls, as returned by `asyncio.gather`.
    """
    gathered = asyncio.gather(*calls)
    responses = await gathered
    return responses

_validate_batch_size(batch_size)

Validates batch_size. No-op by default.

Parameters:

Name Type Description Default
batch_size int

Specified batch size.

required

Returns:

Type Description
int

Validated batch size.

Source code in sieves/engines/core.py
50
51
52
53
54
55
def _validate_batch_size(self, batch_size: int) -> int:
    """Check the requested batch size. This implementation accepts any value and returns it unchanged.
    :param batch_size: Specified batch size.
    :returns int: Validated batch size.
    """
    validated_batch_size = batch_size
    return validated_batch_size

deserialize(config, **kwargs) classmethod

Generate Engine instance from config.

Parameters:

Name Type Description Default
config Config

Config to generate instance from.

required
kwargs dict[str, Any]

Values to inject into loaded config.

{}

Returns:

Type Description
Engine[EnginePromptSignature, EngineResult, EngineModel, EngineInferenceMode]

Deserialized Engine instance.

Source code in sieves/engines/core.py
124
125
126
127
128
129
130
131
132
133
@classmethod
def deserialize(
    cls, config: Config, **kwargs: dict[str, Any]
) -> Engine[EnginePromptSignature, EngineResult, EngineModel, EngineInferenceMode]:
    """Build an Engine instance from a config.
    :param config: Config to generate instance from.
    :param kwargs: Values to inject into loaded config; forwarded to `config.to_init_dict()`.
    :return: Deserialized Engine instance.
    """
    # Let the config produce the constructor arguments for this class, then instantiate with them.
    init_arguments = config.to_init_dict(cls, **kwargs)
    return cls(**init_arguments)

serialize()

Serializes engine.

Returns:

Type Description
Config

Config instance.

Source code in sieves/engines/core.py
118
119
120
121
122
def serialize(self) -> Config:
    """Serialize this engine into a config.
    :return: Config instance created from the engine's class and its `_attributes`.
    """
    engine_cls = self.__class__
    serializable_attributes = self._attributes
    return Config.create(engine_cls, serializable_attributes)