Python Celery 微服务异步任务处理指南

Python Celery 微服务异步任务处理指南

本文档介绍如何使用 Celery 构建异步任务处理微服务。Celery 是一个强大的分布式任务队列系统,常用于处理耗时任务、定时任务和异步任务。

📋 前置要求

  • Python 3.9+(推荐 Python 3.11+)
  • Redis 6.0+ 或 RabbitMQ 3.8+(作为消息代理)
  • Django 4.0+(可选,也可以使用纯 Python 项目)

🏗️ 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
celery-demo-service/
├── demo/ # Django 项目目录
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ ├── wsgi.py
│ └── celery.py # Celery 配置
├── demo_app/ # Django 应用
│ ├── __init__.py
│ ├── models.py
│ ├── views.py
│ └── tasks.py # Celery 任务定义
├── requirements.txt
├── manage.py
└── .env # 环境变量配置

📦 依赖安装

requirements.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Django 框架
Django==4.2.7

# Celery 相关
celery==5.3.4
celery[redis]==5.3.4

# Redis 客户端
redis==5.0.1

# 可选:Django Celery Beat(定时任务)
django-celery-beat==2.5.0

# 可选:Django Celery Results(结果存储)
django-celery-results==2.5.0

# 可选:Flower(Celery 监控)
flower==2.0.1

# 可选:gRPC(如果需要 gRPC 集成)
grpcio==1.60.0
grpcio-tools==1.60.0

安装依赖

1
2
3
4
5
6
7
8
9
10
11
# 创建虚拟环境(推荐)
python3 -m venv venv
source venv/bin/activate # Linux/Mac
# 或
venv\Scripts\activate # Windows

# 安装依赖
pip install -r requirements.txt

# 或使用国内镜像加速
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

⚙️ Celery 配置

demo/celery.py(Django 集成方式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
"""
Celery 配置
适用于 Django 项目
"""

import os
from celery import Celery

# 设置 Django 默认设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')

# 创建 Celery 应用实例
app = Celery('demo')

# 从 Django 设置中加载配置
# 所有以 CELERY_ 开头的配置项都会被加载
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现任务
# 会在所有已安装的 Django 应用中查找 tasks.py 文件
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
"""调试任务"""
print(f'Request: {self.request!r}')

demo/init.py

1
2
3
4
5
6
7
"""
确保 Celery 应用在 Django 启动时被加载
"""

from .celery import app as celery_app

__all__ = ('celery_app',)

demo/settings.py(Django 设置)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""
Django 设置文件
包含 Celery 配置
"""

import os
from pathlib import Path

# 构建路径
BASE_DIR = Path(__file__).resolve().parent.parent

# 安全设置
SECRET_KEY = os.getenv('SECRET_KEY', 'django-insecure-change-me-in-production')
DEBUG = os.getenv('DEBUG', 'False') == 'True'
ALLOWED_HOSTS = os.getenv('ALLOWED_HOSTS', 'localhost,127.0.0.1').split(',')

# 应用定义
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',

# Celery 相关应用
'django_celery_beat', # 定时任务
'django_celery_results', # 结果存储

# 自定义应用
'demo_app',
]

MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

ROOT_URLCONF = 'demo.urls'

TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]

WSGI_APPLICATION = 'demo.wsgi.application'

# 数据库配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}

# Celery 配置
CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

# Celery 任务配置
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# Celery Beat 配置(定时任务)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# Celery Results 配置(结果存储)
CELERY_RESULT_BACKEND_DB = os.getenv('CELERY_RESULT_BACKEND_DB', 'django-db')
CELERY_RESULT_EXTENDED = True

# 任务路由配置(可选)
CELERY_TASK_ROUTES = {
'demo_app.tasks.send_email': {'queue': 'email'},
'demo_app.tasks.process_image': {'queue': 'image'},
}

# 任务优先级配置(可选)
CELERY_TASK_DEFAULT_PRIORITY = 5
CELERY_TASK_MAX_PRIORITY = 10

# Worker 配置
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000

📝 任务定义

demo_app/tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
Celery 任务定义
"""

from celery import shared_task
from celery.utils.log import get_task_logger
import time
import requests

logger = get_task_logger(__name__)


@shared_task(name='demo_app.tasks.add')
def add(x, y):
"""
简单加法任务

Args:
x: 第一个数字
y: 第二个数字

Returns:
两数之和
"""
logger.info(f'计算 {x} + {y}')
result = x + y
logger.info(f'结果: {result}')
return result


@shared_task(name='demo_app.tasks.multiply')
def multiply(x, y):
"""乘法任务"""
logger.info(f'计算 {x} * {y}')
return x * y


@shared_task(name='demo_app.tasks.send_email', bind=True, max_retries=3)
def send_email(self, to_email, subject, body):
"""
发送邮件任务(示例)

Args:
to_email: 收件人邮箱
subject: 邮件主题
body: 邮件内容

Returns:
发送结果
"""
try:
logger.info(f'发送邮件到 {to_email}')

# 模拟发送邮件
# 实际应用中应该调用邮件服务 API
time.sleep(2)

# 模拟可能的失败
if 'error' in to_email:
raise Exception('邮件发送失败')

logger.info(f'邮件发送成功: {to_email}')
return {'status': 'success', 'to': to_email}

except Exception as exc:
logger.error(f'邮件发送失败: {exc}')
# 重试任务
raise self.retry(exc=exc, countdown=60)


@shared_task(name='demo_app.tasks.process_image', bind=True)
def process_image(self, image_url):
"""
处理图片任务

Args:
image_url: 图片 URL

Returns:
处理结果
"""
try:
logger.info(f'处理图片: {image_url}')

# 下载图片
response = requests.get(image_url, timeout=30)
response.raise_for_status()

# 模拟图片处理
time.sleep(3)

logger.info(f'图片处理完成: {image_url}')
return {
'status': 'success',
'url': image_url,
'size': len(response.content)
}

except Exception as exc:
logger.error(f'图片处理失败: {exc}')
raise


@shared_task(name='demo_app.tasks.long_running_task', bind=True)
def long_running_task(self, duration=10):
"""
长时间运行的任务

Args:
duration: 任务持续时间(秒)

Returns:
任务结果
"""
logger.info(f'开始长时间任务,持续 {duration} 秒')

for i in range(duration):
time.sleep(1)
# 更新任务状态
self.update_state(
state='PROGRESS',
meta={'current': i + 1, 'total': duration}
)
logger.info(f'进度: {i + 1}/{duration}')

logger.info('长时间任务完成')
return {'status': 'completed', 'duration': duration}


@shared_task(name='demo_app.tasks.periodic_task')
def periodic_task():
"""
定时任务示例
通过 Django Celery Beat 配置执行频率
"""
logger.info('执行定时任务')
# 执行定时任务逻辑
return {'status': 'success', 'timestamp': time.time()}

demo_app/views.py(Django 视图)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
Django 视图
演示如何调用 Celery 任务
"""

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from demo_app.tasks import add, send_email, process_image, long_running_task
import json


@csrf_exempt
def trigger_add_task(request):
"""触发加法任务"""
if request.method == 'POST':
data = json.loads(request.body)
x = data.get('x', 0)
y = data.get('y', 0)

# 异步调用任务
task = add.delay(x, y)

return JsonResponse({
'task_id': task.id,
'status': 'pending'
})

return JsonResponse({'error': 'Method not allowed'}, status=405)


@csrf_exempt
def trigger_email_task(request):
"""触发邮件发送任务"""
if request.method == 'POST':
data = json.loads(request.body)
to_email = data.get('to_email')
subject = data.get('subject', 'Test Email')
body = data.get('body', 'Test Body')

task = send_email.delay(to_email, subject, body)

return JsonResponse({
'task_id': task.id,
'status': 'pending'
})

return JsonResponse({'error': 'Method not allowed'}, status=405)


@csrf_exempt
def trigger_image_task(request):
"""触发图片处理任务"""
if request.method == 'POST':
data = json.loads(request.body)
image_url = data.get('image_url')

task = process_image.delay(image_url)

return JsonResponse({
'task_id': task.id,
'status': 'pending'
})

return JsonResponse({'error': 'Method not allowed'}, status=405)


@csrf_exempt
def get_task_status(request, task_id):
"""获取任务状态"""
from celery.result import AsyncResult

task_result = AsyncResult(task_id)

response_data = {
'task_id': task_id,
'status': task_result.status,
}

if task_result.ready():
if task_result.successful():
response_data['result'] = task_result.result
else:
response_data['error'] = str(task_result.info)

return JsonResponse(response_data)

🚀 启动服务

1. 启动 Redis(如果未运行)

1
2
3
4
5
6
7
8
# 使用 Docker
docker run -d \
--name redis \
-p 6379:6379 \
redis:7-alpine

# 或使用本地 Redis
redis-server

2. 运行数据库迁移

1
python manage.py migrate

3. 启动 Celery Worker

1
2
3
4
5
6
7
8
9
10
11
# 基本启动
celery -A demo worker -l info

# 指定队列
celery -A demo worker -l info -Q email,image,celery

# 指定并发数
celery -A demo worker -l info --concurrency=4

# 后台运行(使用 supervisor 或 systemd)
celery -A demo worker -l info --detach

4. 启动 Celery Beat(定时任务)

1
2
3
4
celery -A demo beat -l info

# 使用数据库调度器(推荐)
celery -A demo beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

5. 启动 Django 开发服务器

1
python manage.py runserver 0.0.0.0:8000

6. 启动 Flower(监控,可选)

1
celery -A demo flower --port=5555

访问 http://localhost:5555 查看 Celery 监控界面。

🧪 测试任务

使用 Django Shell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from demo_app.tasks import add, send_email

# 同步调用(不推荐,会阻塞)
result = add(2, 3)
print(result)

# 异步调用
task = add.delay(2, 3)
print(f'Task ID: {task.id}')

# 获取结果
result = task.get(timeout=10)
print(f'Result: {result}')

# 发送邮件任务
email_task = send_email.delay(
'user@example.com',
'Test Subject',
'Test Body'
)
print(f'Email Task ID: {email_task.id}')

使用 HTTP API

1
2
3
4
5
6
7
# 触发加法任务
curl -X POST http://localhost:8000/api/tasks/add \
-H "Content-Type: application/json" \
-d '{"x": 5, "y": 3}'

# 检查任务状态
curl http://localhost:8000/api/tasks/status/<task_id>

🔒 生产环境建议

  1. 使用 Supervisor 或 systemd:管理 Celery Worker 和 Beat 进程
  2. 监控和告警:使用 Flower 或 Prometheus 监控任务执行
  3. 错误处理:配置任务重试和错误通知
  4. 资源限制:设置任务超时和内存限制
  5. 队列分离:为不同类型的任务使用不同的队列
  6. 结果存储:使用数据库或 Redis 存储任务结果
  7. 日志记录:配置结构化日志和日志聚合

📚 参考资源

0%