Python gRPC 微服务构建指南

Python gRPC 微服务构建指南

本文档介绍如何使用 Python 构建 gRPC 微服务。gRPC 是一个高性能、开源的 RPC 框架,支持多种编程语言,非常适合构建微服务架构。

📋 前置要求

  • Python 3.9+(推荐 Python 3.11+)
  • pip 或 poetry 包管理器
  • Protocol Buffers 编译器(protoc)

🏗️ 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
python-grpc-service/
├── proto/ # Protocol Buffers 定义文件
│ └── demo.proto
├── generated/ # 生成的 Python 代码(gitignore)
│ ├── demo_pb2.py
│ ├── demo_pb2_grpc.py
│ └── demo_pb2.pyi
├── server/ # 服务器代码
│ ├── __init__.py
│ └── server.py
├── client/ # 客户端代码
│ ├── __init__.py
│ └── client.py
├── requirements.txt
├── build.sh # 生成 gRPC 代码的脚本
└── README.md

📦 依赖安装

requirements.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# gRPC 核心库
grpcio==1.60.0
grpcio-tools==1.60.0
protobuf==4.25.1

# 可选:异步支持
grpcio[aio]==1.60.0

# 可选:健康检查
grpcio-health-checking==1.60.0

# 可选:反射支持
grpcio-reflection==1.60.0

# 可选:其他工具
python-dotenv==1.0.0

安装依赖

1
2
3
4
5
6
7
8
9
10
11
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate # Linux/Mac
# 或
venv\Scripts\activate # Windows

# 安装依赖
pip install -r requirements.txt

# 或使用国内镜像
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

安装 Protocol Buffers 编译器

Linux (Ubuntu/Debian):

1
2
sudo apt-get update
sudo apt-get install -y protobuf-compiler

macOS:

1
brew install protobuf

Windows:

下载并安装 Protocol Buffers 编译器

验证安装:

1
protoc --version

📝 Protocol Buffers 定义

proto/demo.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
syntax = "proto3";

package demo;

// 导入 Google 的常用类型(可选)
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// 请求消息
message RequestData {
string data = 1;
map<string, string> metadata = 2; // 可选:元数据
google.protobuf.Timestamp timestamp = 3; // 可选:时间戳
}

// 响应消息
message ResponseData {
int64 return_code = 1;
string message = 2;
string data = 3;
map<string, string> metadata = 4; // 可选:元数据
google.protobuf.Timestamp timestamp = 5; // 可选:时间戳
}

// 服务定义
service DemoService {
// 创建资源
rpc CreateOne(RequestData) returns (ResponseData) {}

// 删除资源
rpc DeleteOne(RequestData) returns (ResponseData) {}

// 转移资源
rpc TransferOne(RequestData) returns (ResponseData) {}

// 获取创建通知
rpc GetCreateNotify(RequestData) returns (ResponseData) {}

// 流式响应示例(可选)
rpc StreamResponse(RequestData) returns (stream ResponseData) {}

// 流式请求示例(可选)
rpc StreamRequest(stream RequestData) returns (ResponseData) {}

// 双向流示例(可选)
rpc StreamBidirectional(stream RequestData) returns (stream ResponseData) {}
}

🔧 生成 gRPC 代码

build.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/bin/bash

# 创建生成目录
mkdir -p generated

# 下载 Google API 定义(如果需要)
mkdir -p proto/google/protobuf
# 注意:实际使用时,google/protobuf 定义通常已经包含在 protobuf 安装中

# 生成 Python gRPC 代码
python -m grpc_tools.protoc \
-I./proto \
-I./proto/google \
--python_out=./generated \
--grpc_python_out=./generated \
--pyi_out=./generated \
proto/demo.proto

# 创建 __init__.py
touch generated/__init__.py

echo "gRPC 代码生成完成!"
echo "生成的文件:"
ls -la generated/

设置执行权限并运行:

1
2
chmod +x build.sh
./build.sh

注意:生成的代码应该添加到 .gitignore

1
2
3
4
5
generated/
*.pyc
__pycache__/
venv/
.env

💻 服务器实现

server/server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""
gRPC 服务器实现
"""

import logging
import time
from concurrent import futures
from datetime import datetime

import grpc
from google.protobuf import timestamp_pb2

# 导入生成的 gRPC 代码
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'generated'))

import demo_pb2
import demo_pb2_grpc

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class DemoServiceServicer(demo_pb2_grpc.DemoServiceServicer):
"""DemoService 服务实现"""

def CreateOne(self, request, context):
"""
创建资源

Args:
request: RequestData 消息
context: gRPC 上下文

Returns:
ResponseData 消息
"""
logger.info(f"CreateOne 请求: {request.data}")
logger.info(f"元数据: {dict(request.metadata)}")

# 处理业务逻辑
# ...

# 构建响应
response = demo_pb2.ResponseData(
return_code=0,
message="success",
data=f"Created: {request.data}",
metadata={"server": "python-grpc-service"}
)

# 设置时间戳
now = timestamp_pb2.Timestamp()
now.FromDatetime(datetime.utcnow())
response.timestamp.CopyFrom(now)

logger.info(f"CreateOne 响应: {response.message}")
return response

def DeleteOne(self, request, context):
"""删除资源"""
logger.info(f"DeleteOne 请求: {request.data}")

response = demo_pb2.ResponseData(
return_code=0,
message="success",
data=f"Deleted: {request.data}",
metadata={"server": "python-grpc-service"}
)

now = timestamp_pb2.Timestamp()
now.FromDatetime(datetime.utcnow())
response.timestamp.CopyFrom(now)

return response

def TransferOne(self, request, context):
"""转移资源"""
logger.info(f"TransferOne 请求: {request.data}")

response = demo_pb2.ResponseData(
return_code=0,
message="success",
data=f"Transferred: {request.data}",
metadata={"server": "python-grpc-service"}
)

now = timestamp_pb2.Timestamp()
now.FromDatetime(datetime.utcnow())
response.timestamp.CopyFrom(now)

return response

def GetCreateNotify(self, request, context):
"""获取创建通知"""
logger.info(f"GetCreateNotify 请求: {request.data}")

response = demo_pb2.ResponseData(
return_code=0,
message="success",
data=f"Notify for: {request.data}",
metadata={"server": "python-grpc-service"}
)

now = timestamp_pb2.Timestamp()
now.FromDatetime(datetime.utcnow())
response.timestamp.CopyFrom(now)

return response

def StreamResponse(self, request, context):
"""流式响应示例"""
logger.info(f"StreamResponse 请求: {request.data}")

for i in range(5):
response = demo_pb2.ResponseData(
return_code=0,
message="success",
data=f"Stream item {i+1}",
metadata={"index": str(i+1)}
)
yield response
time.sleep(1)

def StreamRequest(self, request_iterator, context):
"""流式请求示例"""
items = []
for request in request_iterator:
logger.info(f"收到流式请求: {request.data}")
items.append(request.data)

response = demo_pb2.ResponseData(
return_code=0,
message="success",
data=f"收到 {len(items)} 个请求",
metadata={"count": str(len(items))}
)
return response

def StreamBidirectional(self, request_iterator, context):
"""双向流示例"""
for request in request_iterator:
logger.info(f"收到双向流请求: {request.data}")
response = demo_pb2.ResponseData(
return_code=0,
message="success",
data=f"Echo: {request.data}",
metadata={"echo": "true"}
)
yield response


def serve():
"""启动 gRPC 服务器"""
# 创建服务器
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
options=[
('grpc.keepalive_time_ms', 30000),
('grpc.keepalive_timeout_ms', 5000),
('grpc.keepalive_permit_without_calls', True),
('grpc.http2.max_pings_without_data', 0),
('grpc.http2.min_time_between_pings_ms', 10000),
('grpc.http2.min_ping_interval_without_data_ms', 300000),
]
)

# 添加服务
demo_pb2_grpc.add_DemoServiceServicer_to_server(
DemoServiceServicer(),
server
)

# 监听端口
listen_addr = '[::]:9090'
server.add_insecure_port(listen_addr)

# 启动服务器
server.start()
logger.info(f"gRPC 服务器启动,监听: {listen_addr}")

try:
# 保持运行
server.wait_for_termination()
except KeyboardInterrupt:
logger.info("收到停止信号,正在关闭服务器...")
server.stop(0)
logger.info("服务器已关闭")


if __name__ == '__main__':
serve()

💻 客户端实现

client/client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""
gRPC 客户端实现
"""

import logging
import sys
import os

import grpc
from google.protobuf import timestamp_pb2
from datetime import datetime

# 导入生成的 gRPC 代码
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'generated'))

import demo_pb2
import demo_pb2_grpc

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class GRPCClient:
"""gRPC 客户端封装"""

def __init__(self, host: str = 'localhost', port: int = 9090):
"""
初始化客户端

Args:
host: 服务器地址
port: 服务器端口
"""
self.host = host
self.port = port
self.channel = None
self.stub = None

def connect(self):
"""建立连接"""
server_address = f'{self.host}:{self.port}'

# 创建通道
self.channel = grpc.insecure_channel(
server_address,
options=[
('grpc.keepalive_time_ms', 30000),
('grpc.keepalive_timeout_ms', 5000),
('grpc.keepalive_permit_without_calls', True),
]
)

# 创建存根
self.stub = demo_pb2_grpc.DemoServiceStub(self.channel)

# 等待通道就绪
try:
grpc.channel_ready_future(self.channel).result(timeout=5)
logger.info(f"连接成功: {server_address}")
except grpc.FutureTimeoutError:
logger.error(f"连接超时: {server_address}")
raise

def close(self):
"""关闭连接"""
if self.channel:
self.channel.close()
logger.info("连接已关闭")

def create_one(self, data: str, metadata: dict = None):
"""调用 CreateOne"""
if not self.stub:
self.connect()

request = demo_pb2.RequestData(
data=data,
metadata=metadata or {}
)

# 设置时间戳
now = timestamp_pb2.Timestamp()
now.FromDatetime(datetime.utcnow())
request.timestamp.CopyFrom(now)

try:
response = self.stub.CreateOne(request)
logger.info(f"CreateOne 响应: {response.message}")
return response
except grpc.RpcError as e:
logger.error(f"RPC 错误: {e.code()} - {e.details()}")
raise

def delete_one(self, data: str, metadata: dict = None):
"""调用 DeleteOne"""
if not self.stub:
self.connect()

request = demo_pb2.RequestData(data=data, metadata=metadata or {})
response = self.stub.DeleteOne(request)
logger.info(f"DeleteOne 响应: {response.message}")
return response

def transfer_one(self, data: str, metadata: dict = None):
"""调用 TransferOne"""
if not self.stub:
self.connect()

request = demo_pb2.RequestData(data=data, metadata=metadata or {})
response = self.stub.TransferOne(request)
logger.info(f"TransferOne 响应: {response.message}")
return response

def get_create_notify(self, data: str, metadata: dict = None):
"""调用 GetCreateNotify"""
if not self.stub:
self.connect()

request = demo_pb2.RequestData(data=data, metadata=metadata or {})
response = self.stub.GetCreateNotify(request)
logger.info(f"GetCreateNotify 响应: {response.message}")
return response

def stream_response(self, data: str):
"""调用流式响应"""
if not self.stub:
self.connect()

request = demo_pb2.RequestData(data=data)

logger.info("开始接收流式响应...")
for response in self.stub.StreamResponse(request):
logger.info(f"收到: {response.data}")

def stream_request(self, data_list: list):
"""调用流式请求"""
if not self.stub:
self.connect()

def request_generator():
for data in data_list:
yield demo_pb2.RequestData(data=data)

response = self.stub.StreamRequest(request_generator())
logger.info(f"流式请求响应: {response.data}")
return response


def main():
"""示例用法"""
client = GRPCClient(host='localhost', port=9090)

try:
# 测试基本 RPC
print("\n=== 测试 CreateOne ===")
response = client.create_one("test data", {"key": "value"})
print(f"返回码: {response.return_code}")
print(f"消息: {response.message}")
print(f"数据: {response.data}")

print("\n=== 测试 DeleteOne ===")
response = client.delete_one("test data")
print(f"返回码: {response.return_code}")
print(f"消息: {response.message}")

print("\n=== 测试 TransferOne ===")
response = client.transfer_one("test data")
print(f"返回码: {response.return_code}")

print("\n=== 测试 GetCreateNotify ===")
response = client.get_create_notify("test data")
print(f"返回码: {response.return_code}")

print("\n=== 测试流式响应 ===")
client.stream_response("stream test")

print("\n=== 测试流式请求 ===")
client.stream_request(["item1", "item2", "item3"])

finally:
client.close()


if __name__ == '__main__':
main()

🚀 运行服务

启动服务器

1
2
3
4
5
# 开发模式
python server/server.py

# 或使用后台运行
nohup python server/server.py > server.log 2>&1 &

运行客户端

1
python client/client.py

🧪 测试

使用 grpcurl(命令行工具)

安装 grpcurl:

1
2
3
4
5
# macOS
brew install grpcurl

# Linux
# 下载二进制文件:https://github.com/fullstorydev/grpcurl/releases

测试服务:

1
2
3
4
5
6
7
8
# 列出服务
grpcurl -plaintext localhost:9090 list

# 调用 CreateOne
grpcurl -plaintext \
-d '{"data": "test"}' \
localhost:9090 \
demo.DemoService/CreateOne

🔒 生产环境建议

  1. 使用 TLS:在生产环境启用 TLS 加密
  2. 认证授权:实现 gRPC 拦截器进行认证
  3. 监控:集成 Prometheus 和 Grafana
  4. 日志:配置结构化日志和日志聚合
  5. 负载均衡:使用 gRPC 负载均衡器
  6. 超时和重试:配置合理的超时和重试策略
  7. 健康检查:实现 gRPC 健康检查服务

📚 参考资源

0%