blog

πŸ” Djangoμ—μ„œ Redis둜 λ™μ‹œμ„± μ œμ–΄ν•˜κΈ°: concurrency_safe() μ»¨ν…μŠ€νŠΈ λ§€λ‹ˆμ € μ†Œκ°œ

λ‚ μ§œ: 2025-06-07

λͺ©λ‘μœΌλ‘œ


λ™μΌν•œ μ‚¬μš©μž μš”μ²­μ΄ λ™μ‹œμ— μ—¬λŸ¬ 번 처리되면 쀑볡 결제, 쀑볡 등둝, μƒνƒœ κΌ¬μž„ λ“±μ˜ λ¬Έμ œκ°€ 생기기 μ‰½μŠ΅λ‹ˆλ‹€.

이 κΈ€μ—μ„œλŠ” μ œκ°€ 직접 ν•„μš”ν•΄μ„œ λ§Œλ“€μ—ˆλ˜ Django + Redis 기반의 κ°„λ‹¨ν•˜κ³  μ•ˆμ „ν•˜κ²Œ λ™μ‹œμ„± μ œμ–΄λ₯Ό κ΅¬ν˜„ν•  수 μžˆλŠ” concurrency_safe() μ»¨ν…μŠ€νŠΈ λ§€λ‹ˆμ €λ₯Ό μ†Œκ°œν•©λ‹ˆλ‹€.


🎯 λͺ©μ 

concurrency_safe()λŠ” λ‹€μŒκ³Ό 같은 λͺ©μ μ„ κ°–κ³  μ„€κ³„λ˜μ—ˆμŠ΅λ‹ˆλ‹€:


πŸ’» μ½”λ“œ μ†Œκ°œ

import logging
import time
from contextlib import contextmanager
from django.core.cache import cache

logger = logging.getLogger(__name__)


class ConcurrencyError(Exception):
    """λ™μ‹œμ„± 좩돌둜 μΈν•œ μ—λŸ¬"""


@contextmanager
def concurrency_safe(
    cache_key: str,
    ttl: int = 3,
    wait_timeout: int = 0,
    exception_cls=ConcurrencyError,
    blocked_message="μž μ‹œ ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš”.",
    blocked_log_level=logging.WARNING,
):
    """
    Redis lock 기반 μ‚¬μš©μžλ³„ μš”μ²­ λ™μ‹œμ„± μ œμ–΄ context manager

    Args:
        cache_key (str): κ³ μœ ν•œ 락 식별 ν‚€ (ex. `"lock:purchase:{user_id}:{ticket_id}"`)
        ttl (int): lock μœ μ§€ μ‹œκ°„ (초)
        wait_timeout (int): lock νšλ“ λŒ€κΈ° μ‹œκ°„ (초), κΈ°λ³Έ 0초 β†’ λŒ€κΈ° 없이 μ¦‰μ‹œ μ‹€νŒ¨
        exception_cls: μš”μ²­μ΄ 차단 λ˜μ—ˆμ„λ•Œ λ°œμƒμ‹œν‚¬ 였λ₯˜ 클래슀
        blocked_message: μš”μ²­μ΄ 차단 λ˜μ—ˆμ„λ•Œ μœ μ €μ—κ²Œ ν‘œμ‹œν•  였λ₯˜ λ©”μ‹œμ§€
        blocked_log_level: μš”μ²­μ΄ 차단 λ˜μ—ˆμ„λ•Œ 남길 차단 둜그의 둜그레벨

    Usage:
        with concurrency_safe("my_key:{user_id}:{ticket_id}"):
            # λ™μ‹œμ„± μ œμ–΄κ°€ ν•„μš”ν•œ μ½”λ“œ
    """
    start_time = time.time()

    while True:
        acquired = cache.add(cache_key, "1", timeout=ttl)
        if acquired:
            try:
                yield
            finally:
                cache.delete(cache_key)
            return

        if time.time() - start_time > wait_timeout:
            logger.log(
                blocked_log_level, f"[concurrency_safe] lock νšλ“ μ‹€νŒ¨: {cache_key=}"
            )
            raise exception_cls(blocked_message)

        time.sleep(0.1)

πŸ§ͺ μ‚¬μš©λ²•

πŸ“Œ μ ν•©ν•œ 상황

μ‹€μ‚¬μš© 예제 1: 둜직 쀑볡 μ‹€ν–‰ λ°©μ§€

# νŠΉμ • μœ μ € - νŠΉμ • 보상 쀑볡 μ§€κΈ‰ λ°©μ§€
cache_key_enum = CacheKey.MISSION_USER_GIVE_REWARD_LOCK
cache_key = cache_key_enum.code.format(user_id=user_id, reward_id=reward_id)

with concurrency_safe(cache_key=cache_key, ttl=cache_key_enum.timeout):
    check_n_setup_mapping_n_give_reward(
        user,
        mission,
        reward,
        user_action_log,
        is_manual=is_manual,
        is_debug=is_debug,
    )

πŸ“Œ μ‹€μ‚¬μš© 예제 2: TTL 을 μ΄μš©ν•œ μš”μ²­ 간격 μ œμ–΄

@action(detail=False, methods=["put"], url_path="refresh")
def refresh(self, request, *args, **kwargs):

    if Environment.get_member(settings.ENV).is_dev():
        task = dodo_generate_buyer_interested_product.delay(self.partner.id)
        return TaskResponse(task)

    with concurrency_safe(  # 10λΆ„ 간격 μš”μ²­ μ œμ–΄
        cache_key=CacheKey.DODO_REFRESH_INTERESTED_PRODUCT.get(
            user_id=request.user.id
        ),
        ttl=CacheKey.DODO_REFRESH_INTERESTED_PRODUCT.timeout,
        exception_cls=ValidationError,
        blocked_message="μž μ‹œ ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš” (λ§ˆμ§€λ§‰ κ°±μ‹  ν›„, 10λΆ„ λ’€ λΆ€ν„° κ°€λŠ₯)",
    ):
        task = dodo_generate_buyer_interested_product.delay(self.partner.id)

    return TaskResponse(task)

βš™οΈ λ™μž‘ 원리

  1. Redis SETNX 역할을 ν•˜λŠ” cache.add()둜 락을 μ‹œλ„
  2. 락 νšλ“ 성곡 μ‹œ β†’ yield 블둝 μ‹€ν–‰
  3. μ’…λ£Œ ν›„ β†’ cache.delete()둜 락 ν•΄μ œ
  4. 락을 νšλ“ν•˜μ§€ λͺ»ν•œ 경우:

    • wait_timeout λ‚΄ μž¬μ‹œλ„ (0.1초 간격)
    • νƒ€μž„μ•„μ›ƒ μ‹œ μ˜ˆμ™Έ λ°œμƒ

βœ… μž₯점

❗ 단점 및 ν•œκ³„

πŸ“ 마무리

concurrency_safe()λŠ” Django + Redis ν™˜κ²½μ—μ„œ λ°œμƒν•  수 μžˆλŠ” 쀑볡 μš”μ²­ 문제λ₯Ό κ°„λ‹¨ν•˜κ²Œ ν•΄κ²°ν•  수 μžˆλŠ” λ„κ΅¬μž…λ‹ˆλ‹€. ν¬λ¦¬ν‹°μ»¬ν•œ μƒνƒœ λ³€ν™” λ‘œμ§μ— κ°€λ³κ²Œ λΆ™μ—¬ 쀑볡 싀행을 λ°©μ§€ν•˜κ³ , 운영 쀑 문제λ₯Ό μ€„μ΄λŠ” 데 큰 도움이 λ©λ‹ˆλ‹€.


λͺ©λ‘μœΌλ‘œ