[AI Tech] GCP Composer로 Airflow 활용하기
1. GCP Composer 세팅
1. Composer 생성
GCP 콘솔로 이동해서 Composer2 생성을 해준다.
다음과 같이 세팅을 해주고 필요하면 권한 설정을 해준다.
추가적으로 IAM 설정으로 들어가서 Composer 서비스 계정에 역할을 추가적으로 지정해줘야한다.
추가적으로 주어야하는 권한은
- BigQuery 사용자
- Cloud Composer v2 API 서비스 에이전트 확장 프로그램
- Composer 관리자
- Composer 작업자
- Dataproc 작업자
- Dataproc 편집자
등등… 자세한 내용은 해당 링크 참고
[IAM으로 액세스 제어 | Cloud Composer | Google Cloud](https://cloud.google.com/composer/docs/how-to/access-control?hl=ko) |
- 이걸 설정 안하면…
요런 에러를 볼수 있다.
2. DAGs 생성
코드는 강의에서 배운 것과 유사함
-
샘플 DAGs 코드
from airflow import DAG from datetime import datetime, timedelta from pathlib import Path from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator # GCS 데이터를 BigQuery로 옮김 from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator # BigQuery에서 Query를 실행 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator # Local에서 GCS로 데이터를 옮김 PROJECT_ID = "level3-416207" # 프로젝트 ID BUCKET_NAME = "level3_recsys02" # 업로드할 GCS Bucket FILE_NAME = "users_device_info.csv" LOCAL_FILE_PATH = str(Path(__file__).parent.parent / "data" / FILE_NAME) # Composer와 연결된 GCS의 경로 GCS_PATH = f"user_info/users_device_info.csv" # 업로드할 GCS(BigQuert와 연결된)의 경로 default_args = { "owner": "czero", "depends_on_past": False, "start_date": datetime(2024, 3, 7), "end_date": datetime(2024, 3, 12) } # GCS에서 BigQuery에 적재하기전 스키마 설정 schema_fields = [ { "mode": "NULLABLE", "name": "id", "type": "STRING" }, { "mode": "NULLABLE", "name": "keywords", "type": "STRING" }, { "mode": "NULLABLE", "name": "favorite_ids", "type": "STRING" }, ] with DAG( dag_id="elt-users_info", default_args=default_args, schedule_interval="30 0 * * *", tags=["user"], catchup=True ) as dag: # 1) Extract : Local To GCS extract_data = LocalFilesystemToGCSOperator( task_id="extract_data", src=LOCAL_FILE_PATH, dst=GCS_PATH, bucket=BUCKET_NAME ) # 2) Load : GCS To BigQuery load_csv = GCSToBigQueryOperator( task_id="gcs_to_bigquery", bucket=BUCKET_NAME, source_objects=[GCS_PATH], destination_project_dataset_table=f"{PROJECT_ID}.log_129.user_device", # from GCS to BigQuery schema_fields=schema_fields, source_format='CSV', skip_leading_rows=1, create_disposition="CREATE_IF_NEEDED", write_disposition="WRITE_TRUNCATE", location="US" ) # 3) Transform : BigQuery에서 Query 실행해서 다시 BigQuery에 저장 sql_query = f""" SELECT * FROM `{PROJECT_ID}.log_129.user_device` WHERE keywords IS NOT NULL AND favorite_ids IS NOT NULL; """ transform = BigQueryExecuteQueryOperator( task_id="run_query", sql=sql_query, use_legacy_sql=False, allow_large_results=True, write_disposition="WRITE_TRUNCATE", destination_dataset_table=f"{PROJECT_ID}.log_129.user_device_notNULL" ) extract_data >> load_csv >> transform
다만 주의해야 할 점은
GCS에서 BigQuery로 적재할 때 데이터셋이 사전에 생성되어있는지 확인해주어야한다.
테이블을 없으면 생성해주지만, 데이터셋은 그렇지 않는 것 같다. ㅎㅎ(이것 땜에 몇시간 날림)
3. 데이터 및 DAGs 업로드
Composer를 생성하면 Composer와 연결된 GCS가 다음과 같이 별도로 생성된다.
1) 데이터 업로드
해당 GCS에는 data, dags등을 업로드 할 수 있다. 내가 사용할 데이터를 data 폴더에 업로드해준다.
2) DAGs 업로드
작성한 DAG 파일을 업로드 해 줄 차례이다. dags 폴더에 업로드해준다.
4. Airflow Web Server 접속 및 확인
Airflow Web Server로 이동하면 다음과 같이 배치단위로 실행되는 것을 확인할 수 있다.
BigQuery에 접속해서 테이블이 생성된 것을 확인하면 끝~~!
참고자료
네부캠 Product Serving 클라우드 서비스 실습
GCP Composer2 Guide
[Cloud Composer 개요 | Google Cloud](https://cloud.google.com/composer/docs/composer-2/composer-overview?hl=ko) |
Leave a comment