
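| """gRPC client wrapper: lazy connection setup, RPC error logging, and a single shared module-level client (no connection pool)."""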
import grpc from typing import Optional import logging
import sys import os sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'generated'))
import demo_pb2 import demo_pb2_grpc
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class GRPCClient:
    """Thin wrapper around the generated ``DemoService`` gRPC stub.

    The channel is opened lazily: every RPC helper calls :meth:`connect`
    first when no stub is available. ``close()`` resets the client so that a
    later RPC call reconnects instead of using a closed channel. Instances
    can be used as context managers; leaving the ``with`` block closes the
    channel.
    """

    # Seconds connect() waits for the channel to become ready.
    CONNECT_TIMEOUT_SECONDS = 5

    def __init__(self, host: str = 'localhost', port: int = 9090):
        """Store the target address; no connection is made here.

        Args:
            host: gRPC server address.
            port: gRPC server port.
        """
        self.host = host
        self.port = port
        self.channel: Optional[grpc.Channel] = None
        self.stub: Optional[demo_pb2_grpc.DemoServiceStub] = None

    def __enter__(self):
        """Return the client itself; the connection stays lazy."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the channel on leaving the ``with`` block.

        Returns None, so an exception raised inside the block propagates.
        """
        self.close()

    @staticmethod
    def _abandon(channel):
        """Close a channel that never became usable, if one was created."""
        if channel is not None:
            channel.close()

    def connect(self):
        """Open an insecure channel and wait until it is ready.

        ``self.channel`` and ``self.stub`` are only assigned once the channel
        is ready, so a failed attempt leaves the client untouched and the
        next RPC call retries the connection.

        Raises:
            grpc.FutureTimeoutError: The channel was not ready within
                ``CONNECT_TIMEOUT_SECONDS``.
            Exception: Any other error raised while creating the channel or
                the stub (logged and re-raised unchanged).
        """
        channel = None
        try:
            channel = grpc.insecure_channel(f'{self.host}:{self.port}')
            stub = demo_pb2_grpc.DemoServiceStub(channel)
            grpc.channel_ready_future(channel).result(
                timeout=self.CONNECT_TIMEOUT_SECONDS
            )
        except grpc.FutureTimeoutError:
            logger.error(f"gRPC 连接超时: {self.host}:{self.port}")
            self._abandon(channel)
            raise
        except Exception as e:
            logger.error(f"gRPC 连接失败: {e}")
            self._abandon(channel)
            raise

        # Swap in the new channel, then release any previous one so that
        # calling connect() twice does not leak the old channel.
        previous = self.channel
        self.channel = channel
        self.stub = stub
        if previous is not None:
            previous.close()
        logger.info(f"gRPC 连接成功: {self.host}:{self.port}")

    def close(self):
        """Close the channel, if any, and reset the client.

        Safe to call repeatedly or on a client that never connected.
        """
        channel = self.channel
        # Reset first: the RPC helpers reconnect only when self.stub is
        # unset, and a stub bound to a closed channel must not be reused.
        self.channel = None
        self.stub = None
        if channel is not None:
            channel.close()
            logger.info("gRPC 连接已关闭")

    def _call(self, rpc_name: str, data: str,
              metadata: Optional[dict] = None) -> dict:
        """Invoke the stub method ``rpc_name`` and return its reply as a dict.

        Connects first when no stub is available.

        Args:
            rpc_name: Attribute name of the RPC on the generated stub.
            data: Value for the request's ``data`` field.
            metadata: Optional value for the request's ``metadata`` field;
                an empty dict is sent when omitted or empty.

        Returns:
            Dict with keys ``return_code``, ``message``, ``data`` and
            ``metadata`` copied from the response.

        Raises:
            grpc.RpcError: The call failed (logged and re-raised).
        """
        if self.stub is None:
            self.connect()
        request = demo_pb2.RequestData(data=data, metadata=metadata or {})
        try:
            response = getattr(self.stub, rpc_name)(request)
        except grpc.RpcError as e:
            logger.error(f"gRPC 调用失败: {e.code()} - {e.details()}")
            raise
        return {
            'return_code': response.return_code,
            'message': response.message,
            'data': response.data,
            'metadata': dict(response.metadata),
        }

    def create_one(self, data: str, metadata: Optional[dict] = None) -> dict:
        """Call the ``CreateOne`` RPC.

        Args:
            data: Request data.
            metadata: Optional request metadata.

        Returns:
            The response as a dict (see :meth:`_call`).
        """
        return self._call('CreateOne', data, metadata)

    def delete_one(self, data: str, metadata: Optional[dict] = None) -> dict:
        """Call the ``DeleteOne`` RPC; arguments and result as in create_one."""
        return self._call('DeleteOne', data, metadata)

    def transfer_one(self, data: str, metadata: Optional[dict] = None) -> dict:
        """Call the ``TransferOne`` RPC; arguments and result as in create_one."""
        return self._call('TransferOne', data, metadata)

    def get_create_notify(self, data: str, metadata: Optional[dict] = None) -> dict:
        """Call the ``GetCreateNotify`` RPC; arguments and result as in create_one."""
        return self._call('GetCreateNotify', data, metadata)
# Cached client shared by every caller of get_grpc_client(); None until the first call.
_grpc_client: Optional[GRPCClient] = None
def get_grpc_client() -> GRPCClient:
    """Return the shared module-level GRPCClient, creating it on first use.

    The target is read from the ``GRPC_HOST`` (default ``'localhost'``) and
    ``GRPC_PORT`` (default ``'9090'``) environment variables, and the client
    is connected before it is cached.

    Returns:
        The cached, connected client.

    Raises:
        ValueError: ``GRPC_PORT`` is not an integer.
        Exception: Whatever ``GRPCClient.connect()`` raises. Nothing is
            cached in that case, so the next call retries from scratch.
    """
    global _grpc_client
    if _grpc_client is None:
        grpc_host = os.getenv('GRPC_HOST', 'localhost')
        raw_port = os.getenv('GRPC_PORT', '9090')
        try:
            grpc_port = int(raw_port)
        except ValueError as err:
            raise ValueError(
                f"GRPC_PORT must be an integer, got {raw_port!r}"
            ) from err
        client = GRPCClient(host=grpc_host, port=grpc_port)
        client.connect()
        # Cache only after connect() succeeded; caching earlier would hand
        # every later caller a client whose connection attempt failed.
        _grpc_client = client
    return _grpc_client
|