5️⃣ 주차 강의 내용
- Airflow 시 혼란을 겪는 부분을 중점적으로 학습하였다.
Autocommit 설정
- autocommit = False일 때
- 내가 명시적으로 commit을 하기 전까지는 나에게만 그 변화가 보인다. 다른 사람들에게는 그 변화가 보이지 않는다.
- 예를 들어, 테이블을 삭제하고 조회하면 테이블이 없는 것으로 나온다. 하지만 다른 사람들에게는 여전히 테이블이 있는 것으로 조회가 됨
- PostgresHook은 현재 autocommit = False를 반환함
- autocommit = False일 때 주의할 점
- 쿼리문 실행 후 파이썬 try ~ exception와 commit;을 함께 사용하는 것이 best practice
- try ~ exception 시 ETL 운영상 관점에서 raise를 사용하는 것이 좋음
- autocommit = True 일 때 주의할 점
- try ~ exception ~ finally과 BEGIN; END; 로 트랜잭션 처리
- 테이블 삭제와 삽입을 실행할 때
- 일반적으로 autocommit = True를 사용
- try ~ exception ~ finally과 BEGIN; END; 로 트랜잭션 처리
- transaction 중간에 에러가 발생할 때 주의할 점
- exception 처리 시 ROLLBACK;을 수행해줘야함
Start_date 이해하기
- 실제 실행 시점은 start_date + interval이다.
- 데이터 기준 start_date으로 execution_date을 의미하는 것이 아니다.
- 보통 back fill을 유용하게 하기 위해 execution_date 변수를 생성한다.
- execution_date은 현재 시점보다 이전 시간이 나온다.
- execution_date은 현 시점 - duration으로 설정해야 back fill이 쉽다.
- full refresh할 경우 back fill이 아무 의미 없다.
- 현재 시점으로 execution_date을 설정하지 않아야 한다.
DELETE FROM과 TRUNCATE
- 공통점
- 테이블 내용을 삭제한다는 일은 정확히 똑같다.
- 차이점
- DELETE FROM
- WHERE절로 삭제할 행을 지정할 수 있다.
- TRUNCATE보다 속도가 약간 느리다.
- TRUNCATE
- 전체를 한 번에 삭제하며 바로 COMMIT한다.
- 확실한 상황에서만 사용해야 한다.
- 속도가 빠르다.
- DELETE FROM
Terminal 실행하기
- airflow tasks test my_first_dag print_hello {execution}
- airflow tasks run my_first_dag print_hello {execution}
- execution_date은 과거 시점이어야 동작함
- test 명령어로 돌리면 실제 반영되지 않음
- airflow 실행은 터미널로 하는 것이 디버깅하기 쉬움
과제
Workflow DAG 변경하기
- weather_foreacast 테이블 생성하기
- api key 발급 후, DAG Variable 등록
- 앞으로 updated_date으로 primary key uniqueness 보장
- 이유는 사용하는 api에서 execution_date을 지정할 수 없기 때문
-- weather_forecast 테이블 생성
CREATE TABLE ghgoo1798.weather_forecast(
date date primary key,
temp float,
min_temp float,
max_temp float,
updated_date timestamp default GETDATE()
);
- API를 활용해 데이터 수집하기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hoooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id = 'redshift_dev_db')
return hook.get_conn().cursor()
def etl(**context):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도/경도
lat = 37.5665
lon = 126.9780
# https://openweathermap.org/api/one-call-api
url = "https://api.openweathermap.org/data/2.5/onecall?lat=%s&lon=%s&appid=%s&units=metric" % (lat, lon, api_key)
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
insert_sql = """DELETE FROM ghgoo1798.weather_forecast;INSERT INTO ghgoo1798.weather_forecast VALUES """ + ",".join(ret)
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
dag = DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2021,9,3), # 날짜가 미래인 경우 실행이 안됨
schedule_interval = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
etl = PythonOperator(
task_id = 'etl',
python_callable = etl,
dag = dag
)
- Primary Key 유지하기 위해 temp 테이블 생성
- temp 테이블에 수집 데이터가 저장되도록 DAG 변경
- updated_date가 최신인 행 1개씩만 추출해서 원본 테이블에 저장
- ROW_NUMBER, Partition by, Order by 활용
from airflow import DAG
from airflow.operators.python import PythonOperator
# from airflow.operators import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
def etl(**context):
api_key = Variable.get("open_weather_api_key")
# 서울의 위도/경도
lat = 37.5665
lon = 126.9780
# https://openweathermap.org/api/one-call-api
url = "https://api.openweathermap.org/data/2.5/onecall?lat=%s&lon=%s&appid=%s&units=metric" % (lat, lon, api_key)
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
# Primary Key 유지를 위한 updated_date도 추가해줘야함
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ret.append("('{}',{},{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"], now))
### 스키마 변경 + 수집할때마다 삭제하지 않고 계속 누적
cur = get_Redshift_connection()
insert_sql = """INSERT INTO ghgoo1798.temp_weather_forecast VALUES """ + ",".join(ret)
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("Commit;")
except Exception as e:
cur.execute("Rollback;")
## 원본 테이블에 저장하는 save 함수 생성
def save():
cur = get_Redshift_connection()
sql = """
BEGIN;
DELETE FROM ghgoo1798.weather_forecast;
INSERT INTO ghgoo1798.weather_forecast SELECT date, temp, min_temp, max_temp, updated_date
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY updated_date DESC) seq
FROM ghgoo1798.temp_weather_forecast
) WHERE seq=1;
END;
"""
cur.execute(sql)
dag = DAG(
dag_id = 'Weather_to_Redshift',
start_date = datetime(2021,9,3), # 날짜가 미래인 경우 실행이 안됨
schedule_interval = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
etl = PythonOperator(
task_id = 'etl',
python_callable = etl,
dag = dag
)
## save task 생성
save = PythonOperator(
task_id = 'save',
python_callable = save,
dag = dag
)
etl >> save
- 피드백 반영
- PostgresHook 사용 시, autocommit이 False로 설정되므로 BEGIN; END;로 둘러싸지 않아도 된다.
- 고로 save의 경우 commit;이 실행되지 않아 며칠동안 동작 시 안정성을 보장할 수 없음
- 따라서 SQL 실행을 try/except로 넣고 commit; rollback 수행
- try/except 처리 시 exception이 발생했을 때, except로 처리하면 에러가 더 이상 전파되지 않음. ETL 운영 관점에서 에러가 난 걸 알 수가 없다.
- 따라서 try ~ exception 처리 후 뒤에 raise를 붙여 줄 것
- raise는 직접 에러를 발생시키는 명령어
from airflow import DAG from airflow.operators.python import PythonOperator # from airflow.operators import PythonOperator from airflow.models import Variable from airflow.hooks.postgres_hook import PostgresHook from datetime import datetime from datetime import timedelta import requests import logging import psycopg2 import json def get_Redshift_connection(): # autocommit is False by default hook = PostgresHook(postgres_conn_id='redshift_dev_db') return hook.get_conn().cursor() def etl(**context): api_key = Variable.get("open_weather_api_key") # 서울의 위도/경도 lat = 37.5665 lon = 126.9780 # https://openweathermap.org/api/one-call-api url = "https://api.openweathermap.org/data/2.5/onecall?lat=%s&lon=%s&appid=%s&units=metric" % (lat, lon, api_key) response = requests.get(url) data = json.loads(response.text) """ {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3} """ ret = [] for d in data["daily"]: day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # Primary Key 유지를 위한 updated_date도 추가해줘야함 now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") ret.append("('{}',{},{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"], now)) ### 스키마 변경 + 수집할때마다 삭제하지 않고 계속 누적 cur = get_Redshift_connection() insert_sql = """INSERT INTO ghgoo1798.temp_weather_forecast VALUES """ + ",".join(ret) logging.info(insert_sql) try: cur.execute(insert_sql) cur.execute("Commit;") except Exception as e: cur.execute("Rollback;") ## 원본 테이블에 저장하는 save 함수 생성 def save(): cur = get_Redshift_connection() sql = """ DELETE FROM ghgoo1798.weather_forecast; INSERT INTO ghgoo1798.weather_forecast SELECT date, temp, min_temp, max_temp, updated_date FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY updated_date DESC) seq FROM ghgoo1798.temp_weather_forecast ) WHERE seq=1; """ try: cur.execute(sql) cur.execute("Commit;") except Exception as e: cur.execute("Rollback;") raise dag = DAG( dag_id = 'Weather_to_Redshift', start_date = datetime(2021,9,3), # 날짜가 미래인 경우 실행이 안됨 schedule_interval = '0 2 * * *', # 적당히 조절 max_active_runs = 1, catchup = False, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) etl = PythonOperator( task_id = 'etl', python_callable = etl, dag = dag ) ## save task 생성 save = PythonOperator( task_id = 'save', python_callable = save, dag = dag ) etl >> save
- PostgresHook 사용 시, autocommit이 False로 설정되므로 BEGIN; END;로 둘러싸지 않아도 된다.
https://programmers.co.kr/learn/courses/12539
'데이터 엔지니어링' 카테고리의 다른 글
[7 Week] 프로그래머스 - 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트 with Python (2) | 2021.09.20 |
---|---|
[6 Week] 프로그래머스 - 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트 with Python (2) | 2021.09.18 |
[4 Week] 프로그래머스 - 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트 with Python (0) | 2021.08.29 |
[3 Week] 프로그래머스 - 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트 with Python (0) | 2021.08.29 |
[2 Week] 프로그래머스 - 실리콘밸리에서 날아온 데이터 엔지니어링 스타터 키트 with Python (0) | 2021.08.15 |