네이버 API에 있는 데이터를 Airflow를 이용해 데이터웨어하우스(dw)인 snowflake에 넣어보는 실습을 진행해보자.
1. 먼저 네이버 API 권한을 받는다.
사전 준비 : 네이버 API 어플리케이션 등록 네이버 API 사용 가이드
네이버 API를 사용하려면 네이버 개발자 센터에서 애플리케이션을 등록하고 클라이언트 아이디와 클라이언트 시크릿을 발급받아야 한다. (test 이기때문에 서비스 환경은 모두 http://localhost 로 해주었다.)
2. airflow 설치할 때 만든 airflow-docker/dags 폴더안에 naver_to_snow.py 를 만들어준다.airflow에는 Operator 로 파이프라인을 구성해주어야 한다.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import requests
import snowflake.connector
def fetch_data_from_naver_api(client_id, client_secret, query, **kwargs):
url = "https://openapi.naver.com/v1/search/blog.json"
headers = {
"X-Naver-Client-Id": client_id,
"X-Naver-Client-Secret": client_secret,
}
params = {
"query": query,
"display": 10
}
response = requests.get(url, headers=headers, params=params)
data = response.json()
# 데이터를 XCom에 저장
kwargs['ti'].xcom_push(key='naver_data', value=data['items'])
def insert_data_into_snowflake(user, password, account, warehouse, database, schema, table, **kwargs):
# XCom에서 데이터 가져오기
data = kwargs['ti'].xcom_pull(key='naver_data', task_ids='fetch_data_from_naver_api_task')
conn = snowflake.connector.connect(
user=user,
password=password,
account=account,
warehouse=warehouse,
database=database,
schema=schema
)
cursor = conn.cursor()
insert_query = f"INSERT INTO {table} (title, link, description, bloggername, bloggerlink, postdate) VALUES (%s, %s, %s, %s, %s, %s)"
for item in data:
cursor.execute(insert_query, (
item['title'],
item['link'],
item['description'],
item['bloggername'],
item['bloggerlink'],
item['postdate']
))
cursor.close()
conn.close()
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'retries': 1,
}
# DAG 틀
dag = DAG(
'naver_to_snowflake',
default_args=default_args,
description='네이버 API에서 데이터를 가져와 Snowflake에 로드',
schedule_interval='@daily',
)
fetch_data_from_naver_api_task = PythonOperator(
task_id='fetch_data_from_naver_api_task',
python_callable=fetch_data_from_naver_api,
op_kwargs={
'client_id': 'your_client_id', # 네이버 클라이언트 ID
'client_secret': 'your_client_secret', # 네이버 클라이언트 시크릿
'query': '생일' # 생일 관련 포스팅
},
provide_context=True,
dag=dag,
)
insert_data_into_snowflake_task = PythonOperator(
task_id='insert_data_into_snowflake_task',
python_callable=insert_data_into_snowflake,
op_kwargs={
'user': '*****', # Snowflake 사용자 이름
'password': '*****', # Snowflake 비밀번호
'account': '*****', # Snowflake 계정 이름
'warehouse': '*****', # 데이터 웨어하우스 이름
'database': 'STAGING', # 데이터베이스 이름
'schema': 'INTERFACE', # 스키마 이름
'table': 'NAVER_TO_SNOW' # 테이블 이름
},
provide_context=True,
dag=dag,
)
# 작업 순서 설정
fetch_data_from_naver_api_task >> insert_data_into_snowflake_task
나는 API로부터 데이터를 긁어오는 작업과 SNOWFLAKE(DW)에 넣는 작업을 PythonOperator로 작업하였다.
조금 기다리다보면 Airflow 웹서버에 생성한 DAG가 올라온다.
3. 이제 airflow에서 Admin > Connections > + 에 들어가 네이버 API에 대한 커넥션을 생성해준다.
커넥션은 꼭 필요한 것은 아니다. 그러나 Airflow의 Connection 설정을 활용하면, API 호출에 필요한 인증 정보와 같은 중요한 데이터를 중앙에서 관리할 수 있고 보안성을 높일 수 있다. 이렇게 하면 코드 내에서 민감한 정보를 하드코딩하지 않고도 사용할 수 있다.
Airflow Connection 설정을 사용하는 이유
- 보안: API 키와 같은 민감한 정보를 코드에 하드코딩하지 않음으로써 보안성을 높일 수 있습니다.
- 중앙 관리: 여러 DAG에서 같은 API를 사용해야 할 때, Connection을 사용하면 이 정보를 중앙에서 쉽게 관리할 수 있습니다.
- 유지보수: API 키가 변경될 경우, Airflow UI에서 Connection을 업데이트하는 것만으로 모든 DAG에 반영할 수 있습니다.
4. 이제 만든 DAG를 TASK 해보자. 트리거 DAG를 눌러준다.
5. snowflake 에 insert 하는 과정에서 오류가 났다.
*** Could not read served logs: Request URL is missing an 'http://' or 'https://' protocol.
이 오류는 Airflow의 로그 파일 설정에서 URL에 프로토콜이 빠져 있을 때 발생한다. 이를 해결하기 위해 Airflow의 설정 파일(airflow.cfg)을 확인하고 수정해야한다. 나는 Docker안에 airflow를 설치하였으니 Docker 안에서 airflow.cfg 파일을 찾아준다.
Docker : airflow-docker-airflow-webserver-1
찾아보니 remote_base_log_folder에 해당 로그가 빠져있었다.
[logging]
base_log_folder = /opt/airflow/logs
remote_base_log_folder = s3://your-bucket/path/to/logs
5-1. 자세히 보니 네이버API에 접근할때도 문제가 생긴다. fetch_data_from_naver_api_task에 빨간불이 들어왔었다.
이것은 네이버에 개발자 센터에서 API에 접근할 때 네이버 로그인이 필요한 정보들을 끌고오려고 했기 때문에 추가로 받아야 하는 인증이 필요했다.
그래서 사용 API를 검색으로만 지정해주었더니 해결되었다.
6. DAG 완료.
7. SNOWFLAKE(DW) 테이블 적재 결과.
-추가 소스 SSL 보안 인증 무시 (TEST용)-
import requests
url = 'https://openapi.naver.com/v1/search/blog.json'
headers = {
'X-Naver-Client-Id': 'YOUR_CLIENT_ID',
'X-Naver-Client-Secret': 'YOUR_CLIENT_SECRET',
}
params = {
'query': '생일',
'display': 10,
}
response = requests.get(url, headers=headers, params=params, verify=False)
print(response.json())
'Engineer > Pipeline' 카테고리의 다른 글
[Pipeline] 파이프라인 Apache Airflow 설치방법 (Docker) (0) | 2024.08.01 |
---|---|
[Pipeline] 파이프라인 Apache Airflow 설치방법 (0) | 2024.07.05 |