운영환경에서 Python Celery 설정 가이드
운영환경에서 Celery를 안정적으로 운영하기 위해 필요한 핵심 설정값들과 실전 팁을 정리했습니다.
1. Celery 운영환경 개요
Celery의 역할
Celery는 비동기 작업 처리 시스템으로, 웹 서버의 응답 시간을 보장하기 위해 시간이 오래 걸리는 작업을 백그라운드에서 처리합니다.
Web 서버와 분리 운영하는 이유
- 응답 시간 보장: 웹 요청과 비동기 작업을 분리하여 사용자 경험 향상
- 리소스 독립성: Worker 프로세스가 메모리 누수나 장시간 실행으로 인해 문제가 생겨도 웹 서버에 영향 없음
- 확장성: Worker를 독립적으로 스케일링 가능
2. Broker / Backend 설정
Message Broker 선택
Redis (추천)
- 설정이 간단하고 운영이 쉬움
- Result Backend로도 활용 가능
- 대부분의 경우 Redis로 충분
RabbitMQ
- 더 복잡한 라우팅이 필요할 때
- 메시지 지속성이 중요한 경우
Result Backend 사용 여부
# settings.py
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # Task 결과 저장
CELERY_RESULT_EXPIRES = 3600 # 결과 저장 시간 (초)
사용 시점: Task 실행 결과를 확인해야 할 때만 사용. 단순히 작업만 수행한다면 생략 가능.
연결 안정성 설정
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_BROKER_CONNECTION_RETRY = True
CELERY_BROKER_CONNECTION_MAX_RETRIES = 10
3. Worker 프로세스 설정
Worker 수 산정 기준
Worker 수 = (CPU 코어 수 - 1) ~ (CPU 코어 수 * 2)
- I/O 집약적 작업: 더 많은 Worker 가능
- CPU 집약적 작업: CPU 코어 수에 맞춰 설정
Concurrency 설정
celery -A myapp worker --concurrency=4
- 기본값: CPU 코어 수
- 각 Worker 프로세스 내에서 실행되는 스레드/프로세스 수
- I/O 대기 시간이 긴 작업은 높게 설정 가능
Prefetch 설정
CELERY_WORKER_PREFETCH_MULTIPLIER = 4 # 기본값: 4
- Worker가 한 번에 가져올 Task 수
- 주의: 너무 높으면 긴 작업이 있는 Worker가 다른 Task를 가져가서 불공평한 분배 발생
- 권장: 1~4 사이
Acks 설정
CELERY_TASK_ACKS_LATE = True # Task 완료 후 ACK
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # Acks Late 사용 시 1 권장
ACKS_LATE = True: Task 완료 후에만 ACK → Worker 장애 시 재시도 가능ACKS_LATE = False: Task 시작 시 ACK → 빠르지만 Worker 장애 시 유실 가능
메모리 누수 방지
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000 # 1000개 Task 후 재시작
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 200000 # 200MB 초과 시 재시작 (KB 단위)
권장 전략:
max_tasks_per_child: 500~2000 사이worker_max_memory_per_child: 메모리 사용 패턴에 따라 조정- 주기적 재시작으로 메모리 누수 방지
4. Queue & Routing 전략
Queue 분리 기준
CELERY_TASK_ROUTES = {
'app.tasks.email.*': {'queue': 'email'},
'app.tasks.image.*': {'queue': 'image'},
'app.tasks.critical.*': {'queue': 'critical'},
}
분리 기준:
- 우선순위: 긴급한 작업은 별도 Queue
- 리소스 사용: CPU/메모리 집약적 작업 분리
- 실패 영향도: 한 작업의 실패가 다른 작업에 영향 주지 않도록
Priority Queue 활용
CELERY_TASK_ROUTES = {
'app.tasks.urgent.*': {'queue': 'high_priority'},
}
- RabbitMQ는 Priority Queue 지원
- Redis는 기본적으로 지원하지 않지만, 여러 Queue로 분리하여 우선순위 구현 가능
5. Task 안정성 관련 설정
Retry 정책
from celery import Task
class MyTask(Task):
autoretry_for = (Exception,)
retry_kwargs = {'max_retries': 3, 'countdown': 60}
retry_backoff = True # 지수 백오프
retry_backoff_max = 600 # 최대 600초
retry_jitter = True # 재시도 시간에 랜덤성 추가
권장 사항:
- 일시적 오류만 재시도 (네트워크 오류, 타임아웃 등)
- 영구적 오류는 재시도하지 않음 (ValidationError 등)
retry_backoff로 서버 부하 분산
Time Limit / Soft Time Limit
from celery.exceptions import SoftTimeLimitExceeded
@celery.task(time_limit=300, soft_time_limit=240)
def my_task():
try:
# time_limit: 하드 리밋 (강제 종료)
# soft_time_limit: 소프트 리밋 (SoftTimeLimitExceeded 예외 발생)
# Task Logic
pass
except SoftTimeLimitExceeded:
# Graceful 종료 처리
logger.warning("Task exceeded soft time limit")
# 정리 작업 수행
cleanup()
raise
soft_time_limit: 정상 종료 시도 (예외 처리 가능)time_limit: 강제 종료 (최후의 수단)- 전역 설정:
CELERY_TASK_TIME_LIMIT = 300 # 초 단위 CELERY_TASK_SOFT_TIME_LIMIT = 270 # 초 단위
Task 예외 처리
import logging
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@celery.task
def my_task():
try:
# Task Logic
result = do_work()
logger.info(f"Task completed: {result}")
return result
except Exception as e:
# 예외 로깅
logger.error(f"Task failed: {e}", exc_info=True)
# 필요시 재시도를 위해 예외를 다시 던짐
raise
Idempotency 고려사항
@celery.task(bind=True)
def process_payment(self, order_id):
# 중복 실행 방지
if Payment.objects.filter(order_id=order_id).exists():
return "Already processed"
# 작업 수행
process(order_id)
- Task가 여러 번 실행되어도 같은 결과가 나오도록 설계
- DB 제약 조건, Redis Lock 등 활용
Deadlock 방지 (Redis Lock 활용)
import redis
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
redis_client = redis.StrictRedis()
@celery.task
def my_task(resource_id):
lock_key = f"task_lock:{resource_id}"
lock = redis_client.lock(lock_key, timeout=300) # 5분 타임아웃
if not lock.acquire(blocking=False):
logger.warning(f"Task is already running for resource {resource_id}")
return "Task already in progress"
try:
# Task Logic
process_resource(resource_id)
finally:
lock.release()
- 동일 리소스에 대한 중복 실행 방지
- Lock 타임아웃으로 Deadlock 방지
6. 장애 대응 및 복구 설정
Task 유실 방지 설정
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_TASK_REJECT_ON_WORKER_LOST = True
ACKS_LATE = True: Task 완료 후 ACKPREFETCH_MULTIPLIER = 1: 한 번에 하나씩만 가져오기REJECT_ON_WORKER_LOST = True: Worker 장애 시 Task 재배치
Worker 재시작 전략
# Graceful shutdown
celery -A myapp control shutdown
# 또는 Supervisor/systemd로 자동 재시작 설정
Graceful Shutdown 처리
import signal
from celery import Celery
app = Celery('myapp')
def shutdown_handler(signum, frame):
# 현재 실행 중인 Task 완료 대기
app.control.shutdown()
signal.signal(signal.SIGTERM, shutdown_handler)
7. 성능 최적화 관련 설정
Task Granularity 조절
- 너무 작은 Task: 오버헤드 증가
- 너무 큰 Task: Worker 점유 시간 증가, 재시도 비용 증가
- 권장: 1초~1분 정도 실행 시간이 적절
Chord / Group 사용 시 주의사항
from celery import group, chord
# Group: 병렬 실행
job = group(task.s(i) for i in range(10))
result = job.apply_async()
# Chord: Group 완료 후 콜백
chord(job)(callback.s())
주의사항:
- Chord는 모든 Task 완료를 기다리므로 하나라도 실패하면 콜백 실행 안 됨
- 대량의 Task는 메모리 사용량 증가
DB / 외부 API 병목 방지
@celery.task
def process_data():
# 배치 처리로 DB 쿼리 최소화
items = Item.objects.filter(status='pending')[:100]
for item in items:
process_item(item)
# 외부 API 호출은 타임아웃 설정
response = requests.get(url, timeout=30)
8. 로그 & 모니터링
Celery 로그 레벨 설정
CELERY_WORKER_LOG_FORMAT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
CELERY_WORKER_TASK_LOG_FORMAT = '[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s'
Task 단위 로그 전략
import logging
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@celery.task
def my_task():
logger.info("Task started")
try:
result = do_work()
logger.info(f"Task completed: {result}")
except Exception as e:
logger.error(f"Task failed: {e}", exc_info=True)
raise
Flower / Prometheus 연동
# Flower 실행
celery -A myapp flower
# Prometheus 메트릭 수집
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_TASK_SEND_SENT_EVENT = True
Dead Letter Queue 설정
실패한 Task를 별도의 Dead Letter Queue로 보내고 분석할 수 있습니다.
RabbitMQ 설정:
CELERY_TASK_ROUTES = {
'app.tasks.*': {'queue': 'default'},
}
CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_DEFAULT_EXCHANGE = 'default'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
Redis에서 Dead Letter 처리:
from celery import Task
class CustomTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
# 실패한 Task를 별도 저장소에 기록
save_failed_task(task_id, args, kwargs, str(exc))
super().on_failure(exc, task_id, args, kwargs, einfo)
@celery.task(base=CustomTask)
def my_task():
pass
9. 배포 및 운영 전략
무중단 배포 고려사항
- 롤링 업데이트: Worker를 하나씩 재시작
- Graceful Shutdown: 실행 중인 Task 완료 대기
- Queue 모니터링: 배포 중 Queue 길이 확인
롤링 업데이트 시 주의점
# 1. 새 Worker 시작
celery -A myapp worker --concurrency=4
# 2. 기존 Worker에 shutdown 신호
celery -A myapp control shutdown
# 3. Queue 길이 확인
celery -A myapp inspect active
환경별 설정 분리
# settings.py
if os.environ.get('ENV') == 'production':
CELERY_WORKER_CONCURRENCY = 8
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
elif os.environ.get('ENV') == 'development':
CELERY_WORKER_CONCURRENCY = 2
CELERY_WORKER_MAX_TASKS_PER_CHILD = 50
10. 운영하면서 자주 겪는 실수들
Default 설정 그대로 사용
- 문제:
PREFETCH_MULTIPLIER = 4로 인해 긴 작업이 있는 Worker가 계속 Task를 가져감 - 해결:
ACKS_LATE = True와 함께PREFETCH_MULTIPLIER = 1사용
Worker 과다/과소 설정
- 과다 설정: 컨텍스트 스위칭 오버헤드, 메모리 부족
- 과소 설정: Queue 쌓임, 처리 지연
- 해결: 모니터링을 통해 점진적으로 조정
Retry 폭탄 이슈
# 잘못된 예
@celery.task(autoretry_for=(Exception,), max_retries=10)
def my_task():
if some_condition: # 영구적 실패 조건
raise ValueError("Permanent failure")
# 이 경우 계속 재시도되어 리소스 낭비
해결: 영구적 실패는 재시도하지 않도록 예외 타입 구분
11. 추천 설정 모음
운영환경에서 권장하는 설정값들을 모아놓은 예시입니다.
기본 안정성 설정
# settings.py
# Task Timeout
CELERY_TASK_TIME_LIMIT = 300 # 5분 (하드 리밋)
CELERY_TASK_SOFT_TIME_LIMIT = 270 # 4.5분 (소프트 리밋)
# Worker 안정성
CELERY_TASK_ACKS_LATE = True # Task 완료 후 ACK
CELERY_TASK_REJECT_ON_WORKER_LOST = True # Worker 장애 시 재배치
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # 한 번에 하나씩만 가져오기
# Worker 자동 재시작
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000 # 1000개 Task 후 재시작
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 200000 # 200MB 초과 시 재시작 (KB 단위)
# 재시도 설정
CELERY_TASK_DEFAULT_RETRY_DELAY = 60 # 1분 대기 후 재시도
CELERY_TASK_MAX_RETRIES = 3 # 최대 3회 재시도
Worker 실행 명령어
celery -A myapp worker \
--concurrency=4 \
--max-tasks-per-child=1000 \
--loglevel=info
Task 예시 (권장 패턴)
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded, Retry
from celery.utils.log import get_task_logger
import logging
logger = get_task_logger(__name__)
@shared_task(
bind=True,
time_limit=300,
soft_time_limit=270,
max_retries=3,
default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True
)
def my_task(self, resource_id):
try:
logger.info(f"Task started for resource {resource_id}")
# Task Logic
result = process_resource(resource_id)
logger.info(f"Task completed: {result}")
return result
except SoftTimeLimitExceeded:
logger.warning("Task exceeded soft time limit")
cleanup()
raise
except (ConnectionError, TimeoutError) as e:
logger.error(f"Temporary error: {e}")
raise self.retry(exc=e)
except Exception as e:
logger.error(f"Task failed: {e}", exc_info=True)
raise
핵심요약
- Worker 설정:
ACKS_LATE = True,PREFETCH_MULTIPLIER = 1로 Task 유실 방지 - 메모리 관리:
max_tasks_per_child로 주기적 재시작하여 메모리 누수 방지 - Queue 분리: 우선순위와 리소스 사용에 따라 Queue 분리
- Retry 전략: 일시적 오류만 재시도, 지수 백오프 적용
- 모니터링: Flower/Prometheus로 Worker 상태와 Queue 길이 지속 모니터링
- 배포 전략: Graceful Shutdown으로 무중단 배포 구현
운영환경에서는 기본 설정을 그대로 사용하지 말고, 실제 워크로드에 맞춰 위 설정들을 조정하는 것이 중요합니다.