blog

📄 Django + DRF + Celery 에서 사용 가능한 대용량 CSV 다운로드 모듈 – CSVDownloader 소개

날짜: 2025-06-08

목록으로


1. 배경 및 목적

CSV 다운로드 기능은 관리 서비스에서 자주 사용되며, 필터링, 정렬, 포맷 제어, 비동기 처리, 파일 업로드 등 복잡한 요구사항이 많다. CSVDownloader는 이런 반복 작업을 공통화하여 빠르고 안정적으로 처리하기 위해 만들어졌다.

내가 만든 CSVDownloader는 아래의 기능을 제공한다.

CSVDownloader는 실무에서 필요한 CSV 다운로드 기능을 하나의 클래스로 깔끔하게 캡슐화한 유틸리티이다.

Django + DRF 기반에서 관리 화면, 어드민툴, 백오피스 다운로드 기능을 구현할 때 빠르게 적용할 수 있고, S3 업로드까지 자동으로 처리해주는 구조로 대량 데이터 처리도 무리 없이 대응 가능하다.


2. 코드 소개

CSVDownloader는 다음 컴포넌트들로 구성됩니다:

import csv
import os
from io import StringIO
from typing import List

from django.conf import settings
from django.core.exceptions import FieldDoesNotExist
from django.http import StreamingHttpResponse
from django.utils import timezone
from rest_framework.exceptions import ValidationError


def set_progress(task_id, progress, total):
    """
    Store a Celery task's progress in the cache (best effort).

    Writes ``{"progress": progress, "total": total}`` under the
    ``CacheKey.CELERY_TASK_PROGRESS`` key for ``task_id``, using that key's
    timeout (the original author notes the backing store is Redis).

    Any failure is logged as a warning and swallowed, so progress reporting
    can never abort the caller.
    """
    try:
        cache.set(
            CacheKey.CELERY_TASK_PROGRESS.get(task_id=task_id),
            {"progress": progress, "total": total},
            CacheKey.CELERY_TASK_PROGRESS.timeout,
        )
    except Exception as e:
        # Deliberately best-effort: a cache outage must not break the export.
        logger.warning("비동기 처리 set_progress 오류: %s", e)


def upload_s3(temp_path: str, upload_path: str, bucket: str) -> str:
    """
    Upload a local file to S3 and return a presigned "get" URL for it.

    Args:
        temp_path: Path of the local file to upload.
        upload_path: Destination object key inside ``bucket``.
        bucket: Target S3 bucket name.

    Returns:
        The presigned URL produced by ``AWSS3ClientService.generate_presigned_url``.

    Raises:
        Re-raises any error from the upload or URL generation after logging it.
    """
    try:
        s3_client = AWSS3ClientService.get_client()
        s3_client.upload_file(temp_path, bucket, upload_path)

        return AWSS3ClientService.generate_presigned_url(
            "get",
            bucket,
            upload_path,
            # 24 * 60 * 60 — presumably seconds, i.e. 24 hours; TODO confirm unit.
            expires_in=24 * 60 * 60,
        )
    except Exception:
        # logger.exception records the traceback, which a plain f-string
        # message with only str(e) would lose.
        logger.exception("s3 업로드 실패 (bucket=%s, key=%s)", bucket, upload_path)
        raise


class CSVDownloader:
    """
    Base class for building CSV exports quickly on top of Django + DRF + Celery.

    - Create a subclass of this class (or instantiate it directly).
    - Override class attributes as needed; each configurable attribute can
      also be overridden per instance with a keyword argument of the same name.
    - Export the data you want, formatted the way you want, as CSV:
      ``get_result_as_streaming_response()`` streams it from a view, and
      ``get_result_as_download_url()`` uploads it to S3 and returns a URL.
    """

    filterset_class = None  # filter class applied to the queryset
    query_params = None  # GET request query params, forwarded as-is to the filterset
    queryset = None  # Django ORM QuerySet holding the rows to export
    ordering_fields = None  # order_by() arguments, e.g. ["-id", "-created_at"]
    serializer_class = None  # DRF serializer that shapes each CSV row
    value_convert_map = {  # str(value) -> text written to the CSV instead
        "None": "-",
        "True": "Y",
        "False": "N",
    }

    # Other settings
    allow_row_limit = 100000  # maximum exportable rows; a falsy value disables the check
    s3_bucket = settings.AWS_STORAGE_PRIVATE_BUCKET_NAME
    s3_path = None  # S3 key prefix; defaults to "download/<YYYYMMDD>/csv"
    file_prefix = "csv_download"  # file name prefix
    celery_task_id = None  # progress-tracking id (reported through set_progress)
    celery_progress_row_offset = 50  # report progress every N processed rows
    # Rows fetched per DB round-trip. Passing chunk_size explicitly also keeps
    # iterator() usable after prefetch_related() on newer Django versions.
    iterator_chunk_size = 2000

    def __init__(self, *args, **kwargs):
        """
        Configure the downloader and prepare the filtered, ordered queryset.

        Keyword arguments override the class attributes of the same name;
        ``queryset`` is required.

        Raises:
            ValidationError: if ``queryset`` is missing, or the filtered row
                count exceeds ``allow_row_limit``.
        """
        self.user_id = kwargs.get("user_id")
        self.query_params = kwargs.get("query_params", {})
        self.filterset_class = kwargs.get("filterset_class", self.filterset_class)
        self.serializer_class = kwargs.get("serializer_class", self.serializer_class)
        self.value_convert_map = kwargs.get("value_convert_map", self.value_convert_map)
        self.allow_row_limit = kwargs.get("allow_row_limit", self.allow_row_limit)
        self.ordering_fields = kwargs.get("ordering_fields", self.ordering_fields)
        self.file_prefix = kwargs.get("file_prefix", self.file_prefix)
        self.s3_bucket = kwargs.get("s3_bucket", self.s3_bucket)

        # An explicit kwarg wins over the class attribute; fall back to a dated
        # default when neither provides a path.
        s3_path = kwargs.get("s3_path", self.s3_path)
        if s3_path is None:
            today = timezone.now().strftime("%Y%m%d")
            s3_path = f"download/{today}/csv"
        self.s3_path = s3_path

        queryset = kwargs.get("queryset")
        if queryset is None:
            raise ValidationError("queryset is required")
        self.queryset = self.get_queryset(queryset)

        self.celery_task_id = kwargs.get("celery_task_id")
        if self.celery_task_id:
            set_progress(self.celery_task_id, 1, 100)

    def get_queryset(self, queryset):
        """
        Filter, size-check and order ``queryset`` for export.

        Raises:
            ValidationError: if the filtered row count exceeds ``allow_row_limit``.
        """
        if self.filterset_class:
            filterset = self.filterset_class(self.query_params, queryset=queryset)
            queryset = filterset.qs
        requested_record_count = queryset.count()

        if self.allow_row_limit and requested_record_count > self.allow_row_limit:
            raise ValidationError(
                f"다운로드 한도 초과: 최대 {self.allow_row_limit} 개 까지 조회 가능합니다. "
                f"(요청 레코드 수: {requested_record_count})"
            )

        # Without explicit ordering fields, sort by descending primary key.
        if self.ordering_fields:
            queryset = queryset.order_by(*self.ordering_fields)
        else:
            queryset = queryset.order_by("-pk")

        return queryset

    def get_result_as_download_url(self) -> str:
        """
        Write the CSV to a temporary file, upload it to S3 and return its URL.

        Rows are written straight to disk rather than buffered in memory first.
        The temporary file is removed once the upload finishes or fails.
        """
        filename = self.get_filename()  # computed once so local and S3 names match
        server_file_path = f"{settings.TMP_FILE_DIR}/{filename}"
        s3_upload_path = f"{self.s3_path}/{filename}"
        try:
            # utf-8-sig prepends a UTF-8 BOM; newline="" lets csv control line endings.
            with open(server_file_path, "w", newline="", encoding="utf-8-sig") as f:
                self._write_csv(f)
            return upload_s3(server_file_path, s3_upload_path, self.s3_bucket)
        finally:
            try:
                os.remove(server_file_path)
            except FileNotFoundError:
                # open() may have failed before the file was created.
                pass

    def get_result_as_streaming_response(self) -> StreamingHttpResponse:
        """
        Return the CSV as a streaming HTTP response.

        Rows are produced lazily from the queryset, so the whole CSV is never
        held in memory. The body starts with a UTF-8 BOM and uses CRLF line
        endings, matching the file written by ``get_result_as_download_url()``.
        """

        def stream():
            yield "\ufeff"  # UTF-8 BOM, same as the utf-8-sig file uploaded to S3
            for line in self.generate_csv_lines():
                yield line + "\r\n"

        response = StreamingHttpResponse(stream(), content_type="text/csv; charset=utf-8")
        response["Content-Disposition"] = f'attachment; filename="{self.get_filename()}"'
        return response

    def generate_csv_lines(self):
        """
        Yield the CSV one line at a time (header first), without line terminators.

        Lines are formatted by ``csv.writer`` with ``QUOTE_ALL``, the same
        quoting as ``generate_csv()``. Values containing commas, quotes or
        newlines are therefore escaped instead of breaking the column layout.
        """

        class _Echo:
            # csv.writer only calls write(); returning the value makes
            # writerow() hand back the formatted line instead of storing it.
            def write(self, value):
                return value

        writer = csv.writer(_Echo(), quoting=csv.QUOTE_ALL, lineterminator="")
        for row in self._iter_csv_rows():
            yield writer.writerow(row)

    def generate_csv(self) -> StringIO:
        """Render the whole CSV into an in-memory buffer, rewound to the start."""
        csv_file = StringIO()
        self._write_csv(csv_file)
        csv_file.seek(0)
        return csv_file

    def _write_csv(self, fileobj) -> None:
        """Write the header and every data row to ``fileobj`` with QUOTE_ALL quoting."""
        writer = csv.writer(fileobj, quoting=csv.QUOTE_ALL)
        writer.writerows(self._iter_csv_rows())

    def _iter_csv_rows(self):
        """
        Yield the header row, then one converted row per queryset record.

        When ``celery_task_id`` is set, progress is reported through
        ``set_progress()`` every ``celery_progress_row_offset`` rows.
        """
        yield self.get_csv_headers()

        serializer_class = self.get_serializer_class()
        track_progress = bool(self.celery_task_id)
        # count() issues a COUNT query; len() would load every row into memory.
        # max(..., 1) avoids division by zero if rows appear after an empty count.
        total = max(self.queryset.count(), 1) if track_progress else 1
        records = self.queryset.iterator(chunk_size=self.iterator_chunk_size)
        for idx, item in enumerate(records):
            yield self.get_csv_row(serializer_class(item).data)

            if track_progress and idx % self.celery_progress_row_offset == 0:
                percent = min(int(idx / total * 100), 100)
                set_progress(self.celery_task_id, percent, 100)

    def get_serializer_class(self):
        """
        Return the configured serializer class.

        Raises:
            NotImplementedError: if no ``serializer_class`` was configured.
        """
        if self.serializer_class is None:
            raise NotImplementedError("serializer_class is required")
        return self.serializer_class

    def get_filename(self):
        """Return ``<file_prefix>_<YYYYMMDD_HHMMSS>.csv`` stamped with ``timezone.now()``."""
        timestamp = timezone.now().strftime("%Y%m%d_%H%M%S")
        return f"{self.file_prefix}_{timestamp}.csv"

    def get_csv_headers(self) -> List[str]:
        """
        Return the header row, one entry per readable serializer field.

        Write-only fields are skipped: they never appear in ``serializer.data``,
        so including them would shift every column that follows. Each header
        is the field's label. If the label is None, it falls back to the
        verbose_name of the same-named field on the serializer's ``Meta.model``,
        and finally to the field name.
        """
        serializer_class = self.get_serializer_class()
        serializer = serializer_class()
        model = getattr(getattr(serializer_class, "Meta", None), "model", None)

        headers = []
        for field_name, field in serializer.fields.items():
            if field.write_only:
                continue
            if field.label is not None:
                headers.append(str(field.label))
                continue

            verbose_name = None
            if model is not None:
                try:
                    verbose_name = model._meta.get_field(field_name).verbose_name
                except FieldDoesNotExist:
                    pass  # serializer-only / renamed field: use the field name
            headers.append(str(verbose_name) if verbose_name else field_name)
        return headers

    def get_csv_row(self, item) -> List[str]:
        """
        Convert one serialized record into a list of CSV cell strings.

        Each value is stringified. Strings found in ``value_convert_map``
        (e.g. "None", "True", "False") are replaced by their mapped text.
        """
        convert = self.value_convert_map
        row = []
        for value in item.values():
            text = str(value)
            row.append(convert[text] if text in convert else text)
        return row

3. 사용법 (예제 코드)

🎯 View 에서 스트리밍 응답으로 직접 다운로드

from myapp.csv_downloader import CSVDownloader

def download_view(request):
    """Stream a CSV export of MyModel, filtered by the request's query string."""
    options = {
        "user_id": request.user.id,
        "queryset": MyModel.objects.all(),
        "filterset_class": MyModelFilter,
        "serializer_class": MyModelSerializer,
        "query_params": request.GET,
    }
    exporter = CSVDownloader(**options)
    return exporter.get_result_as_streaming_response()

☁️ Celery Task 에서 S3 업로드 후 URL 리턴

class CMSMissionRewardLogCSVDownloader(CSVDownloader):
    """CSVDownloader wired to the CMS mission-reward-log filter and CSV serializer."""

    # Output file naming and destination bucket.
    file_prefix = "cms_mission_reward_log_csv_download"
    s3_bucket = settings.AWS_STORAGE_PRIVATE_BUCKET_NAME

    # Row selection and row shaping.
    filterset_class = CMSMissionRewardLogFilter
    serializer_class = CMSMissionRewardLogSerializerForCSV


@shared_task()
def mission_reward_log_csv_download(query_params: dict):
    """
    Celery task: export MissionRewardLog rows matching ``query_params`` to S3.

    Returns ``{"file_url": <download URL>}``.
    """
    # NOTE: CSVDownloader.get_queryset re-applies its own ordering ("-pk" unless
    # ordering_fields is set), so this order_by does not decide the CSV order.
    base_queryset = MissionRewardLog.objects.all()
    queryset = base_queryset.select_related("user", "mission", "reward").order_by("-id")

    exporter = CMSMissionRewardLogCSVDownloader(
        query_params=query_params,
        queryset=queryset,
        celery_task_id=current_task.request.id,
    )
    url = exporter.get_result_as_download_url()
    return {"file_url": url}

4. 동작 원리

  1. 초기화

    • 전달받은 queryset에 대해 filterset으로 필터링
    • 레코드 수가 allow_row_limit 초과 시 오류 발생
    • 정렬 옵션(ordering_fields) 적용
  2. CSV 생성

    • serializer_class를 통해 헤더 생성
    • 각 row를 serializer를 통해 변환
    • True, False, None은 커스텀 문자열로 변환 (value_convert_map)
    • celery_task_id가 있으면 일정 row마다 set_progress() 호출
  3. 결과 반환

    • S3 업로드 → URL 리턴
    • 또는 StreamingHttpResponse로 직접 전송

5. 장점 및 단점

✅ 장점

항목 설명
범용성 filterset, serializer 주입 방식으로 어디서든 사용 가능
확장성 진행률 트래킹, S3 업로드, 파일 이름 등 커스터마이징 가능
일관성 DRF serializer를 그대로 사용하여 API 응답과 동일한 포맷 유지
실무 최적화 row 제한, label 헤더, value 변환 등 실제 운영에 적합

❗ 단점

항목 설명
메모리 기반 처리 전체 CSV를 메모리에 올리는 구조 (10만건 이상 시 유의 필요)
serializer 성능 필드가 복잡하거나 수십만 건 이상일 경우 느려질 수 있음
row 에러 처리 없음 특정 row 직렬화 에러 발생 시 전체 실패 가능성

목록으로