Python Flask gRPC Gateway 服务构建指南

Python Flask gRPC Gateway 服务构建指南

本文档介绍如何使用 Flask 构建 gRPC Gateway 服务,将 HTTP RESTful API 请求转换为 gRPC 调用。这种架构模式常用于微服务架构中,提供统一的 HTTP 入口。

📋 前置要求

  • Python 3.9+(推荐 Python 3.11+)
  • pip 或 poetry 包管理器
  • 已安装的 gRPC 服务(作为后端服务)

🏗️ 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
flask-grpc-gateway/
├── proto/ # Protocol Buffers 定义文件
│ └── demo.proto
├── generated/ # 生成的 Python 代码(gitignore)
│ ├── demo_pb2.py
│ └── demo_pb2_grpc.py
├── app/
│ ├── __init__.py
│ ├── routes.py # Flask 路由定义
│ └── grpc_client.py # gRPC 客户端封装
├── requirements.txt
├── build.sh # 生成 gRPC 代码的脚本
├── run.sh # 启动脚本
└── app.py # Flask 应用入口

📦 依赖安装

requirements.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Web 框架
Flask==3.0.0
gunicorn==21.2.0

# gRPC 相关
grpcio==1.60.0
grpcio-tools==1.60.0
protobuf==4.25.1

# 可选:异步支持
gevent==23.9.1

# 可选:API 文档
flask-restx==1.3.0

# 可选:配置管理
python-dotenv==1.0.0

安装依赖

1
2
3
4
5
6
7
8
9
10
11
# 创建虚拟环境(推荐)
python3 -m venv venv
source venv/bin/activate # Linux/Mac
# 或
venv\Scripts\activate # Windows

# 安装依赖
pip install -r requirements.txt

# 或使用国内镜像加速
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

📝 Protocol Buffers 定义

proto/demo.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
syntax = "proto3";

package demo;

// 请求消息
message RequestData {
string data = 1;
map<string, string> metadata = 2; // 可选:元数据
}

// 响应消息
message ResponseData {
int64 return_code = 1;
string message = 2;
string data = 3;
map<string, string> metadata = 4; // 可选:元数据
}

// 服务定义
service DemoService {
rpc CreateOne(RequestData) returns (ResponseData) {}
rpc DeleteOne(RequestData) returns (ResponseData) {}
rpc TransferOne(RequestData) returns (ResponseData) {}
rpc GetCreateNotify(RequestData) returns (ResponseData) {}
}

🔧 生成 gRPC 代码

build.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/bin/bash

# 创建生成目录
mkdir -p generated

# 生成 Python gRPC 代码
python -m grpc_tools.protoc \
-I./proto \
--python_out=./generated \
--grpc_python_out=./generated \
--pyi_out=./generated \
proto/demo.proto

echo "gRPC 代码生成完成!"
echo "生成的文件:"
ls -la generated/

设置执行权限:

1
2
chmod +x build.sh
./build.sh

注意:生成的代码应该添加到 .gitignore

1
2
3
4
5
generated/
*.pyc
__pycache__/
venv/
.env

💻 Flask 应用实现

app/grpc_client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""
gRPC 客户端封装
提供连接池和错误处理
"""

import grpc
from typing import Optional
import logging

# 导入生成的 gRPC 代码
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'generated'))

import demo_pb2
import demo_pb2_grpc

logger = logging.getLogger(__name__)


class GRPCClient:
"""gRPC 客户端封装类"""

def __init__(self, host: str = 'localhost', port: int = 9090):
"""
初始化 gRPC 客户端

Args:
host: gRPC 服务器地址
port: gRPC 服务器端口
"""
self.host = host
self.port = port
self.channel: Optional[grpc.Channel] = None
self.stub: Optional[demo_pb2_grpc.DemoServiceStub] = None

def connect(self):
"""建立 gRPC 连接"""
try:
# 创建 gRPC 通道
self.channel = grpc.insecure_channel(f'{self.host}:{self.port}')

# 创建存根
self.stub = demo_pb2_grpc.DemoServiceStub(self.channel)

# 等待通道就绪(可选)
grpc.channel_ready_future(self.channel).result(timeout=5)

logger.info(f"gRPC 连接成功: {self.host}:{self.port}")
except grpc.FutureTimeoutError:
logger.error(f"gRPC 连接超时: {self.host}:{self.port}")
raise
except Exception as e:
logger.error(f"gRPC 连接失败: {e}")
raise

def close(self):
"""关闭 gRPC 连接"""
if self.channel:
self.channel.close()
logger.info("gRPC 连接已关闭")

def create_one(self, data: str, metadata: Optional[dict] = None) -> dict:
"""
调用 CreateOne RPC

Args:
data: 请求数据
metadata: 可选的元数据

Returns:
响应字典
"""
if not self.stub:
self.connect()

try:
# 构建请求
request_metadata = {}
if metadata:
request_metadata = metadata

request = demo_pb2.RequestData(
data=data,
metadata=request_metadata
)

# 调用 RPC
response = self.stub.CreateOne(request)

return {
'return_code': response.return_code,
'message': response.message,
'data': response.data,
'metadata': dict(response.metadata)
}
except grpc.RpcError as e:
logger.error(f"gRPC 调用失败: {e.code()} - {e.details()}")
raise

def delete_one(self, data: str, metadata: Optional[dict] = None) -> dict:
"""调用 DeleteOne RPC"""
if not self.stub:
self.connect()

try:
request = demo_pb2.RequestData(
data=data,
metadata=metadata or {}
)
response = self.stub.DeleteOne(request)

return {
'return_code': response.return_code,
'message': response.message,
'data': response.data,
'metadata': dict(response.metadata)
}
except grpc.RpcError as e:
logger.error(f"gRPC 调用失败: {e.code()} - {e.details()}")
raise

def transfer_one(self, data: str, metadata: Optional[dict] = None) -> dict:
"""调用 TransferOne RPC"""
if not self.stub:
self.connect()

try:
request = demo_pb2.RequestData(
data=data,
metadata=metadata or {}
)
response = self.stub.TransferOne(request)

return {
'return_code': response.return_code,
'message': response.message,
'data': response.data,
'metadata': dict(response.metadata)
}
except grpc.RpcError as e:
logger.error(f"gRPC 调用失败: {e.code()} - {e.details()}")
raise

def get_create_notify(self, data: str, metadata: Optional[dict] = None) -> dict:
"""调用 GetCreateNotify RPC"""
if not self.stub:
self.connect()

try:
request = demo_pb2.RequestData(
data=data,
metadata=metadata or {}
)
response = self.stub.GetCreateNotify(request)

return {
'return_code': response.return_code,
'message': response.message,
'data': response.data,
'metadata': dict(response.metadata)
}
except grpc.RpcError as e:
logger.error(f"gRPC 调用失败: {e.code()} - {e.details()}")
raise


# 全局客户端实例(可选:使用连接池)
_grpc_client: Optional[GRPCClient] = None


def get_grpc_client() -> GRPCClient:
"""获取全局 gRPC 客户端实例"""
global _grpc_client

if _grpc_client is None:
import os
grpc_host = os.getenv('GRPC_HOST', 'localhost')
grpc_port = int(os.getenv('GRPC_PORT', '9090'))
_grpc_client = GRPCClient(host=grpc_host, port=grpc_port)
_grpc_client.connect()

return _grpc_client

app/routes.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
Flask 路由定义
"""

from flask import Blueprint, request, jsonify
from app.grpc_client import get_grpc_client
import logging

logger = logging.getLogger(__name__)

api = Blueprint('api', __name__)


@api.route('/health', methods=['GET'])
def health():
"""健康检查端点"""
return jsonify({
'status': 'healthy',
'service': 'flask-grpc-gateway'
}), 200


@api.route('/api/v1/create', methods=['POST'])
def create_one():
"""
创建资源

Request Body:
{
"data": "string",
"metadata": {} // 可选
}
"""
try:
data = request.get_json()

if not data or 'data' not in data:
return jsonify({
'error': 'Missing required field: data'
}), 400

# 获取 gRPC 客户端
client = get_grpc_client()

# 调用 gRPC 服务
result = client.create_one(
data=data['data'],
metadata=data.get('metadata')
)

return jsonify(result), 200

except Exception as e:
logger.error(f"CreateOne 失败: {e}")
return jsonify({
'error': str(e),
'return_code': -1
}), 500


@api.route('/api/v1/delete', methods=['POST'])
def delete_one():
"""删除资源"""
try:
data = request.get_json()

if not data or 'data' not in data:
return jsonify({
'error': 'Missing required field: data'
}), 400

client = get_grpc_client()
result = client.delete_one(
data=data['data'],
metadata=data.get('metadata')
)

return jsonify(result), 200

except Exception as e:
logger.error(f"DeleteOne 失败: {e}")
return jsonify({
'error': str(e),
'return_code': -1
}), 500


@api.route('/api/v1/transfer', methods=['POST'])
def transfer_one():
"""转移资源"""
try:
data = request.get_json()

if not data or 'data' not in data:
return jsonify({
'error': 'Missing required field: data'
}), 400

client = get_grpc_client()
result = client.transfer_one(
data=data['data'],
metadata=data.get('metadata')
)

return jsonify(result), 200

except Exception as e:
logger.error(f"TransferOne 失败: {e}")
return jsonify({
'error': str(e),
'return_code': -1
}), 500


@api.route('/api/v1/notify', methods=['POST'])
def get_create_notify():
"""获取创建通知"""
try:
data = request.get_json()

if not data or 'data' not in data:
return jsonify({
'error': 'Missing required field: data'
}), 400

client = get_grpc_client()
result = client.get_create_notify(
data=data['data'],
metadata=data.get('metadata')
)

return jsonify(result), 200

except Exception as e:
logger.error(f"GetCreateNotify 失败: {e}")
return jsonify({
'error': str(e),
'return_code': -1
}), 500

app/init.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
"""
Flask 应用初始化
"""

from flask import Flask
from app.routes import api
import logging

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


def create_app():
"""创建 Flask 应用"""
app = Flask(__name__)

# 注册蓝图
app.register_blueprint(api)

return app

app.py

1
2
3
4
5
6
7
8
9
10
11
"""
Flask 应用入口
"""

from app import create_app

app = create_app()

if __name__ == '__main__':
# 开发模式
app.run(host='0.0.0.0', port=5000, debug=True)

🚀 启动服务

run.sh(开发模式)

1
2
3
4
5
6
7
8
9
10
#!/bin/bash

# 设置环境变量
export FLASK_APP=app.py
export FLASK_ENV=development
export GRPC_HOST=localhost
export GRPC_PORT=9090

# 启动 Flask 开发服务器
flask run --host=0.0.0.0 --port=5000

run_prod.sh(生产模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/bin/bash

# 设置环境变量
export GRPC_HOST=${GRPC_HOST:-localhost}
export GRPC_PORT=${GRPC_PORT:-9090}

# 使用 Gunicorn 启动(推荐生产环境)
gunicorn \
--bind 0.0.0.0:5000 \
--workers 4 \
--worker-class gevent \
--worker-connections 1000 \
--timeout 30 \
--access-logfile - \
--error-logfile - \
app:app

设置执行权限:

1
chmod +x run.sh run_prod.sh

🧪 测试

使用 curl 测试

1
2
3
4
5
6
7
8
9
10
11
12
# 健康检查
curl http://localhost:5000/health

# 创建资源
curl -X POST http://localhost:5000/api/v1/create \
-H "Content-Type: application/json" \
-d '{"data": "test data", "metadata": {"key": "value"}}'

# 删除资源
curl -X POST http://localhost:5000/api/v1/delete \
-H "Content-Type: application/json" \
-d '{"data": "test data"}'

使用 Python requests 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import requests

base_url = "http://localhost:5000"

# 健康检查
response = requests.get(f"{base_url}/health")
print(response.json())

# 创建资源
response = requests.post(
f"{base_url}/api/v1/create",
json={"data": "test data", "metadata": {"key": "value"}}
)
print(response.json())

🔒 生产环境建议

  1. 使用环境变量配置:使用 python-dotenv 管理配置
  2. 添加认证:使用 JWT 或 API Key 认证
  3. 限流:使用 Flask-Limiter 限制请求频率
  4. 日志记录:配置结构化日志和日志聚合
  5. 监控:添加 Prometheus 指标和健康检查
  6. TLS:在生产环境使用 HTTPS
  7. gRPC 连接池:使用连接池管理 gRPC 连接

📚 参考资源

0%