Python 全栈系列253 再梳理flask-celery的搭建

06-22 1072阅读

说明

最近做了几个实验,将结论梳理一下,方便以后翻看。

  • 1 flask-celery 主要用于数据流的同步任务,其执行由flask-aps发起,基于IO并发的方法,达到资源的高效利用,满足业务上的需求。
  • 2 目前部署环境有算网机和anygpu

    内容

    1 环境

    现在看起来,将flask-celery部署在宿主机上是可行的,一般来说我会先安装miniconda3,有些容器本身就已经是anaconda3的就不动了。

    apt的升级是可选的

    sudo apt update
    sudo apt upgrade -y
    wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
    bash https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
    ~/miniconda3/bin/conda init
    source ~/.bashrc
    conda 24.4.0
    which conda
    /root/miniconda3/bin/conda
    

    然后,一般就在base环境下安装包,如果本身已经是容器环境(租用机有些不允许再安装软件),就直接安装相关python3包

    pip3 install ipython -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install requests -i https://mirrors.aliyun.com/pypi/simple/
    wget -NO Basefuncs-1.10-py3-none-any.whl http://YOURS/downup/view008_download_from_folder/pys.Basefuncs-1.10-py3-none-any.whl
    pip3 install Basefuncs-1.10-py3-none-any.whl
    pip3 install clickhouse_driver -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install pandas -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install numpy -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install redis -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install pydantic -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install nest_asyncio -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install aiohttp -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install Flask -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install Flask-APScheduler -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install celery -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install gunicorn -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install mongoengine -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install apscheduler -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install tornado -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install Pillow -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install markdown -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install pymysql -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install gevent -i https://mirrors.aliyun.com/pypi/simple/
    pip3 install akshare -i https://mirrors.aliyun.com/pypi/simple/
    

    接下来需要在指定位置创建文件

    • 1 如果是在算网机,那么直接拉取项目文件,一般放在 /opt/project_notes/flask_celery 下面
    • 2 如果是在租用机,一般建立在/opt/flask_celery 下面,采用vim + 拷贝的方式
      systemd

      一般在宿主机级别可以有这个服务的控制,但是很多算力机也是没有的,只租用容器(怎么有种卖艺不卖身的感觉 - - !)。

      如果没有systemd,那么就需要切到对应目录下,直接用后台启动的方式

      直接用命令启动的方式,切换到对应目录下

      cd  /opt/project_notes/flask_celery 
      或
      cd /opt/flask_celery
      vim server_single_v2.py
      # 如果采用了非标的redis配置,需要到程序里面修改一下地址
      celery_broker = 'redis://:YOURS@127.0.0.1:24008/1'
      # 前台启动命令
      gunicorn server_single_v2:app -b 0.0.0.0:24104
      celery -A server_single_v2.celery_ worker
      

      写的时候刚打开,一堆积压的任务立即扑面而来。

      Python 全栈系列253 再梳理flask-celery的搭建

      我突然意识到celery的异步可能是线程级,而不是协程级的。(因为worker编号看起来很像我的cpu核数编号)

      Python 全栈系列253 再梳理flask-celery的搭建

      算了,先放过自己 。用一段时间看看,之后可以自己创建一个协程级的服务来进行流转和调度。我主要的应用应该就是在于异步请求API。我想以后有一块很大的改造就是将取数/查询部分的服务都使用异步进行优化。可以参考tornado同步转异步几种方式,先从搭建异步tornado开始。

      编辑启动脚本vim ~/start_flask_celery.sh。理论上需要source才能activate,虽然即使不activate也可以运行,但我猜主要是因为在服务里面写了conda环境,所以默认进入了base。

      #!/bin/bash
      # 激活 base 环境(或你创建的特定环境)
      source /root/miniconda3/etc/profile.d/conda.sh
      conda activate base
      #!/bin/bash
      #anaconda环境
      # 运行 Python 服务脚本
      # 算网机
      cd /opt/project_notes/flask_celery 
      # 租用机
      # cd /opt/flask_celery
      nohup gunicorn server_single_v2:app -b 0.0.0.0:24104 >/dev/null 2>&1 &
      nohup celery -A server_single_v2.celery_ worker >/dev/null 2>&1 &
      

      如果没有systemd,那么就开机的时候手动执行一下脚本,一般还是尽量注册为systemd服务,注意一下两种不同环境的配置。

      先修改脚本执行权限 chmod +x ~/start_flask_celery.sh ,vim /lib/systemd/system/flask_celery.service

      [Unit]
      Description=flask_celery_service
      After=network.target network-online.target syslog.target
      Wants=network.target network-online.target
      [Service]
      #启动服务的命令
      Type=forking
      ExecStart=/root/start_flask_celery.sh
      Restart=always
      RestartSec=5
      # miniconda
      Environment="PATH=/root/miniconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
      # anaconda
      #Environment="PATH=/root/anaconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
      [Install]
      WantedBy=multi-user.target
      

      然后配置重载、启动和自启动

      systemctl daemon-reload
      systemctl start flask_celery
      systemctl enable flask_celery
      systemctl status flask_celery
      ┌─root@m7:~
      └─ $ systemctl status  flask_celery
      ● flask_celery.service - flask_celery_service
         Loaded: loaded (/lib/systemd/system/flask_celery.service; enabled; vendor preset: enabled)
         Active: active (running) since Sat 2024-06-15 13:03:51 CST; 1s ago
        Process: 428 ExecStart=/root/start_flask_celery.sh (code=exited, status=0/SUCCESS)
          Tasks: 38 (limit: 4915)
         CGroup: /system.slice/flask_celery.service
                 ├─475 /root/miniconda3/bin/python /root/miniconda3/bin/gunicorn server_single_v2:app -b 0.0.0.0:24104
                 ├─476 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
                 ├─556 /root/miniconda3/bin/python /root/miniconda3/bin/gunicorn server_single_v2:app -b 0.0.0.0:24104
                 ├─864 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
                 ├─924 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
                 ├─949 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
                 └─975 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
      6月 15 13:03:51 m7 systemd[1]: Starting flask_celery_service...
      6月 15 13:03:51 m7 systemd[1]: Started flask_celery_service.
      

      测试

      import requests as req 
      para_dict = {'arg1':111,
                   'arg2':222}
      resp = req.post('http://127.0.0.1:24104/sum_post/',json = para_dict )
      print('每个任务耗时10秒')
      import time 
      tick1 = time.time()
      print(req.get('http://127.0.0.1:24104/get_result/%s' % resp.text).text)
      tick2 = time.time()
      print('actually takes %.2f' % (tick2 - tick1 ))
      每个任务耗时10秒
      333
      actually takes 10.02
      
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]