有机化学、化学信息学、生物化学、生物信息学、机器学习、深度学习、药物设计、网站建设关注我!Bilibili
通过建立websocket,来监控任务的完成状态
流程介绍:
1、前端提交任务请求
2、后端接收到请求以后,使用celery进行任务排队和状态变换(依赖于配置内的进程数量配置),同时把任务ID返回给前端
3、前端拿着任务ID,和后端建立websocket通信
4、当任务状态变化的时候,通知前端(比如从-排队中-转变为-已开始-)
5、结束通信(前端退出页面主动结束、后端任务状态为success的时候主动结束)
celery的任务ID可以查询当前的任务状态
关于celery和redis的配置请看:
settings.py内增加channel的配置:
# 指定 ASGI 应用  ASGI 来处理 WebSocket、SSE、异步后台任务通知等
ASGI_APPLICATION = 'aiochemserver.asgi.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [('127.0.0.1', 6379)],
        },
    },
}
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'appchemnote',
    'appuser',
    # ——— 把channels写入进来 ———
    'channels',
    'django.contrib.postgres',
]settings同级目录建立routing.py
# websocket的路由
from django.urls import path
from aiochemserver.consumers import TaskProgressConsumer
# 通用任务查询接口
websocket_urlpatterns = [
    path('ws/task_progress/', TaskProgressConsumer.as_asgi()),
]settings同级目录建立consumers.py
# aiochemserver/consumers.py
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from asgiref.sync import sync_to_async
from celery.result import AsyncResult
# 异步安全地从 Celery 后端读取 state 和 info.progress
@sync_to_async
def fetch_task_meta(task_id: str):
    result = AsyncResult(task_id)
    state = result.state
    info = result.info or {}
    progress = info.get("progress", 0) if isinstance(info, dict) else 0
    return state, progress
class TaskProgressConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        # 接受连接
        await self.accept()
    async def disconnect(self, close_code):
        # 断开时退出所有 group
        if hasattr(self, "task_ids"):
            for tid in self.task_ids:
                await self.channel_layer.group_discard(f"task_{tid}", self.channel_name)
    async def receive_json(self, content, **kwargs):
        """
        content: { "task_ids": [...] }
        """
        self.task_ids = content.get("task_ids", [])
        # 先推送一次当前所有任务的“初始状态”
        for task_id in self.task_ids:
            # 加入对应组,方便后续 group_send
            await self.channel_layer.group_add(f"task_{task_id}", self.channel_name)
            # 拉取当前状态
            state, progress = await fetch_task_meta(task_id)
            # 直接发给当前 WebSocket 客户端
            await self.send_json({
                "task_id": task_id,
                "status": state,
                "progress": progress,
            })
    async def task_update(self, event):
        """
        当后端做了
          await channel_layer.group_send(
            f"task_{task_id}",
            {
              "type": "task_update",
              "task_id": task_id,
              "status": new_state,
              "progress": new_progress,
            }
          )
        时,就会执行这里,推送给前端
        """
        await self.send_json({
            "task_id": event["task_id"],
            "status": event["status"],
            "progress": event.get("progress", 0),
        })在这两个文件里面转路由,由于channels属于asgi,所以以后启动服务都需要使用支持asgi的服务
按照下面的写法,普通的http请求就走普通服务,channel服务就会走websocket
import os, django
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import aiochemserver.routing  # 确保这个模块里有 websocket_urlpatterns
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aiochemserver.settings')
django.setup()
application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(aiochemserver.routing.websocket_urlpatterns)
    ),
})wsgi.py内还正常写就行:
普通的url服务,依旧走wsgi
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aiochemserver.settings')
application = get_wsgi_application()
在utils下新建一个task_notify.py
用来复用状态查询的逻辑:
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
def notify_task_state(task_id, status, progress=None):
    """
    统一封装任务状态通知函数,支持进度信息推送给 WebSocket 客户端。
    """
    channel_layer = get_channel_layer()
    group_name = f"task_{task_id}"
    message = {
        "type": "task_update",
        "task_id": task_id,
        "status": status,
    }
    if progress is not None:
        message["progress"] = progress
    async_to_sync(channel_layer.group_send)(group_name, message)
这里写一个测试,在tasks.py中的调用函数内,
需要设置bind=True
from utils.task_notify import notify_task_state
@shared_task(bind=True, soft_time_limit=600)  # 10分钟超时
def save_note(self, data, note_id, type):
    task_id = self.request.id
    # 任务开始
    self.update_state(state="STARTED", meta={"progress": 0})
    notify_task_state(task_id, 'STARTED', progress=0)self.update_state这个是通知本身的进程和数据库,写入celery的任务状态
notify_task_state是通知前端任务现在的状态,注意区分
关于view的解释:
对于一个异步任务,上面已经介绍过,是前端发起任务请求,后端再执行
from appchemnote.tasks import save_note
class ReceiveNote(APIView):
    def post(self, request, *args, **kwargs):
        async_res = save_note.delay(request_data, note.note_id, 'create')
        return JsonResponse({
                'message': '笔记存储任务提交成功',
                'result': {
                    'rd_task_id': async_res.id
                }
            }, status=status.HTTP_200_OK)
解释一下整个流程:
1、在一个任务被前端提交进来的时候,后端会把任务调度交给celery,并且立即给前端一个task_id。
2、这时候,这个任务在后端还没有完成,所以在有需求的时候,前端就通过task_id来查询任务状态,当这个任务不是success的时候,拿这个task_id和Django建立websocket通信。
任务在后端的进程progress,完全依赖于在tasks的函数内,我们自己定义的任务进度和progress(所以进度条都是后端控制的,不会特别精准)
3、当ws通信建立后,每次后端的状态更新的时候,Django都会通过websocket告诉前端,我的任务状态更新了,这是新的状态数据。
4、前端每次拿到新的数据后,就会把状态展示在页面上,从而实现任务状态实时更新,用户可以感知到的这个需求。
没有推荐的文章...
没有对应的文章...