| | from datetime import timedelta |
| |
|
| | from celery import Celery, Task |
| | from celery.schedules import crontab |
| | from flask import Flask |
| |
|
| | from configs import dify_config |
| |
|
| |
|
def init_app(app: Flask) -> Celery:
    """Create and configure the Celery application bound to ``app``.

    The returned Celery app is named after the Flask app, runs every task
    inside the Flask application context, is installed as Celery's default
    app, and is stored under ``app.extensions["celery"]``. Broker, result
    backend, optional Sentinel/SSL settings and the periodic (beat) schedule
    are all read from ``dify_config``.

    Args:
        app: The Flask application whose context tasks should run in.

    Returns:
        The configured Celery application.
    """

    class FlaskTask(Task):
        """Task base class that executes the task body in ``app``'s context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    # Sentinel-specific transport options are only supplied when Sentinel is
    # enabled; otherwise the broker gets an empty options mapping.
    broker_transport_options = (
        {
            "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
            "sentinel_kwargs": {
                "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
            },
        }
        if dify_config.CELERY_USE_SENTINEL
        else {}
    )

    celery_app = Celery(
        app.name,
        task_cls=FlaskTask,
        broker=dify_config.CELERY_BROKER_URL,
        backend=dify_config.CELERY_BACKEND,
        task_ignore_result=True,
    )

    celery_app.conf.update(
        result_backend=dify_config.CELERY_RESULT_BACKEND,
        broker_transport_options=broker_transport_options,
        broker_connection_retry_on_startup=True,
    )

    if dify_config.BROKER_USE_SSL:
        # NOTE(review): every SSL field is None, so no CA bundle, client
        # certificate or key is supplied here -- confirm this is intended.
        celery_app.conf.update(
            broker_use_ssl={
                "ssl_cert_reqs": None,
                "ssl_ca_certs": None,
                "ssl_certfile": None,
                "ssl_keyfile": None,
            },
        )

    # Make this the default Celery app and expose it through the Flask app.
    celery_app.set_default()
    app.extensions["celery"] = celery_app

    # Modules Celery must import so the scheduled tasks below are registered.
    scheduled_task_modules = [
        "schedule.clean_embedding_cache_task",
        "schedule.clean_unused_datasets_task",
        "schedule.create_tidb_serverless_task",
        "schedule.update_tidb_serverless_status_task",
    ]

    # The two cleanup jobs share one configurable interval, expressed in days.
    cleanup_interval = timedelta(days=dify_config.CELERY_BEAT_SCHEDULER_TIME)
    # The TiDB jobs run hourly, at minute 0 and minute 30 respectively.
    hourly_at_minute_0 = crontab(minute="0", hour="*")
    hourly_at_minute_30 = crontab(minute="30", hour="*")

    periodic_tasks = {
        "clean_embedding_cache_task": {
            "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
            "schedule": cleanup_interval,
        },
        "clean_unused_datasets_task": {
            "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
            "schedule": cleanup_interval,
        },
        "create_tidb_serverless_task": {
            "task": "schedule.create_tidb_serverless_task.create_tidb_serverless_task",
            "schedule": hourly_at_minute_0,
        },
        "update_tidb_serverless_status_task": {
            "task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
            "schedule": hourly_at_minute_30,
        },
    }
    celery_app.conf.update(beat_schedule=periodic_tasks, imports=scheduled_task_modules)

    return celery_app
| |
|