django-celeryでtaskのretry数を確認する方法

from celery import shared_task

@shared_task(bind=True)
def my_task(self, arg):
    try:
        # ここにタスクの主要な処理を記述
        pass
    except SomeException as e:
        # 現在のリトライ回数を取得
        current_retry_count = self.request.retries

        # ログに現在のリトライ回数を記録
        print(f"Retry attempt: {current_retry_count}")

        # タスクを再試行
        self.retry(exc=e)

bind=True とすることで、タスク関数の第一引数として self(タスク自体のインスタンス)を受け取ることができます。
self.request.retries を使用して現在のリトライ回数を取得できます。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です