1. 자신의 컴퓨터에 설치된  PostgreSQL DB에 접속하는 'engine' 만들기.

from sqlalchemy import create_engine    # DB 접속 엔진을 만들어준다.

# PostgreSQL 데이터베이스 접속 엔진 생성.
local_postgresql_url = 'postgresql://postgres:postgres@localhost:812/postgres'    # localhost의 812 포트를 Parallels에 설치된 PostgreSQL 5432 포트로 포트포워딩.
engine_postgresql = create_engine(local_postgresql_url)

 

1.2 A, B DB에서 불러와 Pandas로 하나의 데이터프레임으로 만들기.

postgre data : ST0001 실적을 누적하고 있다.
mariadb data : ST0002 실적을 누적하고 있다.
파이썬에서 이 2개의 실적을 변수로 받은 후 병합해라.

pdata 에 ST0001을 넣는다.
mdata 에 ST0002를 넣는다.

hint )
pdata, mdata로 불러온 경우 다음과 같이 2개의 데이터를 병합 가능하다.
pd.concat([pdata, mdata], axis = 0) # pdata와 mdata를 위아래로 붙이겠다.

import pandas as pd
import psycopg2                          # PostgreSQL을 사용 가능하게 해준다.
import pymysql                           # MariaDB를 사용 가능하게 해준다.
from sqlalchemy import create_engine     # DB 접속 엔진을 만들어준다.

# AWS PostgreSQL 데이터베이스 접속 엔진 생성.
aws_postgresql_url = 'postgresql://ID:PW@HOST:PORT/DB'
engine_postgresql = create_engine(aws_postgresql_url)

# AWS MariaDB 데이터베이스 접속 엔진 생성.
aws_mariadb_url = 'mysql+pymysql://ID:PW@HOST:PORT/DB'
engine_mariadb = create_engine(aws_mariadb_url)

# PostgreSQL에서 st01 불러오기.
db_st01 = "kopo_product_volume_st01"
pdata = pd.read_sql_table(table_name=db_st01, con=engine_postgresql)
pdata.head(3)

# MariaDB에서 st02 불러오기.
db_st02 = "kopo_product_volume_st02"
mdata = pd.read_sql_table(table_name=db_st02, con=engine_mariadb)
mdata.head(3)

# 불러온 2개의 테이블을 concat으로 합치기.
sumData = pd.concat([pdata, mdata], axis = 0)
sumData

# 로컬에 csv 파일로 저장하기.
sumData.to_csv("./sumData.csv", index = False)

Query (쿼리)를 이용한 방법

import pandas as pd
import psycopg2                          # PostgreSQL을 사용 가능하게 해준다.
import pymysql                           # MariaDB를 사용 가능하게 해준다.
from sqlalchemy import create_engine     # DB 접속 엔진을 만들어준다.

# AWS PostgreSQL 데이터베이스 접속 엔진 생성.
aws_postgresql_url = 'postgresql://ID:PW@HOST:PORT/DB'
engine_postgresql = create_engine(aws_postgresql_url)

# AWS MariaDB 데이터베이스 접속 엔진 생성.
aws_mariadb_url = 'mysql+pymysql://ID:PW@HOST:PORT/DB'
engine_mariadb = create_engine(aws_mariadb_url)

# PostgreSQL에서 st01 불러오기.
pg_query = 'select * from kopo_product_volume_st01'
pdata = pd.read_sql_query(pg_query, con=engine_postgresql)
pdata.head(3)

# MariaDB에서 st02 불러오기.
maria_query = 'select * from kopo_product_volume_st01'
mdata = pd.read_sql_query(maria_query con=engine_mariadb)
mdata.head(3)

# 불러온 2개의 테이블을 concat으로 합치기.
sumData = pd.concat([pdata, mdata], axis = 0)
sumData

# 로컬에 csv 파일로 저장하기.
sumData.to_csv("./sumData.csv", index = False)

 

 

2. 이번에는 자신의 PC에서 csv 파일을 불러와 engine을 생성하고 local DB 서버에 내보내기까지 한 번에 해봅니다.

import psycopg2
import pandas as pd
from sqlalchemy import create_engine 

# csv 데이터 로딩 후 컬럼 소문자로 변환
selloutData = pd.read_csv("../exampleCode/dataset/kopo_product_volume.csv")
selloutData.columns = ["regionid","productgroup","yearweek","volume"]

print(selloutData.head())    # 디버깅용.


결과 :

  regionid productgroup  yearweek  volume
0      A01       ST0001    201415  810144
1      A01       ST0002    201415  128999
2      A01       ST0001    201418  671464
3      A01       ST0002    201418  134467
4      A01       ST0001    201413  470040
# 데이터베이스 접속 엔진 생성
local_postgresql_url = create_engine('postgresql://postgres:postgres@127.0.0.1:812/postgres')    # localhost의 812 포트를 Parallels에 설치된 PostgreSQL 5432 포트로 포트포워딩.

# 데이터 저장
resultname='kopo_product_volume'
selloutData.to_sql(name=resultname, con=local_postgresql_url, index=False, if_exists='replace')    # name, con은 필수 요소라 생략하고 이렇게 써도 된다. (단, 가독성을 위해 넣자.) (.to_sql(resultname, p_engine, index = False, if_exists='replace'))

결과 확인 :

왜 소문자로 바꿀까?

DataBase마다 특징이 있다. Oracle은 대문자로 처리를 하고, PostgreSQL은 소문자로 처리를 하는 등의 특징. PostgreSQL에 보낼 것이기 때문에 불러온 데이터의 컬럼헤더를 소문자로 바꾸고, 저장하는 테이블 이름도 소문자로 바꾼다.

 

2.2 위 예제를 Pandas(판다스) 성능 향상 도구 d6tstack를 활용해봅시다.

import psycopg2
import pandas as pd
from sqlalchemy import create_engine 
import d6tstack

# DB 커넥션 열기
purl = 'postgresql+psycopg2://postgres:postgres@127.0.0.1:812/postgres' 
engine = create_engine(purl) 

# DB 테이블을 읽어 Data Frame 변수에 저장하기
selloutData = pd.read_sql_query('SELECT * FROM kopo_product_volume', engine) 

selloutData.head()    #디버깅용.

# 컬럼해더 재정의
selloutData.columns = ['regionid','pg','yearweek','volume']

# 데이터 저장
resultname='pgresult'
d6tstack.utils.pd_to_psql(df=selloutData, uri=purl, table_name=resultname, if_exists='replace')

ABC.to_sql과는 모양이 조금 다릅니다.

 

ABC.to_sql(name = "Hello", con = engine, index = False) 가 기본 구조인 반면
: ABC 데이터프레임을 engine을 이용해 DB에 "Hello"라는 테이블 이름으로 저장한다.
 

d6tstack.utils.pd_topsql(df = ABC, uri = "engine 만들 때 사용하는 URL", table_name = "Hello") 이 기본 구조입니다.
: ABC 데이터프레임을 "engine 만들 때 사용하는 URL"을 이용해 "Hello"라는 테이블 이름으로 저장한다.

 

결과 확인 :

 

2.3 위 예제를 활용해봅시다. A DB에서 불러와 B DB로 내보내는걸 해봅니다.
여기서 A DB는 MySQL, B DB는 PostgreSQL로 하겠습니다.

import pandas as pd
import psycopg2                          # PostgreSQL을 사용 가능하게 해준다.
import pymysql                           # MariaDB를 사용 가능하게 해준다.
from sqlalchemy import create_engine     # DB 접속 엔진을 만들어준다.
 
# DB 커넥션 열기 (엔진이 2개가 필요하다!!)
pg_my_engine = create_engine('postgresql://postgres:postgres@127.0.0.1:812/postgres') 
mysql_com_engine = create_engine('mysql+pymysql://root:mariadb@127.0.0.1:3306/mysql')

# DB 테이블을 읽어 Data Frame 변수에 저장하기
selloutData = pd.read_sql_query('SELECT * FROM kopo_product_volume', mysql_com_engine)    # MariaDB에서 불러오기.

selloutData.head()    # 디버깅용.

resultname = "pg_com_engine_out"
selloutData.to_sql(resultname, pg_my_engine, if_exists='replace')    # PostgreSQL에 저장하기.

여기서 중요한 점은 엔진을 2개를 만들어야 한다는 것입니다! 서로 다른 종류의 DB라서 그럴까요? 아니죠. 서로 다른 서버의 DB라서 그렇습니다. 만약 A라는 DB가 PostgreSQL이고, B라는 DB 역시 PostgreSQL이어도 A와 B는 서로 다른 서버라서 PostgreSQL 엔진을 2개 만들어줘야합니다.

 

이것을 위에서 했던 것처럼 PostgreSQL의 DB 특징인 소문자에 맞춰, 혹시 모를 컬럼헤더의 대문자, 테이블명의 대문자를 소문자로 처리하는 로직까지 구현해봅니다.

완성본

# PANDAS 패키지 불러오기
import pandas as pd 
import pymysql 
from sqlalchemy import create_engine 
 
# DB 커넥션 열기 (엔진이 2개가 필요하다!!)
mysqlUrl = 'mysql+pymysql://root:mariadb@127.0.0.1:3306/mysql'
postgresqlUrl = 'postgresql://postgres:postgres@127.0.0.1:812/postgres'
mysql_com_engine = create_engine(mysqlUrl)
pg_my_engine = create_engine(postgresqlUrl) 

# DB 테이블을 읽어 Data Frame 변수에 저장하기
selloutData = pd.read_sql_query('SELECT * FROM kopo_product_volume', mysql_com_engine)    # MariaDB에서 불러오기.

# 컬럼헤더 재정의 (PostgreSQL에 맞게 소문자 처리)
selloutData.columns = selloutData.columns.str.lower()
selloutData.head()    # 디버깅용.

# 데이터 저장하기.
resultname = "pg_com_engine_out"
selloutData.to_sql(resultname.lower(),
                   pg_my_engine,
                   if_exists='replace',
                   index=False)

 

 

 

 

번외. Oracle DB 불러오기.

import pandas as pd
import cx_Oracle
from sqlalchemy import create_engine 
 
# DB 커넥션 열기
engine = create_engine('oracle+cx_oracle://KOPO:KOPO@127.0.0.1:811/xe')    # localhost의 811 포트를 Parallels에 설치된 Oracle 1521 포트로 포트포워딩.

# DB 테이블을 읽어 Data Frame 변수에 저장하기
selloutData = pd.read_sql_query('SELECT * FROM kopo_product_volume', engine) 

# 컬럼해더 재정의
selloutData.columns = ['REGIONID','PRODUCTGROUP','YEARWEEK','VOLUME']

# 데이터 VIEW
selloutData.head()

 

더보기

cf. pd.read_sql_table과 pd.read_sql_query 차이

read_sql_table

Signature:
pd.read_sql_table(
    table_name,
    con,
    schema=None,
    index_col=None,
    coerce_float=True,
    parse_dates=None,
    columns=None,
    chunksize=None,
)
Docstring:
Read SQL database table into a DataFrame.


read_sql_query

Signature:
pd.read_sql_query(
    sql,
    con,
    index_col=None,
    coerce_float=True,
    params=None,
    parse_dates=None,
    chunksize=None,
)
Docstring:
Read SQL query into a DataFrame.

Returns a DataFrame corresponding to the result set of the query

 

Pandas(판다스)를 이용한 DB SQL 다루는 방법은 다음 레퍼런스를 참고하면 좋습니다.

pandas.read_sql_query — pandas 1.2.0 documentation (pydata.org)

 

pandas.read_sql_query — pandas 1.2.0 documentation

Dict of {column_name: format string} where format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.

pandas.pydata.org

 

+ Recent posts