1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
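"""
gRPC client wrapper.

Provides connection management and error handling around a single shared
channel to the Demo service (no connection pooling is implemented here).
"""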
import grpc from typing import Optional import logging
import sys import os sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'generated'))
import demo_pb2 import demo_pb2_grpc
logger = logging.getLogger(__name__)


class GRPCClient:
    """Wrapper around the Demo service gRPC stub.

    The client connects lazily: every RPC method calls :meth:`connect` first
    when no stub is available. It can also be used as a context manager, in
    which case the channel is closed on exit.
    """

    def __init__(self, host: str = 'localhost', port: int = 9090,
                 connect_timeout: float = 5.0):
        """Store the connection settings; no network I/O happens here.

        Args:
            host: Address of the gRPC server.
            port: Port of the gRPC server.
            connect_timeout: Seconds :meth:`connect` waits for the channel to
                become ready before giving up.
        """
        self.host = host
        self.port = port
        self.connect_timeout = connect_timeout
        self.channel: Optional[grpc.Channel] = None
        self.stub: Optional[demo_pb2_grpc.DemoServiceStub] = None

    def __enter__(self):
        """Connect (if not already connected) and return the client."""
        if self.stub is None:
            self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the channel; exceptions from the ``with`` body propagate."""
        self.close()

    def connect(self):
        """Open an insecure channel and wait until it is ready.

        ``self.channel`` and ``self.stub`` are only updated once the channel
        is ready, so a failed attempt leaves the client in its previous state
        and the next RPC call retries the connection.

        Raises:
            grpc.FutureTimeoutError: The channel was not ready within
                ``connect_timeout`` seconds.
            Exception: Any other error raised while creating the channel or
                stub is logged and re-raised unchanged.
        """
        channel = None
        try:
            channel = grpc.insecure_channel(f'{self.host}:{self.port}')
            stub = demo_pb2_grpc.DemoServiceStub(channel)
            grpc.channel_ready_future(channel).result(
                timeout=self.connect_timeout
            )
        except grpc.FutureTimeoutError:
            logger.error(f"gRPC 连接超时: {self.host}:{self.port}")
            self._discard(channel)
            raise
        except Exception as e:
            logger.error(f"gRPC 连接失败: {e}")
            self._discard(channel)
            raise

        # Swap in the new channel first, then release the one it replaces so
        # a repeated connect() does not leak the earlier channel.
        previous = self.channel
        self.channel = channel
        self.stub = stub
        self._discard(previous)
        logger.info(f"gRPC 连接成功: {self.host}:{self.port}")

    @staticmethod
    def _discard(channel):
        """Close ``channel`` if one was created; ``None`` is ignored."""
        if channel is not None:
            channel.close()

    def close(self):
        """Close the channel and forget the stub.

        Clearing both attributes makes the call idempotent and lets the next
        RPC call reconnect instead of using a closed channel.
        """
        channel = self.channel
        self.channel = None
        self.stub = None
        if channel is not None:
            channel.close()
            logger.info("gRPC 连接已关闭")

    def _call(self, rpc_name: str, data: str,
              metadata: Optional[dict] = None) -> dict:
        """Invoke the unary RPC ``rpc_name`` and convert the reply to a dict.

        Args:
            rpc_name: Name of the stub method to call (e.g. ``'CreateOne'``).
            data: Value for the request's ``data`` field.
            metadata: Optional value for the request's ``metadata`` field;
                an empty dict is sent when it is ``None`` or empty.

        Returns:
            A dict with the keys ``return_code``, ``message``, ``data`` and
            ``metadata`` copied from the response.

        Raises:
            grpc.RpcError: The call failed; the error is logged and re-raised.
        """
        if self.stub is None:
            self.connect()

        try:
            request = demo_pb2.RequestData(
                data=data,
                metadata=metadata or {},
            )
            response = getattr(self.stub, rpc_name)(request)
            return {
                'return_code': response.return_code,
                'message': response.message,
                'data': response.data,
                'metadata': dict(response.metadata),
            }
        except grpc.RpcError as e:
            logger.error(f"gRPC 调用失败: {e.code()} - {e.details()}")
            raise

    def create_one(self, data: str, metadata: Optional[dict] = None) -> dict:
        """Call the CreateOne RPC.

        Args:
            data: Request payload.
            metadata: Optional request metadata.

        Returns:
            The response as a dict (see :meth:`_call`).
        """
        return self._call('CreateOne', data, metadata)

    def delete_one(self, data: str, metadata: Optional[dict] = None) -> dict:
        """Call the DeleteOne RPC; arguments and result as in create_one."""
        return self._call('DeleteOne', data, metadata)

    def transfer_one(self, data: str, metadata: Optional[dict] = None) -> dict:
        """Call the TransferOne RPC; arguments and result as in create_one."""
        return self._call('TransferOne', data, metadata)

    def get_create_notify(self, data: str, metadata: Optional[dict] = None) -> dict:
        """Call the GetCreateNotify RPC; arguments and result as in create_one."""
        return self._call('GetCreateNotify', data, metadata)
# Process-wide client instance, created on the first get_grpc_client() call.
_grpc_client: Optional[GRPCClient] = None


def get_grpc_client() -> GRPCClient:
    """Return the global gRPC client, creating and connecting it on first use.

    The server address is read from the ``GRPC_HOST`` (default
    ``'localhost'``) and ``GRPC_PORT`` (default ``'9090'``) environment
    variables.

    Returns:
        The shared, connected ``GRPCClient`` instance.

    Raises:
        ValueError: ``GRPC_PORT`` is not a valid integer.
        Exception: Any error raised by ``GRPCClient.connect()`` propagates.
    """
    global _grpc_client
    if _grpc_client is None:
        grpc_host = os.getenv('GRPC_HOST', 'localhost')
        grpc_port = int(os.getenv('GRPC_PORT', '9090'))
        client = GRPCClient(host=grpc_host, port=grpc_port)
        client.connect()
        # Publish the instance only after connect() succeeded, so a failed
        # first attempt is retried on the next call instead of caching a
        # client that never connected.
        _grpc_client = client
    return _grpc_client
|