mirror of
https://github.com/cloudflare/cloudflare-python.git
synced 2026-01-12 06:53:21 +00:00
250 lines
7.9 KiB
Python
250 lines
7.9 KiB
Python
from __future__ import annotations
|
||
|
||
from typing import Iterator, AsyncIterator
|
||
|
||
import httpx
|
||
import pytest
|
||
|
||
from cloudflare import Cloudflare, AsyncCloudflare
|
||
from cloudflare._streaming import Stream, AsyncStream, ServerSentEvent
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_basic(sync: bool, client: Cloudflare, async_client: AsyncCloudflare) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b"event: completion\n"
|
||
yield b'data: {"foo":true}\n'
|
||
yield b"\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event == "completion"
|
||
assert sse.json() == {"foo": True}
|
||
|
||
await assert_empty_iter(iterator)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_data_missing_event(sync: bool, client: Cloudflare, async_client: AsyncCloudflare) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b'data: {"foo":true}\n'
|
||
yield b"\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event is None
|
||
assert sse.json() == {"foo": True}
|
||
|
||
await assert_empty_iter(iterator)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_event_missing_data(sync: bool, client: Cloudflare, async_client: AsyncCloudflare) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b"event: ping\n"
|
||
yield b"\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event == "ping"
|
||
assert sse.data == ""
|
||
|
||
await assert_empty_iter(iterator)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_multiple_events(sync: bool, client: Cloudflare, async_client: AsyncCloudflare) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b"event: ping\n"
|
||
yield b"\n"
|
||
yield b"event: completion\n"
|
||
yield b"\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event == "ping"
|
||
assert sse.data == ""
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event == "completion"
|
||
assert sse.data == ""
|
||
|
||
await assert_empty_iter(iterator)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_multiple_events_with_data(sync: bool, client: Cloudflare, async_client: AsyncCloudflare) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b"event: ping\n"
|
||
yield b'data: {"foo":true}\n'
|
||
yield b"\n"
|
||
yield b"event: completion\n"
|
||
yield b'data: {"bar":false}\n'
|
||
yield b"\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event == "ping"
|
||
assert sse.json() == {"foo": True}
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event == "completion"
|
||
assert sse.json() == {"bar": False}
|
||
|
||
await assert_empty_iter(iterator)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_multiple_data_lines_with_empty_line(
|
||
sync: bool, client: Cloudflare, async_client: AsyncCloudflare
|
||
) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b"event: ping\n"
|
||
yield b"data: {\n"
|
||
yield b'data: "foo":\n'
|
||
yield b"data: \n"
|
||
yield b"data:\n"
|
||
yield b"data: true}\n"
|
||
yield b"\n\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event == "ping"
|
||
assert sse.json() == {"foo": True}
|
||
assert sse.data == '{\n"foo":\n\n\ntrue}'
|
||
|
||
await assert_empty_iter(iterator)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_data_json_escaped_double_new_line(sync: bool, client: Cloudflare, async_client: AsyncCloudflare) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b"event: ping\n"
|
||
yield b'data: {"foo": "my long\\n\\ncontent"}'
|
||
yield b"\n\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event == "ping"
|
||
assert sse.json() == {"foo": "my long\n\ncontent"}
|
||
|
||
await assert_empty_iter(iterator)
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_multiple_data_lines(sync: bool, client: Cloudflare, async_client: AsyncCloudflare) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b"event: ping\n"
|
||
yield b"data: {\n"
|
||
yield b'data: "foo":\n'
|
||
yield b"data: true}\n"
|
||
yield b"\n\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event == "ping"
|
||
assert sse.json() == {"foo": True}
|
||
|
||
await assert_empty_iter(iterator)
|
||
|
||
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_special_new_line_character(
|
||
sync: bool,
|
||
client: Cloudflare,
|
||
async_client: AsyncCloudflare,
|
||
) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b'data: {"content":" culpa"}\n'
|
||
yield b"\n"
|
||
yield b'data: {"content":" \xe2\x80\xa8"}\n'
|
||
yield b"\n"
|
||
yield b'data: {"content":"foo"}\n'
|
||
yield b"\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event is None
|
||
assert sse.json() == {"content": " culpa"}
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event is None
|
||
assert sse.json() == {"content": "
"}
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event is None
|
||
assert sse.json() == {"content": "foo"}
|
||
|
||
await assert_empty_iter(iterator)
|
||
|
||
|
||
@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
|
||
async def test_multi_byte_character_multiple_chunks(
|
||
sync: bool,
|
||
client: Cloudflare,
|
||
async_client: AsyncCloudflare,
|
||
) -> None:
|
||
def body() -> Iterator[bytes]:
|
||
yield b'data: {"content":"'
|
||
# bytes taken from the string 'известни' and arbitrarily split
|
||
# so that some multi-byte characters span multiple chunks
|
||
yield b"\xd0"
|
||
yield b"\xb8\xd0\xb7\xd0"
|
||
yield b"\xb2\xd0\xb5\xd1\x81\xd1\x82\xd0\xbd\xd0\xb8"
|
||
yield b'"}\n'
|
||
yield b"\n"
|
||
|
||
iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
|
||
|
||
sse = await iter_next(iterator)
|
||
assert sse.event is None
|
||
assert sse.json() == {"content": "известни"}
|
||
|
||
|
||
async def to_aiter(iter: Iterator[bytes]) -> AsyncIterator[bytes]:
|
||
for chunk in iter:
|
||
yield chunk
|
||
|
||
|
||
async def iter_next(iter: Iterator[ServerSentEvent] | AsyncIterator[ServerSentEvent]) -> ServerSentEvent:
|
||
if isinstance(iter, AsyncIterator):
|
||
return await iter.__anext__()
|
||
|
||
return next(iter)
|
||
|
||
|
||
async def assert_empty_iter(iter: Iterator[ServerSentEvent] | AsyncIterator[ServerSentEvent]) -> None:
|
||
with pytest.raises((StopAsyncIteration, RuntimeError)):
|
||
await iter_next(iter)
|
||
|
||
|
||
def make_event_iterator(
|
||
content: Iterator[bytes],
|
||
*,
|
||
sync: bool,
|
||
client: Cloudflare,
|
||
async_client: AsyncCloudflare,
|
||
) -> Iterator[ServerSentEvent] | AsyncIterator[ServerSentEvent]:
|
||
if sync:
|
||
return Stream(cast_to=object, client=client, response=httpx.Response(200, content=content))._iter_events()
|
||
|
||
return AsyncStream(
|
||
cast_to=object, client=async_client, response=httpx.Response(200, content=to_aiter(content))
|
||
)._iter_events()
|