celery_elery -a celery_worker.worker.job beat -l info-程序员宅基地

技术标签: python  数据库  redis  

一 介绍

官网:https://docs.celeryq.dev/en/latest/index.html

celery是一个简单、灵活、可靠的分布式系统,用于 处理大量消息,同时为操作提供 维护此类系统所需的工具。

Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。

在这里插入图片描述

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。

使用场景

异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等。

延迟执行:解决延迟任务。

定时执行:解决周期(周期)任务,比如每天数据统计。

二 安装使用

安装

pip install celery
pip install eventlet

使用

tasks.py

from celery import Celery

# 任务提交保存的地方
broker = 'redis://127.0.0.1:6379/0'

# 任务执行完结果保存的地方
backend = 'redis://127.0.0.1:6379/1'

app = Celery(main=__name__, broker=broker, backend=backend)

# 创建任务
@app.task
def add(x, y):
    return x + y

提交任务

submit_task.py

from tasks import add

# 使用delay方法
res = add.delay(10, 10)

# 返回值是celery.result.AsyncResult类的对象,可以根据这个对象查看执行结果等。

# 也可以通过返回值直接查看任务的状态
print(res)  # 3dcef2f9-d266-4d70-8aab-73073ba9e691

# 这才是真正的id号
print(res.task_id)

执行后,会将任务保存到broker对应的redis缓存库中。
在这里插入图片描述

启动celery工作服务器

在工作路径下终端输入命令

celery -A tasks worker -l info -P eventlet 
或
celery -A tasks worker --loglevel=INFO -P eventlet

backend中查看任务执行结果

check_result.py

from tasks import app

from celery.result import AsyncResult

task_id = '3dcef2f9-d266-4d70-8aab-73073ba9e691'

if __name__ == '__main__':
    res = AsyncResult(id=task_id, app=app)
    if res.successful():
        result = res.get()
        print(result)
    # 等同上面代码
    # if res.state == 'SUCCESS':
    #     result = res.get()
    #     print(result)
    elif res.failed():
        print('任务失败')
    # elif res.state == 'FAILURE':
    #     print('任务失败')
    elif res.status == 'PENDING':
        print('任务等待中被执行')
    elif res.status == 'RETRY':
        print('任务异常后正在重试')
    elif res.status == 'STARTED':
        print('任务已经开始被执行')

AsyncResult下的方法

def failed(self):
    """Return :const:`True` if the task failed."""
    return self.state == states.FAILURE

def successful(self):
    """Return :const:`True` if the task executed successfully."""
    return self.state == states.SUCCESS

三 自定义celery包

新建包:celery_tasks。

在包先新建一个 celery.py,初始化app。

from celery import Celery

broker = 'redis://127.0.0.1:6379/0'
# backend='redis://:[email protected]:6379/1' 加密码
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,
             include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

在包里新建user_tasks.py 编写用户相关任务 。

# 用户相关任务
from .celery import app

在包里新建home_task.py 编写首页相关任务 。

# 首页相关任务
from .celery import app

其它程序,提交任务。

启动worker

celery -A celery_tasks worker -l info -P eventlet

四 celery异步任务,延迟任务,定时任务

异步任务

task.delay(*args, **kwargs)

延迟任务

task.apply_async(args=[参数,参数],eta=时间对象(utc时间))
from datetime import timedelta, datetime

res = add.apply_async(args=(1, 2), eta=(datetime.utcnow() + timedelta(seconds=20)))

print(res.task_id)  # c78505e2-614d-4bb2-930c-c73c325af519

在这里插入图片描述

定时任务

app的配置文件中配置

app.conf.beat_schedule = {
    
    'add': {
    
        'task': 'celery_tasks.home_tasks.add',
        'schedule': timedelta(seconds=5),  # 每隔五秒提交任务
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('100', '200'),
    },
}

启动worker

celery -A celery_tasks worker -l info -P eventlet

启动beat(真正干活的人)

celery -A celery_tasks beat -l info

在这里插入图片描述

五 django中使用celery

将包复制到django项目路径下

在包内的celery.py中添加代码

import os
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
import django
django.setup()

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,
             include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

在任务中就可以用到django的ORM等。
在django的视图类中,导入任务,提交任务。
启动worker,beat。

六 秒杀接口

新建秒杀商品表

class Shop(models.Model):
    name = models.CharField(max_length=32)
    # 秒杀商品数量不能为负
    shop_num = models.PositiveIntegerField()

user_tasks.py

# 用户相关任务
from .celery import app


# 秒杀任务
@app.task
def seckill_task():
    from user.models import Shop
    try:
        from django.db.models import F
        import time
        Shop.objects.filter(name='秒杀商品').update(shop_num=F('shop_num') - 1)
        time.sleep(10)
        return True
    # 出错解释商品库存不足 不能秒杀
    except:
        return False

views.py

# 提交秒杀
 @action(methods=['GET'], detail=False, url_path='submit_seckill')
 def submit_seckill(self, request):
     from celery_tasks.user_tasks import seckill_task
     res = seckill_task.delay()
     return APIResponse(task_id=res.task_id)

 # 查看秒杀结果
 @action(methods=['GET'], detail=False, url_path='check_seckill')
 def check_seckill(self, request):
     from celery.result import AsyncResult
     from celery_tasks.celery import app
     task_id = request.query_params.get('task_id')
     res = AsyncResult(id=task_id, app=app)
     if res.successful():
         is_true = res.get()
         if is_true:
             return APIResponse(code=100, msg='秒杀成功')
         return APIResponse(code=101, msg='手慢了没秒到')
     elif res.status == 'PENDING':
         return APIResponse(code=102, msg='任务等待中被执行')
     elif res.status == 'RETRY':
         return APIResponse(code=103, msg='任务异常后正在重试')
     elif res.status == 'STARTED':
         return APIResponse(code=104, msg='任务已经开始被执行')

前端

<template>
    <div>
        <img src="https://img1.baidu.com/it/u=3467439571,3022033088&fm=253&app=138&size=w931&n=0&f=JPEG&fmt=auto?sec=1668704400&t=5ff8a17feab5b05d5e27c41ad2776bc9" alt="" width="300px" height="300px">
        <br>
        <el-button type="danger" round @click.once="submit">秒杀按钮</el-button>
    </div>
</template>

<script>
export default {
    
    name: 'Seckill',
    methods: {
    
        submit() {
    
            this.$axios.get(this.$settings.BASE_URL + 'user/submit_seckill/').then(res => {
    
                console.log(res)
                if (res.data.code === 100) {
    
                    let task_id = res.data.task_id
                    console.log(task_id)
                    let t = setInterval(() => {
    
                        this.$axios.get(this.$settings.BASE_URL + 'user/check_seckill/?task_id=' + task_id).then(re => {
    
                            console.log(re)
                            if (re.data.code === 100 || re.data.code === 101) {
    
                                this.$message({
    
                                    message: re.data.msg,
                                    type: 'success'
                                });
                                alert(re.data.msg)
                                clearInterval(t)
                            }
                        })
                    }, 1000)
                }
            })
        }
    }

}
</script>

在这里插入图片描述

七 双写一致性

7.1 轮播图接口加缓存

提交了接口的响应速度
提高并发量

class SlideShowView(GenericViewSet, ListMixinView):
    queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.SLIDE_SHOW_COUNT]
    serializer_class = SlideShowSer

    def list(self, request, *args, **kwargs):
        result = cache.get('banner_list')
        if result:
            print('走了缓存')
            return APIResponse(code=1001, result=result)
        res = super().list(request, *args, **kwargs)
        result = res.data.get('result')
        cache.set('banner_list', result)
        print('走了数据库')
        return res

7.2 celery定时任务实现双写一致性

加了缓存,如果mysql数据变了,由于请求的都是缓存的数据,导致mysql和redis的数据不一致。

双写一致性问题:

  1. 修改mysql数据库,删除缓存 【缓存的修改是在后】
  2. 修改数据库,修改缓存 【缓存的修改是在后】
  3. 定时更新缓存,针对于实时性不是很高的接口适合定时更新。

home_tasks.py

# 首页相关任务
import time

from .celery import app
from home.models import SlideShow
from django.conf import settings
from home.serializer import SlideShowSer
from django.core.cache import cache


@app.task
def update_banner():
    # 更新缓存
    queryset = SlideShow.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.SLIDE_SHOW_COUNT]
    ser = SlideShowSer(instance=queryset, many=True)
    # print(ser.data)
    for item in ser.data:
        item['image'] = settings.HOST_URL + item['image']
    cache.set('banner_list', ser.data)
    return True

celery.py

import os
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy.settings.dev')
import django

django.setup()

broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(main=__name__, broker=broker, backend=backend,
             include=['celery_tasks.home_tasks', 'celery_tasks.user_tasks'])

app.conf.beat_schedule = {
    
    # 定时任务
    'update_banner': {
    
        'task': 'celery_tasks.home_tasks.update_banner',
        'schedule': timedelta(minutes=30),
        # 'schedule': crontab(hour=8, day_of_week=1),
        'args': (),
    },
}

启动django,worker,beat。

每隔30分钟查询数据库中的轮播图,放进缓存中,请求来之后,缓存中有先从缓存中拿,没有才去数据库拿。

mysql数据修改后,前端拿到的数据可能不一致,但是最多30分钟缓存中的数据就会更新。

八 @task与@shared_task的区别

当我们使用@app.task装饰器定义我们的异步任务时,那么这个任务依赖于Celery产生的实例app。

然而我们在进行Django开发时为了保证每个app的可重用性,我们经常会在每个app文件夹下编写异步任务,这些任务并不依赖于具体的Django项目名。使用@shared_task 装饰器能让我们避免对某个项目名对应Celery实例的依赖,使app的可移植性更强。

from celery import shared_task
 
 
@shared_task
def add(x, y):
    return x + y

九 APScheduler

如果只想做定时任务,Celery依赖的软件比较多,比较耗资源。最好的解决方案就是 APScheduler。

APScheduler使用起来十分方便。提供了基于日期、固定时间间隔以及 crontab类型的任务。还可以在程序运行过程中动态的新增任务和删除任务。在任务运行过程中,还可以把任务存储起来,下次启动运行依然保留之前的状态。另外最重要的一个特点是,因为他是基于 Python语言的库,所以是可以跨平台的,一段代码,处处运行!

安装

pip install apscheduler

简单使用

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime


def task():
    print(f'现在时间:{
      datetime.now()}')


if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(task, 'interval', seconds=3)
    scheduler.start()

每隔三秒执行一次。
在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_68531269/article/details/127888583

智能推荐

免安装版jdk的配置及使用(附绿色版jdk1.7及jdk1.8)_jdk1.8.0_151 64位免安装-程序员宅基地

文章浏览阅读8.8w次,点赞21次,收藏98次。绿色版jdk1.7下载地址:https://pan.baidu.com/s/12E3WGsIJBCUCJTTeC41m_g 密码:no4v绿色版jdk1.8下载地址:https://pan.baidu.com/s/1GnGmmnBuw9SG_USrYxNsNg 密码:rgok将下载好的绿色版jdk解压,然后通过配置系统环境变量指向该文件即可系统环境变量配置过程如下:右击我的电脑(计算机)——属性—..._jdk1.8.0_151 64位免安装

教程 | 使用WeBASE开发智能合约Java应用-程序员宅基地

文章浏览阅读3.3k次,点赞5次,收藏30次。区块链中间件平台WeBASE功能丰富、操作友好,备受社区关注。WeBASE也不断迭代优化,致力于为社区带来更好的开发体验。在 WeBASE 的合约IDE中,“Java项目导出”功能除了提供..._fisco webasefront 导出java项目使用

MATLAB2019b安装详细教程_matlab2019要求配置-程序员宅基地

文章浏览阅读2.9w次,点赞46次,收藏137次。MATLAB2019b安装详细教程 W10 64位Matlab是由美国MathWorks公司出品的著名数学软件,软件的功能性十分强大。其中,MATLAB/Simulink也被某些推文称之万物皆可仿的工具。近年来,自动代码生成在人工智能、深度学习和汽车行业的快速发展,Matlab也变得热度很高,在这里将详细介绍一下Matlab的详细安装步骤和注意点,仅供大家学习交流使用。安装要求和注意点1.安装全程须断网,安装前先关闭360等所有杀毒软件,防止误杀补丁,否则可能安装不成功;2.Matlab2019b适_matlab2019要求配置

CrossOver 22 for Mac/Linux版 v22.1.1中文激活版_crossover22linux下载-程序员宅基地

文章浏览阅读3.2k次,点赞8次,收藏10次。使用 CROSSOVER,在 MAC 运行 WINDOWS 软件是如此容易 CrossOver 可以在 Mac/Linux 上运行成千上万的 Windows 程序。从办公软件、实用工具、游戏到设计软件,您只需在 Mac 的 dock 轻按一下便可运行。您可以 Windows 程序和 Mac 程序之间随意切换,而这一切无需重启、无需虚拟机,也无需购买 Windows 授权。Windows 软件就像 Mac 软件一样运行着,实现跨平台的复制粘贴、文件互通、快捷键和窗口管理。_crossover22linux下载

jdk1.8下载与安装教程(2023最新版)-程序员宅基地

文章浏览阅读4.2w次,点赞36次,收藏120次。JDK是 Java 语言的软件开发工具包,主要用于移动设备、嵌入式设备上的java应用程序。jdk1.8又称jdk8.0,是目前相对比较稳定的版本,不建议下载最新的jdk版本,因为最新版的jdk不稳定,在Java的学习中可能会出现各种各样的问题。_jdk1.8下载

Work Scheduling URAL - 1099 一般图的最大匹配(带花树)_work scheduling二分图-程序员宅基地

文章浏览阅读390次。做了很多二分图了,但是给出的都是没有环的,如果有了环,就不能转化为二分图。所以这就需要一个全新的算法,带花树算法;反正也看不懂,直接套板子把/* ***********************************************Author :kuangbinCreated Time :2013/8/21 22:56:05File Name :_work scheduling二分图

随便推点

环境光重要性采样_重要性采样 球面采样-程序员宅基地

文章浏览阅读758次。重要性采样概述计算积分需要利用蒙特卡洛方法去近似,蒙特卡洛方法我这里就不讲了,有兴趣的可以看看我的知乎:蒙特卡洛方法的简单总结 - 知乎重要性采样的目的就是加快收敛速度,所以选择pdf比较重要。所以对于环境光采样,我们需要知道环境光每个方向上的概率密度。环境光贴图环境光贴图能用整个球的所有方向去采样,球面方向能转成极坐标θ[0, π]和φ[0, 2π]表示,所以我们用经纬图(long_latitude_map)来做环境光贴图。维度可以用θ,经度用φ,例如地球仪的贴图展开:采样_重要性采样 球面采样

Kettle连接Oracle(Oracle19c&Oracle11g)-程序员宅基地

文章浏览阅读5.2k次,点赞4次,收藏8次。Kettle连接Oracle19c&Oracle11g_kettle连接oracle

WebDav-Milton之一_java webdav milton-程序员宅基地

文章浏览阅读1.6k次。这几篇主要介绍下Milton,一个用java实现了WebDav协议的开源软件,并且可以集成Spring(貌似是唯一一个实现WebDav协议支持Spring) 原文地址:http://milton.io/programs/milton/anno/anno1/pgetting-started-with-annotations.html (原文地址时不时的访问不了) 我的例子是在官网例子上修改的(主_java webdav milton

SpringBoot接收参数的三种方式,SpringBoot访问静态资源。_springboot接收路参-程序员宅基地

文章浏览阅读988次。接收:1、在路由处写好要参数2、参数列表用注解,如果路径上写的参数名称与函数接收的名称不一致可以使用value参数来与路径上的保持一致。_springboot接收路参

vue.config.js 完整配置_vueconfig.js配置文件完整-程序员宅基地

文章浏览阅读936次,点赞2次,收藏5次。vue-cli 3.x 及以上const path = require("path");const resolve = dir => path.join(__dirname, dir);//用于生产环境去除多余的cssconst PurgecssPlugin = require("purgecss-webpack-plugin");//全局文件路径const glob = require("glob-all");//压缩代码并去掉consoleconst UglifyJsPlugin _vueconfig.js配置文件完整

MyBatis-Plus——MyBatis-Plus概述与集成_mybatisplus gitee-程序员宅基地

文章浏览阅读2.2k次,点赞3次,收藏6次。MyBatis-Plus——MyBatis-Plus概述与集成1、MyBatis-Plus概述MyBatis-Plus 是什么MyBatis-Plus(简称MP)是一个 MyBatis的增强工具,在 MyBatis的基础上只做增强( 提供了一些高效、有用、开箱即用的功能)不做改变,为简化开发、提高效率而生。那么它是怎么增强的呢?其实就是它已经封装好了一些crud方法,我们不需要再写xml了,直接调用这些方法就行,就类似于JPA和 tk-mapper。特征与 MyBatis 完全兼容_mybatisplus gitee

推荐文章

热门文章

相关标签