# Unit tests for the OpenAI chat-completions provider.
# Source path: ai/tests/unit/test_provider_openai_completions.py

from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator
import pytest
from app.config.models import LoadedProviderConfig, OAuthAuth, UrlAuth
from app.core.errors import UpstreamProviderError
from app.core.types import CoreMessage, ProviderChatRequest
from app.providers.openai_completions.provider import OpenAICompletionsProvider
class _FakeDelta:
def __init__(
self,
role: str | None = None,
content: str | None = None,
reasoning_content: str | None = None,
reasoning: object | None = None,
tool_calls: list[dict[str, object]] | None = None,
) -> None:
self.role = role
self.content = content
self.reasoning_content = reasoning_content
self.reasoning = reasoning
self.tool_calls = tool_calls
class _FakeChoice:
def __init__(
self,
*,
index: int,
delta: _FakeDelta | None = None,
finish_reason: str | None = None,
) -> None:
self.index = index
self.delta = delta
self.finish_reason = finish_reason
class _FakeChunk:
def __init__(self, choices: list[_FakeChoice]) -> None:
self.choices = choices
class _FakeStream:
def __init__(self, chunks: list[_FakeChunk]) -> None:
self._chunks = chunks
def __aiter__(self) -> AsyncIterator[_FakeChunk]:
async def _it() -> AsyncIterator[_FakeChunk]:
for chunk in self._chunks:
yield chunk
return _it()
class _FakeCompletions:
    """Records the request payload and streams a canned happy-path reply."""

    def __init__(self) -> None:
        # Keyword arguments captured from the most recent ``create`` call.
        self.payload: dict | None = None

    async def create(self, **kwargs):
        self.payload = kwargs
        script = [
            _FakeChunk([_FakeChoice(index=0, delta=_FakeDelta(role="assistant"))]),
            _FakeChunk([_FakeChoice(index=0, delta=_FakeDelta(content="Hello"))]),
            _FakeChunk([_FakeChoice(index=0, finish_reason="stop")]),
        ]
        return _FakeStream(script)
class _ReasoningCompletions:
    """Streams a canned reply carrying a reasoning summary and a tool-call delta."""

    async def create(self, **kwargs):
        role_chunk = _FakeChunk(
            [_FakeChoice(index=0, delta=_FakeDelta(role="assistant"))]
        )
        reasoning_chunk = _FakeChunk(
            [
                _FakeChoice(
                    index=0,
                    delta=_FakeDelta(reasoning={"summary": "thinking"}),
                )
            ]
        )
        # A partial tool call, exactly as OpenRouter streams one out.
        tool_call = {
            "index": 0,
            "id": "call_1",
            "type": "function",
            "function": {
                "name": "question",
                "arguments": "{",
            },
        }
        tool_chunk = _FakeChunk(
            [_FakeChoice(index=0, delta=_FakeDelta(tool_calls=[tool_call]))]
        )
        finish_chunk = _FakeChunk([_FakeChoice(index=0, finish_reason="tool_calls")])
        return _FakeStream([role_chunk, reasoning_chunk, tool_chunk, finish_chunk])
class _FakeChat:
    """Chat namespace exposing the payload-recording completions fake."""

    def __init__(self) -> None:
        self.completions = _FakeCompletions()
class _ReasoningChat:
    """Chat namespace exposing the reasoning/tool-call completions fake."""

    def __init__(self) -> None:
        self.completions = _ReasoningCompletions()
class _FakeClient:
    """Top-level SDK client stub wired to the happy-path chat and model fakes."""

    def __init__(self) -> None:
        self.chat = _FakeChat()
        self.models = _FakeModels()
class _ReasoningClient:
    """Top-level SDK client stub wired to the reasoning/tool-call chat fake."""

    def __init__(self) -> None:
        self.chat = _ReasoningChat()
        self.models = _FakeModels()
class _FakeModels:
def __init__(self) -> None:
self.last_kwargs: dict[str, object] | None = None
async def list(self, **kwargs):
self.last_kwargs = kwargs
class _Resp:
data = [
{
"id": "minimax/minimax-m2.5:free",
"name": "MiniMax",
"description": "desc",
"context_length": 2048,
"architecture": {
"input_modalities": ["text"],
"tokenizer": "Other",
},
"pricing": {"prompt": "0"},
"supported_parameters": ["max_tokens"],
"settings": {"foo": "bar"},
"opencode": {"family": "x"},
}
]
return _Resp()
class _FailingCompletions:
async def create(self, **kwargs):
raise RuntimeError("boom")
class _FailingChat:
    """Chat namespace exposing the always-failing completions fake."""

    def __init__(self) -> None:
        self.completions = _FailingCompletions()
class _FailingClient:
    """Top-level SDK client stub where both chat and model listing fail."""

    def __init__(self) -> None:
        self.chat = _FailingChat()
        self.models = _FailingModels()
class _FailingModels:
async def list(self, **kwargs):
raise RuntimeError("boom")
class _StaticAuth:
async def get_headers(self) -> dict[str, str]:
return {"Authorization": "Bearer fetched-token"}
def _collect(async_iter) -> list:
async def _inner() -> list:
out = []
async for item in async_iter:
out.append(item)
return out
return asyncio.run(_inner())
def test_openai_completions_provider_streams_internal_chunks() -> None:
    """Happy path: deltas stream through in order and the payload is recorded."""
    fake_client = _FakeClient()
    provider = OpenAICompletionsProvider(
        name="kilo",
        base_url="https://api.kilo.ai/api/openrouter",
        token="public",
        client=fake_client,
    )
    request = ProviderChatRequest(
        model="minimax/minimax-m2.5:free",
        messages=[CoreMessage(role="user", content="hello")],
        top_p=0.9,
        max_tokens=123,
    )

    emitted = _collect(provider.stream_chat(request))

    assert emitted[0].role == "assistant"
    assert emitted[1].content == "Hello"
    assert emitted[2].finish_reason == "stop"

    sent = fake_client.chat.completions.payload
    assert sent is not None
    assert sent["model"] == "minimax/minimax-m2.5:free"
    assert sent["stream"] is True
    assert sent["top_p"] == 0.9
    assert sent["max_tokens"] == 123
    assert sent["messages"] == [{"role": "user", "content": "hello"}]
    assert sent["extra_headers"] == {"Authorization": "Bearer public"}
def test_openai_completions_provider_streams_reasoning_details() -> None:
    """``reasoning.details`` entries pass through unchanged on the chunk."""

    class _DetailsCompletions:
        async def create(self, **kwargs):
            detail = {
                "type": "reasoning.encrypted",
                "data": "enc_123",
                "format": "openai-responses-v1",
            }
            return _FakeStream(
                [
                    _FakeChunk(
                        [
                            _FakeChoice(
                                index=0,
                                delta=_FakeDelta(reasoning={"details": [detail]}),
                            )
                        ]
                    ),
                    _FakeChunk([_FakeChoice(index=0, finish_reason="stop")]),
                ]
            )

    class _DetailsChat:
        def __init__(self) -> None:
            self.completions = _DetailsCompletions()

    class _DetailsClient:
        def __init__(self) -> None:
            self.chat = _DetailsChat()
            self.models = _FakeModels()

    provider = OpenAICompletionsProvider(
        name="kilo",
        base_url="https://api.kilo.ai/api/openrouter",
        token="public",
        client=_DetailsClient(),
    )
    request = ProviderChatRequest(
        model="minimax/minimax-m2.5:free",
        messages=[CoreMessage(role="user", content="hello")],
    )

    emitted = _collect(provider.stream_chat(request))

    assert emitted[0].role == "assistant"
    assert emitted[1].reasoning_details == [
        {
            "type": "reasoning.encrypted",
            "data": "enc_123",
            "format": "openai-responses-v1",
        }
    ]
    assert emitted[2].finish_reason == "stop"
def test_openai_completions_provider_wraps_upstream_error() -> None:
    """SDK failures during streaming surface as UpstreamProviderError."""
    provider = OpenAICompletionsProvider(
        name="kilo",
        base_url="https://api.kilo.ai/api/openrouter",
        token="public",
        client=_FailingClient(),
    )
    request = ProviderChatRequest(
        model="minimax/minimax-m2.5:free",
        messages=[CoreMessage(role="user", content="hello")],
    )

    with pytest.raises(UpstreamProviderError, match="failed while streaming"):
        _collect(provider.stream_chat(request))
def test_openai_completions_provider_streams_reasoning_and_tool_calls() -> None:
    """Reasoning summaries and tool-call deltas are both forwarded."""
    provider = OpenAICompletionsProvider(
        name="kilo",
        base_url="https://api.kilo.ai/api/openrouter",
        token="public",
        client=_ReasoningClient(),
    )
    request = ProviderChatRequest(
        model="minimax/minimax-m2.5:free",
        messages=[CoreMessage(role="user", content="hello")],
    )

    emitted = _collect(provider.stream_chat(request))

    expected_call = {
        "index": 0,
        "id": "call_1",
        "type": "function",
        "function": {"name": "question", "arguments": "{"},
    }
    assert emitted[0].role == "assistant"
    assert emitted[1].reasoning_content == "thinking"
    assert emitted[2].tool_calls == [expected_call]
    assert emitted[3].finish_reason == "tool_calls"
def test_openai_completions_provider_lists_models() -> None:
    """Model listing maps the raw record and forwards the auth header."""
    fake_client = _FakeClient()
    provider = OpenAICompletionsProvider(
        name="kilo",
        base_url="https://api.kilo.ai/api/openrouter",
        token="public",
        client=fake_client,
    )

    models = asyncio.run(provider.list_models())

    assert len(models) == 1
    (model,) = models
    assert model.id == "minimax/minimax-m2.5:free"
    assert model.context_length == 2048
    assert model.architecture == {"input_modalities": ["text"]}
    assert fake_client.models.last_kwargs == {
        "extra_headers": {"Authorization": "Bearer public"}
    }
def test_openai_completions_provider_wraps_model_list_errors() -> None:
    """SDK failures while listing models surface as UpstreamProviderError."""
    provider = OpenAICompletionsProvider(
        name="kilo",
        base_url="https://api.kilo.ai/api/openrouter",
        token="public",
        client=_FailingClient(),
    )

    with pytest.raises(UpstreamProviderError, match="failed while listing models"):
        asyncio.run(provider.list_models())
def test_openai_completions_provider_from_config_supports_url_auth() -> None:
    """``from_config`` accepts url-based auth configurations."""
    config = LoadedProviderConfig(
        name="kilo",
        url="https://api.kilo.ai/api/openrouter",
        type="openai-completions",
        auth=UrlAuth(url="https://auth.local/token"),
    )

    built = OpenAICompletionsProvider.from_config(config)

    assert isinstance(built, OpenAICompletionsProvider)
def test_openai_completions_provider_from_config_rejects_oauth() -> None:
    """``from_config`` refuses OAuth credentials with a clear error."""
    config = LoadedProviderConfig(
        name="kilo",
        url="https://api.kilo.ai/api/openrouter",
        type="openai-completions",
        auth=OAuthAuth(access="acc", refresh="ref", expires=1),
    )

    with pytest.raises(ValueError, match="requires token or url auth"):
        OpenAICompletionsProvider.from_config(config)
def test_openai_completions_provider_uses_custom_auth_provider() -> None:
    """A custom auth provider supplies the bearer header sent upstream."""
    fake_client = _FakeClient()
    provider = OpenAICompletionsProvider(
        name="kilo",
        base_url="https://api.kilo.ai/api/openrouter",
        auth_provider=_StaticAuth(),
        client=fake_client,
    )
    request = ProviderChatRequest(
        model="minimax/minimax-m2.5:free",
        messages=[CoreMessage(role="user", content="hello")],
    )

    _collect(provider.stream_chat(request))

    sent = fake_client.chat.completions.payload
    assert sent is not None
    assert sent["extra_headers"] == {"Authorization": "Bearer fetched-token"}
def test_openai_completions_provider_passes_schema_fields_via_extra_body() -> None:
    """Non-standard request fields travel to the SDK via ``extra_body``."""
    fake_client = _FakeClient()
    provider = OpenAICompletionsProvider(
        name="kilo",
        base_url="https://api.kilo.ai/api/openrouter",
        token="public",
        client=fake_client,
    )
    # Every schema field below must be forwarded verbatim in extra_body.
    extras = {
        "reasoning": {"effort": "high", "summary": "detailed"},
        "provider": {"allow_fallbacks": False},
        "plugins": [{"id": "web", "enabled": True}],
        "session_id": "ses_123",
        "trace": {"trace_id": "tr_1"},
        "models": ["openai/gpt-5"],
        "debug": {"echo_upstream_body": True},
        "image_config": {"size": "1024x1024"},
    }
    request = ProviderChatRequest(
        model="minimax/minimax-m2.5:free",
        messages=[CoreMessage(role="user", content="hello")],
        **extras,
    )

    _collect(provider.stream_chat(request))

    sent = fake_client.chat.completions.payload
    assert sent is not None
    assert sent["extra_body"] == extras