Professional Field

ETL : Airflow / DW(Snowflake) | AI : Python | BI : Tableau / Power BI

Engineer/Pipeline

[ETL] Airflow를 이용한 Naver(API)_to_Snowflake(DW)

K_CY 2024. 8. 1. 21:46

네이버 API에 있는 데이터를 Airflow를 이용해 데이터웨어하우스(dw)인 snowflake에 넣어보는 실습을 진행해보자.

 

1. 먼저 네이버 API 권한을 받는다.

사전 준비 : 네이버 API 어플리케이션 등록 네이버 API 사용 가이드
네이버 API를 사용하려면 네이버 개발자 센터에서 애플리케이션을 등록하고 클라이언트 아이디와 클라이언트 시크릿을 발급받아야 한다. (test 이기때문에 서비스 환경은 모두 http://localhost 로 해주었다.)

 

2. airflow 설치할 때 만든 airflow-docker/dags 폴더안에 naver_to_snow.py 를 만들어준다.airflow에는 Operator 로 파이프라인을 구성해주어야 한다. 

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import requests
import snowflake.connector

def fetch_data_from_naver_api(client_id, client_secret, query, **kwargs):
    url = "https://openapi.naver.com/v1/search/blog.json"
    headers = {
        "X-Naver-Client-Id": client_id,
        "X-Naver-Client-Secret": client_secret,
    }
    params = {
        "query": query,
        "display": 10
    }
    response = requests.get(url, headers=headers, params=params)
    data = response.json()
    # 데이터를 XCom에 저장
    kwargs['ti'].xcom_push(key='naver_data', value=data['items'])

def insert_data_into_snowflake(user, password, account, warehouse, database, schema, table, **kwargs):
    # XCom에서 데이터 가져오기
    data = kwargs['ti'].xcom_pull(key='naver_data', task_ids='fetch_data_from_naver_api_task')
    
    conn = snowflake.connector.connect(
        user=user,
        password=password,
        account=account,
        warehouse=warehouse,
        database=database,
        schema=schema
    )
    cursor = conn.cursor()
    insert_query = f"INSERT INTO {table} (title, link, description, bloggername, bloggerlink, postdate) VALUES (%s, %s, %s, %s, %s, %s)"
    
    for item in data:
        cursor.execute(insert_query, (
            item['title'],
            item['link'],
            item['description'],
            item['bloggername'],
            item['bloggerlink'],
            item['postdate']
        ))
    
    cursor.close()
    conn.close()

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
}

# DAG 틀 
dag = DAG(
    'naver_to_snowflake',
    default_args=default_args,
    description='네이버 API에서 데이터를 가져와 Snowflake에 로드',
    schedule_interval='@daily',
)

fetch_data_from_naver_api_task = PythonOperator(
    task_id='fetch_data_from_naver_api_task',
    python_callable=fetch_data_from_naver_api,
    op_kwargs={
        'client_id': 'your_client_id',  # 네이버 클라이언트 ID
        'client_secret': 'your_client_secret',  # 네이버 클라이언트 시크릿
        'query': '생일'  # 생일 관련 포스팅
    },
    provide_context=True,
    dag=dag,
)

insert_data_into_snowflake_task = PythonOperator(
    task_id='insert_data_into_snowflake_task',
    python_callable=insert_data_into_snowflake,
    op_kwargs={
        'user': '*****',  # Snowflake 사용자 이름
        'password': '*****',  # Snowflake 비밀번호
        'account': '*****',  # Snowflake 계정 이름
        'warehouse': '*****',  # 데이터 웨어하우스 이름
        'database': 'STAGING',  # 데이터베이스 이름
        'schema': 'INTERFACE',  # 스키마 이름
        'table': 'NAVER_TO_SNOW'  # 테이블 이름
    },
    provide_context=True,
    dag=dag,
)

# 작업 순서 설정
fetch_data_from_naver_api_task >> insert_data_into_snowflake_task

 

나는 API로부터 데이터를 긁어오는 작업과 SNOWFLAKE(DW)에 넣는 작업을 PythonOperator로 작업하였다.

조금 기다리다보면 Airflow 웹서버에 생성한 DAG가 올라온다.

 

3.  이제 airflow에서  Admin > Connections > + 에 들어가 네이버 API에 대한 커넥션을 생성해준다.

커넥션은 꼭 필요한 것은 아니다. 그러나 Airflow의 Connection 설정을 활용하면, API 호출에 필요한 인증 정보와 같은 중요한 데이터를 중앙에서 관리할 수 있고 보안성을 높일 수 있다. 이렇게 하면 코드 내에서 민감한 정보를 하드코딩하지 않고도 사용할 수 있다.

Airflow Connection 설정을 사용하는 이유

  1. 보안: API 키와 같은 민감한 정보를 코드에 하드코딩하지 않음으로써 보안성을 높일 수 있습니다.
  2. 중앙 관리: 여러 DAG에서 같은 API를 사용해야 할 때, Connection을 사용하면 이 정보를 중앙에서 쉽게 관리할 수 있습니다.
  3. 유지보수: API 키가 변경될 경우, Airflow UI에서 Connection을 업데이트하는 것만으로 모든 DAG에 반영할 수 있습니다.

 

4. 이제 만든 DAG를 TASK 해보자. 트리거 DAG를 눌러준다.

 

5. snowflake 에 insert 하는 과정에서 오류가 났다.

*** Could not read served logs: Request URL is missing an 'http://' or 'https://' protocol.

이 오류는 Airflow의 로그 파일 설정에서 URL에 프로토콜이 빠져 있을 때 발생한다. 이를 해결하기 위해 Airflow의 설정 파일(airflow.cfg)을 확인하고 수정해야한다. 나는 Docker안에 airflow를 설치하였으니 Docker 안에서 airflow.cfg 파일을 찾아준다.

Docker : airflow-docker-airflow-webserver-1

찾아보니 remote_base_log_folder에 해당 로그가 빠져있었다.

 

[logging]
base_log_folder = /opt/airflow/logs
remote_base_log_folder = s3://your-bucket/path/to/logs

 

 

 

5-1. 자세히 보니 네이버API에 접근할때도 문제가 생긴다. fetch_data_from_naver_api_task에 빨간불이 들어왔었다.

이것은 네이버에 개발자 센터에서 API에 접근할 때 네이버 로그인이 필요한 정보들을 끌고오려고 했기 때문에 추가로 받아야 하는 인증이 필요했다.

그래서 사용 API를 검색으로만 지정해주었더니 해결되었다.

 

6. DAG 완료.

 

7. SNOWFLAKE(DW) 테이블 적재 결과.

 

 

-추가 소스 SSL 보안 인증 무시 (TEST용)-

import requests

url = 'https://openapi.naver.com/v1/search/blog.json'
headers = {
    'X-Naver-Client-Id': 'YOUR_CLIENT_ID',
    'X-Naver-Client-Secret': 'YOUR_CLIENT_SECRET',
}
params = {
    'query': '생일',
    'display': 10,
}

response = requests.get(url, headers=headers, params=params, verify=False)
print(response.json())