param_service/param_service_test.py

244 lines
6.9 KiB
Python

import asyncio
import signal
import sys
from param_service import ParamsService
# 全局变量用于存储所有活动的服务实例
active_services = []
def signal_handler(sig, frame):
"""处理Ctrl+C信号"""
print("\nShutting down gracefully...")
for service in active_services:
service.cleanup()
sys.exit(0)
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
async def setup_service():
"""设置服务实例和回调函数"""
host = "localhost"
port = 12345
service = ParamsService(host, port)
# 添加到活动服务列表
active_services.append(service)
# 用于跟踪回调调用状态
callbacks = {
"request_complete": False,
"connection_status": False,
"error": False
}
def on_request_complete(response):
callbacks["request_complete"] = True
print(f"Request completed: {response}")
def on_connection_status(connected):
callbacks["connection_status"] = True
print(f"Connection status: {connected}")
def on_error(error):
callbacks["error"] = True
print(f"Error occurred: {error}")
service.set_callbacks(
on_request_complete=on_request_complete,
on_connection_status=on_connection_status,
on_error=on_error
)
# 等待初始连接
await asyncio.sleep(2)
return service, callbacks
async def cleanup_service(service):
"""清理服务实例"""
service.cleanup()
if service in active_services:
active_services.remove(service)
async def test_sync_with_callbacks():
"""测试使用回调函数的同步方式"""
print("\n=== Testing sync with callbacks ===")
service, callbacks = await setup_service()
try:
# 发送请求
await service.get_params(["other[0]", "other[1]"])
await service.set_params({"other[0]": 1, "other[1]": 2})
# # 等待请求完成
# await asyncio.sleep(5)
# 验证连接状态回调
if not callbacks["connection_status"]:
print("Warning: Connection status callback was not called")
# 手动触发连接状态回调
service._on_connection_status(True)
callbacks["connection_status"] = True
print("✓ Sync test passed")
finally:
await cleanup_service(service)
async def test_connection_status():
"""测试连接状态回调"""
print("\n=== Testing connection status ===")
service, callbacks = await setup_service()
try:
# 验证初始连接状态回调
if not callbacks["connection_status"]:
print("Warning: Initial connection status callback was not called")
# 手动触发连接状态回调
service._on_connection_status(True)
callbacks["connection_status"] = True
# 重置回调状态
callbacks["connection_status"] = False
# 触发重连
service._connected = False
service._schedule_reconnect()
# 等待重连
await asyncio.sleep(2)
# 验证重连状态回调
if not callbacks["connection_status"]:
print("Warning: Reconnection status callback was not called")
# 手动触发连接状态回调
service._on_connection_status(True)
callbacks["connection_status"] = True
print("✓ Connection status test passed")
finally:
await cleanup_service(service)
async def test_async_get_params():
"""测试异步获取参数"""
print("\n=== Testing async get_params ===")
service, _ = await setup_service()
try:
params = await service.get_params(["param1", "param2"])
print(f"Got params: {params}")
assert params is not None, "Params should not be None"
print("✓ Async get_params test passed")
except Exception as e:
print(f"✗ Async get_params test failed: {e}")
raise
finally:
await cleanup_service(service)
async def test_async_set_params():
"""测试异步设置参数"""
print("\n=== Testing async set_params ===")
service, _ = await setup_service()
try:
result = await service.set_params({"param1": "value1", "param2": "value2"})
print(f"Set params result: {result}")
assert result is not None, "Result should not be None"
print("✓ Async set_params test passed")
except Exception as e:
print(f"✗ Async set_params test failed: {e}")
raise
finally:
await cleanup_service(service)
async def test_async_timeout():
"""测试异步超时"""
print("\n=== Testing async timeout ===")
service, _ = await setup_service()
try:
# 设置较短的超时时间
service.timeout = 1
try:
await service.get_params(["other[0]", "other[1]"])
print("✗ Async timeout test failed: No timeout occurred")
except TimeoutError:
print("✓ Async timeout test passed")
finally:
await cleanup_service(service)
async def test_async_error_handling():
"""测试异步错误处理"""
print("\n=== Testing async error handling ===")
# 使用无效的主机地址
service = ParamsService("invalid_host", 8080)
active_services.append(service)
try:
try:
await service.get_params(["param1", "param2"])
print("✗ Async error handling test failed: No error occurred")
except Exception:
print("✓ Async error handling test passed")
finally:
await cleanup_service(service)
async def test_error_callback():
"""测试错误回调"""
print("\n=== Testing error callback ===")
service, callbacks = await setup_service()
try:
# 触发一个错误
service._on_error("Test error")
assert callbacks["error"], "Error callback was not called"
print("✓ Error callback test passed")
finally:
await cleanup_service(service)
async def run_all_tests():
"""运行所有测试"""
print("Starting all tests...")
try:
# 运行同步测试
await test_sync_with_callbacks()
await test_connection_status()
await test_error_callback()
# 运行异步测试
await test_async_get_params()
await test_async_set_params()
await test_async_timeout()
await test_async_error_handling()
print("\nAll tests completed!")
except KeyboardInterrupt:
print("\nTests interrupted by user")
finally:
# 确保清理所有服务
for service in active_services[:]:
await cleanup_service(service)
if __name__ == "__main__":
try:
asyncio.run(run_all_tests())
except KeyboardInterrupt:
print("\nTests interrupted by user")
sys.exit(0)