1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
"""
etcd 服务发现客户端示例
使用 etcd3-py 库(推荐)
"""
import etcd3 import json from typing import List, Dict
class ServiceDiscovery:
    """Read-only service-discovery helper built on an etcd v3 client.

    Services are looked up under keys shaped like
    ``<service_prefix><service_name>/<instance>`` where ``service_prefix``
    is ``/services/``.
    """

    def __init__(self, host: str = '192.168.3.170', port: int = 2379):
        """Create the etcd client.

        Args:
            host: etcd server address.
            port: etcd server port.
        """
        self.client = etcd3.client(host=host, port=port)
        self.service_prefix = '/services/'

    def list_services(self) -> List[str]:
        """List the names of all registered services.

        Returns:
            The distinct service names found under ``service_prefix``,
            sorted alphabetically so the result order is stable.
        """
        prefix = self.service_prefix
        services = set()
        for _value, metadata in self.client.get_prefix(prefix):
            if not metadata:
                continue
            key = metadata.key.decode('utf-8')
            # Strip only the leading prefix: str.replace() would also remove
            # later occurrences (e.g. '/services/api/services/1' -> 'api1').
            if key.startswith(prefix):
                key = key[len(prefix):]
            service_name = key.split('/', 1)[0]
            # A key equal to the bare prefix carries no service name.
            if service_name:
                services.add(service_name)
        return sorted(services)

    def get_service_instances(self, service_name: str) -> List[Dict]:
        """Fetch every registered instance of one service.

        Args:
            service_name: Name of the service to look up.

        Returns:
            One dict per instance. When the stored value is a JSON object the
            dict has ``key``, ``value`` (the parsed object), ``address`` (its
            ``Addr`` field or ``''``) and ``port`` (its ``Port`` field or
            ``''``). For any other value (invalid JSON, or JSON that is not
            an object) the dict has only ``key`` and ``value`` (the raw text).
        """
        instances = []
        service_path = f"{self.service_prefix}{service_name}/"
        for value, metadata in self.client.get_prefix(service_path):
            if not (value and metadata):
                continue
            key = metadata.key.decode('utf-8')
            text = value.decode('utf-8')
            try:
                service_info = json.loads(text)
            except json.JSONDecodeError:
                service_info = None
            if isinstance(service_info, dict):
                instances.append({
                    'key': key,
                    'value': service_info,
                    'address': service_info.get('Addr', ''),
                    'port': service_info.get('Port', ''),
                })
            else:
                # Valid JSON that is not an object (list, string, number...)
                # has no .get(); treat it like unparseable text instead of
                # crashing with AttributeError.
                instances.append({'key': key, 'value': text})
        return instances

    def watch_service(self, service_name: str, callback) -> None:
        """Watch one service for changes and forward each event to a callback.

        Iterates the watch event stream until it ends or an exception (for
        example ``KeyboardInterrupt``) propagates; the watch is cancelled on
        the way out in either case.

        Args:
            service_name: Name of the service to watch.
            callback: Callable invoked as ``callback(event_type, key, value)``
                where ``event_type`` is ``'PUT'`` or ``'DELETE'`` and ``key``
                and ``value`` are decoded strings (``''`` when absent).
        """
        service_path = f"{self.service_prefix}{service_name}/"
        # python-etcd3's watch_prefix() returns (events_iterator, cancel);
        # iterating the returned tuple itself would yield those two objects
        # rather than watch events.
        events_iterator, cancel = self.client.watch_prefix(service_path)
        try:
            for event in events_iterator:
                if isinstance(event, etcd3.events.PutEvent):
                    event_type = 'PUT'
                else:
                    event_type = 'DELETE'
                key = event.key.decode('utf-8') if event.key else ''
                raw_value = getattr(event, 'value', None)
                value = raw_value.decode('utf-8') if raw_value else ''
                callback(event_type, key, value)
        finally:
            # Always release the server-side watch, even on Ctrl+C or when
            # the callback raises.
            cancel()
def main():
    """Demo: list services, show the first one's instances, then watch it."""

    def on_service_change(event_type, key, value):
        # Print each change; the value line is skipped when empty.
        print(f"\n服务变化: {event_type} - {key}")
        if value:
            print(f" 值: {value}")

    discovery = ServiceDiscovery(host='192.168.3.170', port=2379)

    print("已注册的服务:")
    services = discovery.list_services()
    for service in services:
        print(f" - {service}")

    # The demo only inspects and watches the first listed service, if any.
    has_services = bool(services)
    if has_services:
        service_name = services[0]
        print(f"\n服务 '{service_name}' 的实例:")
        for instance in discovery.get_service_instances(service_name):
            print(f" - {instance}")

    print("\n开始监听服务变化(按 Ctrl+C 停止)...")
    try:
        if has_services:
            discovery.watch_service(services[0], on_service_change)
    except KeyboardInterrupt:
        print("\n停止监听")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|