CSV 처리 중 각 프로세스를 1개의 실행 블록에 유지

CSV 처리 중 각 프로세스를 1개의 실행 블록에 유지

csv 파일을 처리하는 방법에 대해 몇 가지 질문을 했는데 처리량이 많은 것으로 나타났습니다.

매달 내 Google 드라이브 디렉토리에 여러 개의 TXT 파일이 수신됩니다. 이 정보를 처리하고 병합하여 Postgres 라이브러리에 로드해야 합니다.

수신된 파일의 레이아웃과 구조는 다음과 같습니다.

#A1401099999999              022021I                                   
00999999999   000000000099999999+000000000000000000-000000000000000000-   
00999999999   000000000099999999-000000000000000000-000000000000000000-  
00999999999   000000000099999999-000000000000000000-000000000000000000-
@036

AWK를 사용하여 파일 헤더를 3개의 열로 분할합니다.

$ cat tst.awk
BEGIN { OFS="," }
NR==1 {
    pfx = substr($0,8,7) OFS substr($0,30,6) OFS substr($0,36,1)
    next
}
{
    gsub(/[+-]/,"& ")
    $1 = pfx OFS $1
    print
}

결과

9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
@036

다음 단계에서는 모든 파일의 통합을 다음 패턴으로 변경합니다.

cat -T *.txt > final.csv 


9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
@036
9999999,022021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,022021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-    
@042

베이스 플레이트를 제거했습니다.

sed -i '/@/d' final.csv

9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999-,000000000000000000-,000000000000000000-
9999999,012021,I,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,012021,S,0099999999,000000000099999999-,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999+,000000000000000000-,000000000000000000-
9999999,022021,I,0099999999,000000000099999999-,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999-,000000000000000000-,000000000000000000-
9999999,022021,S,0099999999,000000000099999999-,000000000000000000-,000000000000000000- 

필드 2의 형식을 날짜로 지정했습니다. "MM/YYYY"입니다. 이제 "yyyy-mm-01"입니다. dd의 기본값으로 01을 사용합니다.

awk -F, '{OFS=",";a=substr($2,1,2);b=substr($2,3,4);$2=b"-"a"-01";print $0}' final.csv

결과:

10013534,2021-01-01,I,0090922002,000000000009102629+,000000000000000000-,000000000000000000-,
10013534,2021-01-01,I,0091000002,000000000063288833+,000000000000000000-,000000000000000000-,
10013534,2021-01-01,I,0091100005,000000000063288833+,000000000000000000-,000000000000000000-,
10013534,2021-01-01,I,0091110002,000000000063288833+,000000000000000000-,000000000000000000-,
10013534,2021-01-01,I,0099999995,000000008017897139+,000000000000000000-,000000000000000000-,

이제 열 5에서 함수를 사용하려면 값이 음수일 때 위치를 "-"에서 열의 시작 부분으로 변경해야 합니다.

awk -F "," '{sign=substr($5,length($5),1);$5=substr($5,0,length($5)-1); if(sign =="-"){$5="-"$5}; print}' ./mycsv

    10013534,2021-01-01,I,0090922002,000000000009102629,000000000000000000-,000000000000000000-,
    10013534,2021-01-01,I,0091000002,-000000000063288833,000000000000000000-,000000000000000000-,
    10013534,2021-01-01,I,0091100005,-000000000063288833,000000000000000000-,000000000000000000-,
    10013534,2021-01-01,I,0091110002,000000000063288833,000000000000000000-,000000000000000000-,
    10013534,2021-01-01,I,0099999995,-000000008017897139,000000000000000000-,000000000000000000-,

이 모든 처리가 끝나면 데이터를 Postgres로 가져오고 싶습니다.

import pandas, csv

from io import StringIO
from sqlalchemy import create_engine

def psql_insert_copy(table, conn, keys, data_iter):
   dbapi_conn = conn.connection
   with dbapi_conn.cursor() as cur:
       s_buf = StringIO()
       writer = csv.writer(s_buf)
       writer.writerows(data_iter)
       s_buf.seek(0)
       columns = ', '.join('"{}"'.format(k) for k in keys)
       if table.schema:
           table_name = '{}.{}'.format(table.schema, table.name, columns)
       else:
           table_name = table.name
       sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(table_name, columns)
       cur.copy_expert(sql=sql, file=s_buf)

engine = create_engine('postgresql://xxxxx:xxx@xxxx:xxx/xxxx')

df = pandas.read_csv("final.csv")
df.to_sql('xxxxxxx', engine, schema='xxxxxxx', method=psql_insert_copy)

관련 정보