1. 程式人生 > 程式設計 >Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

1. 目標

通過hadoop hive或spark等資料計算框架完成資料清洗後的資料在HDFS上

爬蟲和機器學習在Python中容易實現

在Linux環境下編寫Python沒有pyCharm便利

需要建立Python與HDFS的讀寫通道

2. 實現

安裝Python模組pyhdfs

版本:Python3.6,hadoop 2.9

讀檔案程式碼如下

from pyhdfs import HdfsClient
client=HdfsClient(hosts='ghym:50070')#hdfs地址
res=client.open('/sy.txt')#hdfs檔案路徑,根目錄/
for r in res:
 line=str(r,encoding='utf8')#open後是二進位制,str()轉換為字串並轉碼
 print(line)

寫檔案程式碼如下

from pyhdfs import HdfsClient
client=HdfsClient(hosts='ghym:50070',user_name='hadoop')#只有hadoop使用者擁有寫許可權
str='hello world'
client.create('/py.txt',str)#建立新檔案並寫入字串

上傳本地檔案到HDFS

from pyhdfs import HdfsClient
client = HdfsClient(hosts='ghym:50070',user_name='hadoop')
client.copy_from_local('d:/pydemo.txt','/pydemo')#本地檔案絕對路徑,HDFS目錄必須不存在

3. 讀取文字檔案寫入csv

Python安裝pandas模組

確認文字檔案的分隔符

# pyhdfs讀取文字檔案,分隔符為逗號,from pyhdfs import HdfsClient
client = HdfsClient(hosts='ghym:50070',user_name='hadoop')
inputfile=client.open('/int.txt')
# pandas呼叫讀取方法read_table
import pandas as pd
df=pd.read_table(inputfile,encoding='gbk',sep=',')#引數為原始檔,編碼,分隔符
# 資料集to_csv方法轉換為csv
df.to_csv('demo.csv',index=None)#引數為目標檔案,是否要索引

補充知識:記 讀取hdfs 轉 pandas 再經由pandas轉為csv的一個坑

工作流程是這樣的:

讀取 hdfs 的 csv 檔案,採用的是 hdfs 客戶端提供的 read 方法,該方法返回一個生成器。

將讀取到的資料按 逗號 處理,變為一個二維陣列。

將二維陣列傳給 pandas,生成 df。

經若干處理後,將 df 轉為 csv 檔案並寫入hdfs。

問題是這樣的:

正常的資料:

ZERO,MEAN,STD,CV,INC,OPP,CS,IS_OUTNET

0,9.233,2.445,0.265,1.202,241,1,0

0,8.667,1.882,0.217,1.049,179,0

三行資料,正常走流程,沒有任何問題。

異常資料:

ZERO,IS_OUTNET,probability,prediction

0,'[0.9653901649086855,0.03460983509131456]',0.0

0,0.0

在每一行中都會有一個數組類似的資料,有一對引號包起來,中間存在逗號,不可以拆分。

為此,我的做法如下:

匹配逗號是被成對引號包圍的字串。

將匹配到的字串中的逗號替換為特定字元。

將替換後的新字串替換回原字串。

在將原字串中的特定字串替換為逗號。

本來這樣做沒有什麼問題,但是在經由pandas轉為csv的時候,發現原來帶引號的字串變為了前後各帶三個引號。

源資料:

Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

處理後的資料:

Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

方法如下:

Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

仔細研究對比了下資料,發現數據裡的引號其實只是在純文字檔案中用來標識其為字串,並不應該存在於實際資料中。

Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

而我每次匹配後都是原封不動替換回去,譬如:

源資料:

"[0.9653901649086855,0.03460983509131456]"

匹配替換後:

"[0.9653901649086855${dot}0.03460983509131456]"

這樣傳給pandas,它就會認為這個資料是帶引號的,在重新轉為csv的時候,就會進行轉義等操作,導致多出很多引號。

所以解決辦法就是在替換之前,將匹配時遇到的引號也去掉:

PATTERN = '(?<=(?P<quote>[\'\"]))([^,]+,[^,]+)+?(?=(?P=quote))'

中間 ([^,]+)+? 要用+?,因為必須確定是有這樣的組合才可以,並且非貪婪模式,故不可 ? 或者 *?

Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

(ps:為了方便後面引用前面的匹配,我在環視匹配中建立了一個組)

再來個整體效果:

Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

為了說明效果,引用pandas的自帶讀取csv方法:

Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

可以看到pandas讀取出的該位置資料也是字串,引號正是作為一個字串宣告而存在。

再次修改正則:

def split_by_dot_escape_quote(string):
  """
  按逗號分隔字串,若其中有引號,將引號內容視為整體
  """
  # 匹配引號中的內容,非貪婪,採用正向肯定環視,
  # 當左引號(無論單雙引)被匹配到,放入組quote,
  # 中間的內容任意,但是要用+?,非貪婪,且至少有一次匹配到字元,
  # 若*?,則匹配0次也可,並不會匹配任意字元(環視只匹配位置不匹配字元),
  # 由於在任意字元後面又限定了前面匹配到的quote,故只會匹配到",
  # +?則會限定前面必有字元被匹配,故"",或引號中任意值都可匹配到
  pattern = re.compile('(?=(?P<quote>[\'\"])).+?(?P=quote)')
  rs = re.finditer(pattern,string)
  for data in rs:
    # 匹配到的字串
    old_str = data.group()
    # 將匹配到的字串中的逗號替換為特定字元,
    # 以便還原到原字串進行替換
    new_str = old_str.replace(',','${dot}')
    # 由於匹配到的引號僅為字串申明,並不具有實際意義,
    # 需要把匹配時遇到的引號都去掉,只替換掉當前匹配組的引號
    new_str = re.sub(data.group('quote'),'',new_str)
    string = string.replace(old_str,new_str)
  sps = string.split(',')
  return map(lambda x: x.replace('${dot}','),sps)
 
 
s = '"2011,603","3510006998","F","5","0",""'
print(list(split_by_dot_escape_quote(s)))

執行結果如下:

Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作

之前想的正則有些複雜,反而偏離了本意,還是對正則的認識不夠深。

以上這篇Python連線HDFS實現檔案上傳下載及Pandas轉換文字檔案到CSV操作就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。