반응형

IT·디지털 자동화 블로그 · 실시간 데이터 파이프라인
왜 API-엑셀 자동 연동인가
대부분의 공공데이터는 REST API로 제공된다. 웹에서 CSV를 수동 다운로드하고 복붙 하는 작업은 누락·지연·버전 혼선을 낳는다. 파이썬으로 API를 직접 호출해 엑셀에 자동 누적 저장하면 데이터의 신선도가 높고, 정해진 포맷의 요약 보고서를 항상 같은 시간에 받아볼 수 있다.
환경 준비와 폴더 구조
pip install requests pandas openpyxl python-dateutil pyyaml
📁 public_api_excel/
├─ config.yaml # 엔드포인트, 파라미터, 스키마, 시트 이름 등
├─ run_collect.py # 메인 수집 스크립트
├─ utils.py # 재시도, 로깅 유틸
├─ data/
│ ├─ raw/ # 원본 누적
│ └─ reports/ # 요약/서식 적용 엑셀
└─ logs/ # 실행 로그
API 키는 코드에 하드코딩하지 말고 환경 변수나 별도 보안 저장소에 보관한다.
Windows PowerShell
setx PUBLIC_API_KEY "여기에_발급받은_API키"
API 설계: 키, 엔드포인트, 페이징, 쿼리 파라미터
공공데이터 포털의 다수 API는 페이지 단위로 데이터를 제공하며, 인증은 쿼리스트링 또는 헤더에 키를 포함해 수행한다. 응답 포맷은 JSON 또는 XML이 일반적이며, JSON을 권장한다. 페이징 규칙과 속도 제한을 먼저 파악하고, 수집 주기(예 5분, 1시간)를 설정한다.
예시 config.yaml
endpoint: "https://api.example.gov/v1/resources"
params:
region: "11" # 지역 코드 예시
page: 1
per_page: 100
schema:
id: "id"
timestamp: "obs_time"
excel:
file_prefix: "realtime_public_"
sheets:
raw: "RAW"
tidy: "TIDY"
summary: "SUMMARY"
파이썬 수집 코드(재시도·속도 제한·페이징)
# file: run_collect.py
import os, time, json, math, logging, datetime as dt
import requests, pandas as pd
from dateutil import parser as dparser
from pathlib import Path
import yaml
BASE = Path(__file__).resolve().parent
LOG = BASE / "logs" / "collect.log"
LOG.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(filename=str(LOG), level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
def load_cfg():
with open(BASE/"config.yaml","r",encoding="utf-8") as f:
return yaml.safe_load(f)
def call_api(url, params, headers=None, max_attempts=3, backoff=2.0):
headers = headers or {}
for attempt in range(1, max_attempts+1):
try:
r = requests.get(url, params=params, headers=headers, timeout=20)
if r.status_code == 200:
return r.json()
if r.status_code in (429, 503): # 과도한 요청/일시 장애
time.sleep(backoff*attempt)
continue
r.raise_for_status()
except Exception as e:
logging.warning("attempt=%s error=%s", attempt, e)
time.sleep(backoff*attempt)
raise RuntimeError("API 호출 실패")
def normalize_rows(items):
# 필요한 필드만 추출하고 타입 변환
rows = []
for it in items:
row = {
"id": it.get("id"),
"name": it.get("name"),
"value": pd.to_numeric(it.get("value"), errors="coerce"),
"obs_time": dparser.parse(it.get("obs_time")).isoformat() if it.get("obs_time") else None,
"region": it.get("region")
}
rows.append(row)
return pd.DataFrame(rows)
def paginate_collect(cfg):
url = cfg["endpoint"]
key = os.getenv("PUBLIC_API_KEY")
params = cfg["params"].copy()
out_frames = []
headers = {"Accept":"application/json"}
# 첫 페이지 호출로 전체 페이지 수 추정
params.update({"api_key": key, "page": 1})
js = call_api(url, params, headers)
total = js.get("meta",{}).get("total", 0)
per_page = js.get("meta",{}).get("per_page", params.get("per_page",100))
pages = max(1, math.ceil(total / per_page))
df = normalize_rows(js.get("data", []))
out_frames.append(df)
# 남은 페이지 순회
for p in range(2, pages+1):
params["page"] = p
js = call_api(url, params, headers)
out_frames.append(normalize_rows(js.get("data", [])))
time.sleep(0.1) # 과도한 호출 방지
return pd.concat(out_frames, ignore_index=True)
def save_raw(df):
now = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
raw_dir = BASE/"data"/"raw"
raw_dir.mkdir(parents=True, exist_ok=True)
path = raw_dir/f"raw_{now}.parquet"
df.to_parquet(path, index=False)
return path
if __name__=="__main__":
cfg = load_cfg()
df = paginate_collect(cfg)
raw_path = save_raw(df)
logging.info("수집 완료 rows=%s file=%s", len(df), raw_path)
print("수집 완료:", raw_path)
응답 스키마가 바뀌면 normalize_rows 함수만 수정하면 된다. 데이터 품질 점검 로직은 다음 절에서 추가한다.
엑셀 자동 저장·서식·요약 시트 생성
# file: write_excel.py
from pathlib import Path
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Alignment, Font, PatternFill, Border, Side
import datetime as dt, yaml, os
BASE = Path(__file__).resolve().parent
def load_cfg():
import yaml
with open(BASE/"config.yaml","r",encoding="utf-8") as f:
return yaml.safe_load(f)
def build_summary(df):
# 예시 요약: 지역별 최신값과 평균
latest = df.sort_values("obs_time").groupby("region").tail(1)
meanv = df.groupby("region")["value"].mean().reset_index().rename(columns={"value":"avg_value"})
sm = latest[["region","value"]].merge(meanv, on="region", how="left")
return sm.sort_values("value", ascending=False)
def write_to_excel(df, cfg):
out_dir = BASE/"data"/"reports"
out_dir.mkdir(parents=True, exist_ok=True)
stamp = dt.datetime.now().strftime("%Y%m%d_%H%M")
xlsx = out_dir/f"{cfg['excel']['file_prefix']}{stamp}.xlsx"
# TIDY: 타입·컬럼 정리
tidy = df.copy()
tidy["obs_time"] = pd.to_datetime(tidy["obs_time"])
tidy = tidy.sort_values(["region","obs_time"])
with pd.ExcelWriter(xlsx, engine="openpyxl") as w:
df.to_excel(w, index=False, sheet_name=cfg["excel"]["sheets"]["raw"])
tidy.to_excel(w, index=False, sheet_name=cfg["excel"]["sheets"]["tidy"])
build_summary(df).to_excel(w, index=False, sheet_name=cfg["excel"]["sheets"]["summary"])
# 간단 서식
wb = load_workbook(xlsx)
for name in cfg["excel"]["sheets"].values():
ws = wb[name]
ws.freeze_panes = "A2"
ws.auto_filter.ref = ws.dimensions
thin = Side(style="thin", color="CBD5E1")
border = Border(top=thin, bottom=thin, left=thin, right=thin)
head_fill = PatternFill("solid", fgColor="E2E8F0")
bold = Font(bold=True)
for c in ws[1]:
c.font = bold; c.fill = head_fill; c.border = border
c.alignment = Alignment(horizontal="center")
wb.save(xlsx)
return xlsx
if __name__=="__main__":
cfg = load_cfg()
# 가장 최근 raw 파일 읽기
raw_dir = BASE/"data"/"raw"
latest = sorted(raw_dir.glob("raw_*.parquet"))[-1]
df = pd.read_parquet(latest)
out = write_to_excel(df, cfg)
print("엑셀 저장 완료:", out)
보고서 시트는 RAW, TIDY, SUMMARY로 분리해 탐색성과 안정성을 높인다. 필요하면 차트(막대·라인)를 openpyxl 차트 객체로 추가할 수 있다.
중복 제거·증분 수집·로그 관리
# file: dedup_and_append.py
import pandas as pd
from pathlib import Path
import datetime as dt
BASE = Path(__file__).resolve().parent
def load_history():
hist = BASE/"data"/"history.parquet"
if hist.exists():
return pd.read_parquet(hist)
return pd.DataFrame(columns=["id","name","value","obs_time","region"])
def append_incremental(new_df):
hist = load_history()
merged = pd.concat([hist, new_df], ignore_index=True)
merged["obs_time"] = pd.to_datetime(merged["obs_time"])
merged.drop_duplicates(subset=["id","obs_time"], keep="last", inplace=True)
merged.sort_values(["id","obs_time"], inplace=True)
merged.to_parquet(BASE/"data"/"history.parquet", index=False)
return merged
if __name__=="__main__":
# 직전 수집 raw 파일 읽기
latest_raw = sorted((BASE/"data"/"raw").glob("raw_*.parquet"))[-1]
df = pd.read_parquet(latest_raw)
hist = append_incremental(df)
print("증분 병합 완료 rows=", len(hist))
id+obs_time 조합을 유니크 키로 사용해 중복을 방지한다. 운영 로그는 collect.log로 남겼으며, 실패 시 재시도 횟수와 에러 메시지를 기록해 추적성을 확보한다.
스케줄러 등록과 운영 팁
Windows 작업 스케줄러
- 작업 만들기에서 트리거를 5분마다 또는 매시 정각으로 설정
- 동작에 프로그램 시작 → python 경로와 run_collect.py 지정
- 후속 동작으로 dedup_and_append.py, write_excel.py를 연결
macOS·Linux cron
# 매 10분마다 수집 → 증분 → 엑셀 저장
*/10 * * * * /usr/bin/python3 /path/run_collect.py && /usr/bin/python3 /path/dedup_and_append.py && /usr/bin/python3 /path/write_excel.py >> /path/logs/cron.log 2>&1
속도 제한이 있는 API는 호출 간격을 넉넉히 두고, 페이징 요청 사이에 짧은 지연을 둔다. 응답 스키마 변경에 대비해 정상성 검사(필수 키 존재 여부)를 수집 단계에 포함하자.
체크리스트
- API 키를 환경 변수로 관리하고, 레포지토리에 노출되지 않도록 했다
- 페이징·속도 제한·에러 코드에 대한 처리 로직이 있다
- 수집 데이터 스키마를 정규화하고 타입을 일관되게 변환했다
- 증분 병합과 중복 제거 키(id+obs_time)를 정의했다
- 엑셀 보고서의 헤더·필터·고정창·숫자 포맷 등 서식을 표준화했다
- 스케줄러와 로그 파일, 실패 재시도 정책을 운영에 반영했다
다음 단계
엑셀 보고서에 자동 차트와 조건부 서식을 더해 시각화 품질을 높이고, PDF 변환과 이메일 발송 자동화를 결합하면 완전한 실시간 알림 체계를 구축할 수 있다. 또한 지역·기관별 여러 엔드포인트를 병렬 수집하는 멀티 파이프라인으로 확장해 보자.
반응형