Python數(shù)據(jù)處理心得--Pandas100秒處理一億行數(shù)據(jù)
1. 背景-為啥要用pandas
公司的日常運(yùn)營數(shù)據(jù)通過大數(shù)據(jù)平臺(HIVE SQL)通過匯總后,推送給業(yè)務(wù)部門進(jìn)行日常分析的數(shù)據(jù)仍然非常大。從數(shù)據(jù)量從PB&TB級降到了GB級,一般主要通過Mysql進(jìn)行存儲&聚合分析。
日或周的數(shù)據(jù),mysql處理還是可以的。到月數(shù)據(jù),超過10GB(1億行),處理起來就開始吃力,數(shù)據(jù)吞吐特別慢,內(nèi)存資源占用特別嚴(yán)重。
使用Pandas后,普通筆記本電腦可以很輕松地處理1億行的數(shù)據(jù),100秒內(nèi)就能完成計算, 計算實現(xiàn)的成本非常低。
2. Pandas介紹
很多方法可以解決這個問題,由于希望用單機(jī)版的開源軟件,正好python日常用的也比較多,于是使用pandas來解決這個問題。
pandas是numpy的基礎(chǔ)發(fā)展出來的, pandas的DataFrame數(shù)據(jù)結(jié)構(gòu)相當(dāng)于多個numpy series組合在一起。
為了更直觀地了解pandas,打算通過一個模擬數(shù)據(jù)的分析過程來介紹pandas的用法。
使用軟件的版本
anaconda3-4.1.1 (Python 3.5)
Jupiter notebook (4.2.1)
Pandas包 0.18.1 (anaconda自帶的)
3. 生產(chǎn)模擬數(shù)據(jù)
我們先模擬一個一億行的數(shù)據(jù),然后測試用pandas的代碼進(jìn)行匯總分析,并且記錄耗時&及內(nèi)存&CUP的大致占用情況。
基本需要把數(shù)據(jù)切成小塊處理,否則內(nèi)存占用會特別高,內(nèi)存不足,python無法運(yùn)行下去。
如果電腦有32GB內(nèi)存的,可以直接抽取1億行樣本數(shù)據(jù),不用使用循環(huán)進(jìn)行數(shù)據(jù)重復(fù),這樣的內(nèi)存消耗是25GB。
如果用下面的循環(huán),內(nèi)存占用只有幾百M(fèi)B。
############################################################################
import pandas as pd
import numpy as np
import csv
date= ['2017-11-01', '2017-11-02','2017-11-03','2017-11-04','2017-11-05','2017-11-06','2017-11-07']
#設(shè)置日期數(shù)據(jù),為后面的np.random.choice引用
area= ['華北', '華東', '華南','西南','華中','東北','西北']
order_type =[0, 1, 2, 3, 4 ,5 ,6 ,7 ,8, 9]
col1=np.random.choice(date, 1000000, p=[0.15, 0.15, 0.15, 0.15, 0.15, 0.15, 0.1])
#隨機(jī)抽樣100萬次,各個日常出現(xiàn)的概率是P。
col2=np.random.choice(area, 1000000, p=[0.2, 0.2, 0.2, 0.1, 0.1, 0.1, 0.1])
col3=np.random.choice(order_type, 1000000, p=[0.05, 0.2, 0.2, 0.1, 0.1, 0.1, 0.1, 0.05, 0.05, 0.05])
col4=np.random.choice(100, 1000000)
col5=np.random.choice(10000, 1000000)
df = pd.DataFrame({'date':col1, 'area':col2, 'order_type':col3, 'qty':col4, 'revenue':col5})
df=df.set_index('date')
#合并各個numpy生產(chǎn)的隨機(jī)數(shù)據(jù)成為Pandas的DataFrame
with open('E:\\mess_files\\sample_data.csv','w', newline='\n') as csvfile:
writer = csv.writer(csvfile)
#先寫入columns_name
writer.writerow(['date','area','order_type','qty','revenue'])
#為了減少內(nèi)存占用,沒有直接在上面生成1億行數(shù)據(jù),先生產(chǎn)100萬,然后循環(huán)100次。
for i in range(100):
i=i+1
df.to_csv ('E:\\mess_files\\sample_data.csv', encoding='gbk', header=False, mode='a')
print(i*1000000)
############################################################################
4. 數(shù)據(jù)分析代碼
涉及的功能:讀取數(shù)據(jù),增加計算字段,group by ,merge( left join), index (set & reset), 輸出數(shù)據(jù)(CSV & excel)。
############################################################################
import pandas as pd
import time
import csv
start = time.clock()
#開始計時
with open('E:\\mess_files\\pd_sum.csv','w', newline='\n') as csvfile:
writer = csv.writer(csvfile)
#先寫入columns_name
writer.writerow(['date','area','order_type','qty','revenue'])
#為匯總的輸出,建立一個CSV文件,并包含表頭字段明。
#分塊(每100萬行)進(jìn)行數(shù)據(jù)匯總, 并循環(huán)寫入csv中
reader = pd.read_csv('E:\\mess_files\\sample_data.csv', encoding='gbk',sep=',',iterator=True)
i=0
while True:
try:
start2 = time.clock()
#每次循環(huán)開始時間
# 從csv文件迭代讀取
df = reader.get_chunk(1000000)
mini_sum=df.groupby(['date','area','order_type']).sum()
#按date, area, order_type 進(jìn)行匯總
mini_sum.to_csv('E:\\mess_files\\pd_sum.csv',mode='a',header=False)
#匯總結(jié)果寫入CSV文件,'header=False' 避免重復(fù)寫入表頭。
# 計時
i=i+1
end2 = time.clock()
#每次循環(huán)結(jié)束時間
print('{} 秒: completed {} rows'.format(end2 - start2, i * 1000000))
except StopIteration:
print("Iteration is stopped.")
#循環(huán)結(jié)束退出
break
df=pd.read_csv('E:\\mess_files\\pd_sum.csv', encoding='gbk',sep=',')
df=df.groupby(['date','area','order_type']).sum()
df=df.reset_index()
#pandas匯總時,會根據(jù)groupby的字段建立multi_index, 需要重置index。
df['date']=pd.to_datetime(df['date'])
#將date列 設(shè)置為日期類型
df['avg']=df['revenue']/df['qty']
#增加一個計算字段 avg 平均客單價
df_sub=df[['date','area','qty']].groupby(['date','area']).sum().add_prefix('sum_')
#建立一個新DataFrame, 用于后面的left join 計算各個order_type的占比
df_merge=pd.merge(df, df_sub, how='outer', left_on=['date','area'], right_index=True)
#相當(dāng)于SQL的left join
df_merge['type_qty%']=df_merge['qty']/df_merge['sum_qty']
#增加計算字段
df_merge=df_merge.set_index('date')
output=pd.ExcelWriter('E:\\mess_files\\output_xls.xlsx')
df_merge.to_excel(output,'sheet1')
output.save()
#最終結(jié)果輸出到excel
end = time.clock()
#最終使用時間計時
print('final{} 秒'.format(end - start))
###############################################################################
使用了兩臺機(jī)器進(jìn)行數(shù)據(jù)運(yùn)算,DELL R720 2U SAS硬盤 96GB內(nèi)存的服務(wù)器,Thinkpad E450 SSD硬盤 i5 8G內(nèi)存的筆記本電腦。
運(yùn)行時,CUP占用率服務(wù)器5%,筆記本30%, 總內(nèi)存占用都是約6GB,耗時也非常接近, 每處理100萬行用時在 1秒種以內(nèi), 處理1億行數(shù)據(jù)的運(yùn)算還是很輕松的。
服務(wù)器循環(huán)每次計算100萬行用時 0.8秒, 總用時79.3秒。
########################
0.789916201370346 秒: completed 90000000 rows
0.7889745154019323 秒: completed 91000000 rows
0.7875460356832349 秒: completed 92000000 rows
0.7883160047623932 秒: completed 93000000 rows
0.7929830807664189 秒: completed 94000000 rows
0.7884885093438072 秒: completed 95000000 rows
0.8129294153000615 秒: completed 96000000 rows
0.8298620396579395 秒: completed 97000000 rows
0.787222294208533 秒: completed 98000000 rows
0.7879615432937328 秒: completed 99000000 rows
0.7891974322811279 秒: completed 100000000 rows
Iteration is stopped.
final79.22691993069884 秒
#########################
筆記本電腦循環(huán)每次計算100萬行用時 0.83, 總用時85.1秒。
#########################
0.817601222058812 秒: completed 92000000 rows
0.8092709856398557 秒: completed 93000000 rows
0.8277913177203118 秒: completed 94000000 rows
0.8203788228361191 秒: completed 95000000 rows
0.8211909342874009 秒: completed 96000000 rows
0.8238487924599838 秒: completed 97000000 rows
0.825806156394961 秒: completed 98000000 rows
0.8143844225134984 秒: completed 99000000 rows
0.8465947555305036 秒: completed 100000000 rows
Iteration is stopped.
final85.11640178604648 秒
#########################
本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請
點(diǎn)擊舉報。