cloudflare-python/tests/api_resources/workers/scripts/test_schedules.py
2025-05-15 20:37:51 +00:00

224 lines
9.6 KiB
Python

# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
from __future__ import annotations
import os
from typing import Any, cast
import pytest
from cloudflare import Cloudflare, AsyncCloudflare
from tests.utils import assert_matches_type
from cloudflare.types.workers.scripts import ScheduleGetResponse, ScheduleUpdateResponse
# Base URL for the API under test: taken from the TEST_API_BASE_URL environment
# variable when it is set, otherwise the local address on port 4010.
# NOTE(review): nothing in the visible part of this module reads `base_url`.
base_url = os.getenv("TEST_API_BASE_URL", "http://127.0.0.1:4010")
class TestSchedules:
    """Synchronous tests for ``client.workers.scripts.schedules``.

    Both ``update`` and ``get`` are exercised through the plain call, the
    ``with_raw_response`` wrapper and the ``with_streaming_response`` wrapper,
    and each has a check that empty path parameters raise ``ValueError``.
    """

    # ``indirect=True`` routes each value to the ``client`` fixture instead of
    # passing it to the test directly. The fixture is defined outside this
    # file; presumably the boolean toggles strict response validation, as the
    # "loose"/"strict" ids suggest -- confirm against the fixture.
    parametrize = pytest.mark.parametrize("client", [False, True], indirect=True, ids=["loose", "strict"])

    @parametrize
    def test_method_update(self, client: Cloudflare) -> None:
        """Call ``update`` directly and type-check the returned value."""
        schedules = client.workers.scripts.schedules
        result = schedules.update(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
            body=[{"cron": "*/30 * * * *"}],
        )
        assert_matches_type(ScheduleUpdateResponse, result, path=["response"])

    @parametrize
    def test_raw_response_update(self, client: Cloudflare) -> None:
        """Call ``update`` via ``with_raw_response`` and inspect the raw reply."""
        raw_api = client.workers.scripts.schedules.with_raw_response
        raw = raw_api.update(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
            body=[{"cron": "*/30 * * * *"}],
        )

        # The raw response must already be closed and must come from a request
        # that carried the SDK language header.
        assert raw.is_closed is True
        assert raw.http_request.headers.get("X-Stainless-Lang") == "python"

        parsed = raw.parse()
        assert_matches_type(ScheduleUpdateResponse, parsed, path=["response"])

    @parametrize
    def test_streaming_response_update(self, client: Cloudflare) -> None:
        """Call ``update`` via ``with_streaming_response`` as a context manager."""
        streaming_api = client.workers.scripts.schedules.with_streaming_response
        with streaming_api.update(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
            body=[{"cron": "*/30 * * * *"}],
        ) as streamed:
            # Inside the context the response is still open.
            assert not streamed.is_closed
            assert streamed.http_request.headers.get("X-Stainless-Lang") == "python"

            parsed = streamed.parse()
            assert_matches_type(ScheduleUpdateResponse, parsed, path=["response"])

        # Checked after leaving the context. The cast to Any is presumably there
        # so static checkers do not treat this as contradicting the earlier
        # ``assert not streamed.is_closed``.
        assert cast(Any, streamed.is_closed) is True

    @parametrize
    def test_path_params_update(self, client: Cloudflare) -> None:
        """Empty ``account_id`` or ``script_name`` passed to ``update`` raises ``ValueError``."""
        raw_api = client.workers.scripts.schedules.with_raw_response

        with pytest.raises(ValueError, match=r"Expected a non-empty value for `account_id` but received ''"):
            raw_api.update(
                script_name="this-is_my_script-01",
                account_id="",
                body=[{"cron": "*/30 * * * *"}],
            )

        with pytest.raises(ValueError, match=r"Expected a non-empty value for `script_name` but received ''"):
            raw_api.update(
                script_name="",
                account_id="023e105f4ecef8ad9ca31a8372d0c353",
                body=[{"cron": "*/30 * * * *"}],
            )

    @parametrize
    def test_method_get(self, client: Cloudflare) -> None:
        """Call ``get`` directly and type-check the returned value."""
        schedules = client.workers.scripts.schedules
        result = schedules.get(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
        )
        assert_matches_type(ScheduleGetResponse, result, path=["response"])

    @parametrize
    def test_raw_response_get(self, client: Cloudflare) -> None:
        """Call ``get`` via ``with_raw_response`` and inspect the raw reply."""
        raw_api = client.workers.scripts.schedules.with_raw_response
        raw = raw_api.get(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
        )

        # The raw response must already be closed and must come from a request
        # that carried the SDK language header.
        assert raw.is_closed is True
        assert raw.http_request.headers.get("X-Stainless-Lang") == "python"

        parsed = raw.parse()
        assert_matches_type(ScheduleGetResponse, parsed, path=["response"])

    @parametrize
    def test_streaming_response_get(self, client: Cloudflare) -> None:
        """Call ``get`` via ``with_streaming_response`` as a context manager."""
        streaming_api = client.workers.scripts.schedules.with_streaming_response
        with streaming_api.get(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
        ) as streamed:
            # Inside the context the response is still open.
            assert not streamed.is_closed
            assert streamed.http_request.headers.get("X-Stainless-Lang") == "python"

            parsed = streamed.parse()
            assert_matches_type(ScheduleGetResponse, parsed, path=["response"])

        # Checked after leaving the context. The cast to Any is presumably there
        # so static checkers do not treat this as contradicting the earlier
        # ``assert not streamed.is_closed``.
        assert cast(Any, streamed.is_closed) is True

    @parametrize
    def test_path_params_get(self, client: Cloudflare) -> None:
        """Empty ``account_id`` or ``script_name`` passed to ``get`` raises ``ValueError``."""
        raw_api = client.workers.scripts.schedules.with_raw_response

        with pytest.raises(ValueError, match=r"Expected a non-empty value for `account_id` but received ''"):
            raw_api.get(
                script_name="this-is_my_script-01",
                account_id="",
            )

        with pytest.raises(ValueError, match=r"Expected a non-empty value for `script_name` but received ''"):
            raw_api.get(
                script_name="",
                account_id="023e105f4ecef8ad9ca31a8372d0c353",
            )
class TestAsyncSchedules:
    """Asynchronous tests for ``async_client.workers.scripts.schedules``.

    Both ``update`` and ``get`` are exercised through the plain awaited call,
    the ``with_raw_response`` wrapper and the ``with_streaming_response``
    wrapper, and each has a check that empty path parameters raise
    ``ValueError``.
    """

    # ``indirect=True`` routes each value to the ``async_client`` fixture
    # instead of passing it to the test directly. The fixture is defined
    # outside this file; presumably the boolean toggles strict response
    # validation, as the "loose"/"strict" ids suggest -- confirm against the
    # fixture.
    parametrize = pytest.mark.parametrize("async_client", [False, True], indirect=True, ids=["loose", "strict"])

    @parametrize
    async def test_method_update(self, async_client: AsyncCloudflare) -> None:
        """Await ``update`` directly and type-check the returned value."""
        schedules = async_client.workers.scripts.schedules
        result = await schedules.update(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
            body=[{"cron": "*/30 * * * *"}],
        )
        assert_matches_type(ScheduleUpdateResponse, result, path=["response"])

    @parametrize
    async def test_raw_response_update(self, async_client: AsyncCloudflare) -> None:
        """Await ``update`` via ``with_raw_response`` and inspect the raw reply."""
        raw_api = async_client.workers.scripts.schedules.with_raw_response
        raw = await raw_api.update(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
            body=[{"cron": "*/30 * * * *"}],
        )

        # The raw response must already be closed and must come from a request
        # that carried the SDK language header.
        assert raw.is_closed is True
        assert raw.http_request.headers.get("X-Stainless-Lang") == "python"

        # Unlike the sync client, parsing is itself awaited here.
        parsed = await raw.parse()
        assert_matches_type(ScheduleUpdateResponse, parsed, path=["response"])

    @parametrize
    async def test_streaming_response_update(self, async_client: AsyncCloudflare) -> None:
        """Use ``update`` via ``with_streaming_response`` as an async context manager."""
        streaming_api = async_client.workers.scripts.schedules.with_streaming_response
        async with streaming_api.update(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
            body=[{"cron": "*/30 * * * *"}],
        ) as streamed:
            # Inside the context the response is still open.
            assert not streamed.is_closed
            assert streamed.http_request.headers.get("X-Stainless-Lang") == "python"

            parsed = await streamed.parse()
            assert_matches_type(ScheduleUpdateResponse, parsed, path=["response"])

        # Checked after leaving the context. The cast to Any is presumably there
        # so static checkers do not treat this as contradicting the earlier
        # ``assert not streamed.is_closed``.
        assert cast(Any, streamed.is_closed) is True

    @parametrize
    async def test_path_params_update(self, async_client: AsyncCloudflare) -> None:
        """Empty ``account_id`` or ``script_name`` passed to ``update`` raises ``ValueError``."""
        raw_api = async_client.workers.scripts.schedules.with_raw_response

        with pytest.raises(ValueError, match=r"Expected a non-empty value for `account_id` but received ''"):
            await raw_api.update(
                script_name="this-is_my_script-01",
                account_id="",
                body=[{"cron": "*/30 * * * *"}],
            )

        with pytest.raises(ValueError, match=r"Expected a non-empty value for `script_name` but received ''"):
            await raw_api.update(
                script_name="",
                account_id="023e105f4ecef8ad9ca31a8372d0c353",
                body=[{"cron": "*/30 * * * *"}],
            )

    @parametrize
    async def test_method_get(self, async_client: AsyncCloudflare) -> None:
        """Await ``get`` directly and type-check the returned value."""
        schedules = async_client.workers.scripts.schedules
        result = await schedules.get(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
        )
        assert_matches_type(ScheduleGetResponse, result, path=["response"])

    @parametrize
    async def test_raw_response_get(self, async_client: AsyncCloudflare) -> None:
        """Await ``get`` via ``with_raw_response`` and inspect the raw reply."""
        raw_api = async_client.workers.scripts.schedules.with_raw_response
        raw = await raw_api.get(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
        )

        # The raw response must already be closed and must come from a request
        # that carried the SDK language header.
        assert raw.is_closed is True
        assert raw.http_request.headers.get("X-Stainless-Lang") == "python"

        # Unlike the sync client, parsing is itself awaited here.
        parsed = await raw.parse()
        assert_matches_type(ScheduleGetResponse, parsed, path=["response"])

    @parametrize
    async def test_streaming_response_get(self, async_client: AsyncCloudflare) -> None:
        """Use ``get`` via ``with_streaming_response`` as an async context manager."""
        streaming_api = async_client.workers.scripts.schedules.with_streaming_response
        async with streaming_api.get(
            script_name="this-is_my_script-01",
            account_id="023e105f4ecef8ad9ca31a8372d0c353",
        ) as streamed:
            # Inside the context the response is still open.
            assert not streamed.is_closed
            assert streamed.http_request.headers.get("X-Stainless-Lang") == "python"

            parsed = await streamed.parse()
            assert_matches_type(ScheduleGetResponse, parsed, path=["response"])

        # Checked after leaving the context. The cast to Any is presumably there
        # so static checkers do not treat this as contradicting the earlier
        # ``assert not streamed.is_closed``.
        assert cast(Any, streamed.is_closed) is True

    @parametrize
    async def test_path_params_get(self, async_client: AsyncCloudflare) -> None:
        """Empty ``account_id`` or ``script_name`` passed to ``get`` raises ``ValueError``."""
        raw_api = async_client.workers.scripts.schedules.with_raw_response

        with pytest.raises(ValueError, match=r"Expected a non-empty value for `account_id` but received ''"):
            await raw_api.get(
                script_name="this-is_my_script-01",
                account_id="",
            )

        with pytest.raises(ValueError, match=r"Expected a non-empty value for `script_name` but received ''"):
            await raw_api.get(
                script_name="",
                account_id="023e105f4ecef8ad9ca31a8372d0c353",
            )