Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

本文主要是介绍Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式...

实现思路

  • FastAPI 服务器
  • Celery 任务队列
  • RabbitMQ 作为消息代理
  • 定时任务处理

完整步骤

首先创建项目结构:

c:\Users\Administrator\Desktop\meitu\
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_app.py
│   ├── tasks.py
│   └── config.py
├── requirements.txt
└── celery_worker.py

1.首先创建 requirements.txt:

fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests=编程=2.31.0

2.创建配置文件:

from dotenv import load_dotenv
import os

load_dotenv()

# RabbitMQ配置
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")

# Celery配置
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"

# 定时任务配置
CELERY_BEAT_SCHEDULE = {
    'process-images-every-hour': {
        'task': 'app.tasks.process_images',
        'schedule': 3600.0,  # 每小时执行一次
    },
    'daily-cleanup': {
        'task': 'app.tasks.cleanup_old_images',
        'schedule': 86400.0,  # 每天执行一次
    }
}

3.创建 Celery 应用

from celery import Celery
from app.configphp import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULE

celery_app = Celery(
    'image_processing',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
    include=['app.tasks']
)

# 配置定时任务
celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE
celery_app.conf.timezone = 'Asia/Shanghai'

4.创建任务文件:

from app.celery_app import celery_app
from app.watermark import ImageWatermarker
import os
from datetime import datetime, timedelta

@celery_app.task
def add_watermark_task(image_path, text, position='center'):
    """异步添加水印任务"""
    watermarker = ImageWatermarker()
    try:
        result_path = watermarker.add_watermark(
            image_path=image_path,
            text=text,
       www.chinasem.cn     position=position
        )
        return {"status": "success", "output_path": result_path}
    except Exception as e:
        return {"status": "error", "message": str(e)}

@celery_app.task
def process_images():
    """定时处理图片任务"""
    image_dir = "images/pending"
    if not os.path.exists(image_dir):
        return {"status": "error", "message": "Pending directory not found"}

    processed = 0
    for image in os.listdir(image_dir):
        if image.lower().endswith(('.png', '.jpg', '.jpeg')):
            add_watermark_task.delay(
                os.path.join(image_dir, image),
                "自动处理水印",
                'center'
            )
            processed += 1

    return {"status": "success", "processed": processed}

@celery_app.task
def cleanup_old_images():
    """清理旧图片任务"""
    output_dir = "images/processed"
    if not os.path.exists(output_dir):
        return {"status": "error", "message": "Output directory not found"}

    threshold_date = datetime.now() - timedelta(days=7)
    cleaned = 0

    for image in os.listdir(output_dir):
        image_path = os.path.join(output_dir, image)
        if os.path.getctime(image_path) < threshold_date.timestamp():
            os.remove(image_path)
            cleaned += 1

    return {"status": "success", "cleaned": cleaned}

5.创建 FastAPI 应用:

from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import jsONResponse
import os
from app.tasks import add_watermark_task
from app.celery_app import celery_app

app = FastAPI(title="图片水印处理服务")

@aphppp.post("/upload/")
async def upload_image(
    file: UploadFile = File(...),
    text: str = "水印文本",
    position: str = "center"
):
    # 保存上传的文件
    file_path = f"images/uploads/{file.filename}"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    
    with open(file_path, "wb") as buffer:
        content = await file.read()
        buffer.write(content)
    
    # 创建异步任务
    task = add_watermark_task.delay(file_path, text, position)
    
    return JSONResponse({
        "status": "success",
        "message": "图片已上传并加入处理队列",
        "task_id": task.id
    })

@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    if task.ready():
        return {"status": "completed", "result": task.result}
    return {"status": "processing"}

@app.get("/tasks/scheduled")
async def get_scheduledChina编程_tasks():
    return {"tasks": celery_app.conf.beat_schedule}

6.创建 Celery worker 启动文件:

from app.celery_app import celery_app

if __name__ == '__main__':
    celery_app.start()

使用说明

首先安装依赖:

pip install -r requirements.txt

确保 RabbitMQ 服务已启动

启动 FastAPI 服务器:

uvicorn app.main:app --reload

启动 Celery worker:

celery -A celery_worker.celery_app worker --loglevel=info

启动 Celery beat(定时任务):

celery -A celery_worker.celery_app beat --loglevel=info

这个系统提供以下功能:

  • 通过 FastAPI 接口上传图片并异步处理水印
  • 使用 Celery 处理异步任务队列
  • 使用 RabbitMQ 作为消息代理
  • 支持定时任务:
    • 每小时自动处理待处理图片
    • 每天清理一周前的旧图片
  • 支持任务状态查询
  • 支持查看计划任务列表

API 端点:

  • POST /upload/ - 上传图片并创建水印任务
  • GET /task/{task_id} - 查询任务状态
  • GET /tasks/scheduled - 查看计划任务列表

以上就是Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统的详细内容,更多关于Python图片水印的资料请关注China编程(www.chinasem.cn)其它相关文章!

这篇关于Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1154094

相关文章

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态

Python Websockets库的使用指南

《PythonWebsockets库的使用指南》pythonwebsockets库是一个用于创建WebSocket服务器和客户端的Python库,它提供了一种简单的方式来实现实时通信,支持异步和同步... 目录一、WebSocket 简介二、python 的 websockets 库安装三、完整代码示例1.

揭秘Python Socket网络编程的7种硬核用法

《揭秘PythonSocket网络编程的7种硬核用法》Socket不仅能做聊天室,还能干一大堆硬核操作,这篇文章就带大家看看Python网络编程的7种超实用玩法,感兴趣的小伙伴可以跟随小编一起... 目录1.端口扫描器:探测开放端口2.简易 HTTP 服务器:10 秒搭个网页3.局域网游戏:多人联机对战4.

Java枚举类实现Key-Value映射的多种实现方式

《Java枚举类实现Key-Value映射的多种实现方式》在Java开发中,枚举(Enum)是一种特殊的类,本文将详细介绍Java枚举类实现key-value映射的多种方式,有需要的小伙伴可以根据需要... 目录前言一、基础实现方式1.1 为枚举添加属性和构造方法二、http://www.cppcns.co

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

使用C#代码在PDF文档中添加、删除和替换图片

《使用C#代码在PDF文档中添加、删除和替换图片》在当今数字化文档处理场景中,动态操作PDF文档中的图像已成为企业级应用开发的核心需求之一,本文将介绍如何在.NET平台使用C#代码在PDF文档中添加、... 目录引言用C#添加图片到PDF文档用C#删除PDF文档中的图片用C#替换PDF文档中的图片引言在当

详解C#如何提取PDF文档中的图片

《详解C#如何提取PDF文档中的图片》提取图片可以将这些图像资源进行单独保存,方便后续在不同的项目中使用,下面我们就来看看如何使用C#通过代码从PDF文档中提取图片吧... 当 PDF 文件中包含有价值的图片,如艺术画作、设计素材、报告图表等,提取图片可以将这些图像资源进行单独保存,方便后续在不同的项目中使

Linux系统中卸载与安装JDK的详细教程

《Linux系统中卸载与安装JDK的详细教程》本文详细介绍了如何在Linux系统中通过Xshell和Xftp工具连接与传输文件,然后进行JDK的安装与卸载,安装步骤包括连接Linux、传输JDK安装包... 目录1、卸载1.1 linux删除自带的JDK1.2 Linux上卸载自己安装的JDK2、安装2.1

C#使用SQLite进行大数据量高效处理的代码示例

《C#使用SQLite进行大数据量高效处理的代码示例》在软件开发中,高效处理大数据量是一个常见且具有挑战性的任务,SQLite因其零配置、嵌入式、跨平台的特性,成为许多开发者的首选数据库,本文将深入探... 目录前言准备工作数据实体核心技术批量插入:从乌龟到猎豹的蜕变分页查询:加载百万数据异步处理:拒绝界面

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同