scala和java解壓zip的檔案,並上傳到hdfs伺服器
阿新 • • 發佈:2019-01-26
之前一直使用的事java開發的,後來學習hadoop,spark以後,需要做一個解壓檔案到伺服器的例子,由於學習scala時間不長,故先用java程式碼寫出來,在改成scala的程式碼,過程中出現了很多問題,我將會在最後說明,請大家注意不要少了包。
環境宣告:windows8.1,eclipse,idea14,hadoop2.6,scala2.11,請自行配置好hadoop的環境。
java程式碼如下:
<span style="font-size:12px;">public class TestFile { public static void main(String[] args) { String zipDir = "G:/zip"; File zipFile = new File(zipDir); File[] zipFiles = zipFile.listFiles(); for(File zip:zipFiles) { if(zip.getName().endsWith(".zip")) { System.out.println("name is "+zip.getName()+" , path = "+zip.getAbsolutePath()); //解壓zip壓縮檔案 boolean result = unZipFile(zip,zipDir); if(result) { String filedir = zip.getName().substring(0, zip.getName().indexOf(".")); String localDir = zipDir+filedir; System.out.println("檔案f.getName()解壓成功 filedir = "+filedir); //上傳檔案到HDFS String cloudDir = "hdfs://10.132.10.235:9000/zcd/"; upload(localDir,cloudDir); //刪除本地目錄 File fileDir = new File(localDir); deleteFile(fileDir); fileDir.delete(); zip.delete(); } else { System.out.println("檔案f.getName()解壓失敗"); } } } } private static boolean unZipFile(File zipFile, String descDir) { boolean flag = true; try { File pathFile = new File(descDir); if(!pathFile.exists()){ pathFile.mkdirs(); } ZipFile zip = new ZipFile(zipFile); for(Enumeration entries = zip.getEntries();entries.hasMoreElements();){ ZipEntry entry = (ZipEntry)entries.nextElement(); String zipEntryName = entry.getName(); InputStream in = zip.getInputStream(entry); String outPath = (descDir+zipEntryName).replaceAll("\\*", "/");; //判斷路徑是否存在,不存在則建立檔案路徑 File file = new File(outPath.substring(0, outPath.lastIndexOf('/'))); if(!file.exists()){ file.mkdirs(); } //判斷檔案全路徑是否為資料夾,如果是上面已經上傳,不需要解壓 if(new File(outPath).isDirectory()){ continue; } //輸出檔案路徑資訊 System.out.println(outPath); OutputStream out = new FileOutputStream(outPath); byte[] buf1 = new byte[1024]; int len; while((len=in.read(buf1))>0){ out.write(buf1,0,len); } in.close(); out.close(); } } catch (ZipException e) { flag = false; e.printStackTrace(); } catch (FileNotFoundException e) { flag = false; e.printStackTrace(); } catch (IOException e) { flag = false; e.printStackTrace(); } System.out.println("******************解壓完畢********************"); return flag; } private static void upload(String localDir,String cloudDir) { try { System.out.println("hadoop上傳檔案開始..."); // 獲取一個conf物件 Configuration conf = new Configuration(); File fileDir = new File(localDir); File[] files = fileDir.listFiles(); InputStream in = null; FileSystem fs = null; OutputStream out = null; for(File file:files) { // 本地檔案存取的位置 String LOCAL_SRC = file.getAbsolutePath(); // 存放到雲端HDFS的位置 String CLOUD_DEST = cloudDir+file.getName(); in = new BufferedInputStream(new FileInputStream(LOCAL_SRC)); // 檔案系統 fs = FileSystem.get(URI.create(CLOUD_DEST), conf); // 輸出流 out = fs.create(new Path(CLOUD_DEST)); // 連線兩個流,形成通道,使輸入流向輸出流傳輸資料 IOUtils.copyBytes(in, out, 1024, true); } in.close(); fs.close(); out.close(); System.out.println("hadoop上傳檔案結束..."); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } private static void deleteFile(File file) { if(file.exists()) { if(file.isFile()) { file.delete(); } else if(file.isDirectory()) { File files[] = file.listFiles(); for(int i=0;i<files.length;i++) { deleteFile(files[i]); } } } } }</span>
scala程式碼 如下:
object zip { def main(args: Array[String]) { val zipDir = "G:/zip" val zipFile = new File(zipDir) val zipFiles = zipFile.listFiles() for(zip <- zipFiles) { if(zip.getName.endsWith(".zip")) { println("name is "+zip.getName+" , path = "+zip.getAbsolutePath) //解壓zip壓縮檔案val result = unZipFile(zip,zipDir) if(result) { val filedir = zip.getName.substring(0, zip.getName.indexOf(".")) val localDir = zipDir+filedir println("檔案f.getName()解壓成功 filedir = = "+filedir) //上傳檔案到HDFS val cloudDir = "hdfs://10.132.10.235:9000/zcd/" upload(localDir,cloudDir); //刪除本地目錄 val fileDir = new File(localDir) deleteFile(fileDir) fileDir.delete() zip.delete() } else{ println("檔案f.getName()解壓失敗") } } } } /** * 解壓 * @param zipFile * @param descDir * @return */ def unZipFile(zipFile:File , descDir:String ):Boolean ={ var flag = true try { val pathFile:File = new File(descDir) if (!pathFile.exists) { pathFile.mkdirs } val zip = new ZipFile(zipFile) val entries = zip.getEntries while (entries.hasMoreElements){ // val entry = entries.nextElement.asInstanceOf[ZipFile] val entry = entries.nextElement val zipEntryName = entry.getName val in = zip.getInputStream(entry) val outPath = (descDir + zipEntryName).replaceAll("\\*", "/") //判斷路徑是否存在,不存在則建立檔案路徑 val file = new File(outPath.substring(0, outPath.lastIndexOf('/'))) if (!file.exists) { file.mkdirs } //判斷檔案全路徑是否為資料夾,如果是上面已經上傳,不需要解壓 if (!new File(outPath).isDirectory()) { //輸出檔案路徑資訊 println(outPath) val out = new FileOutputStream(outPath) //val writer = new PrintWriter(new File(outPath)) val buf1 = Array[Byte]() while((in.read(buf1))>0){ in.read out.write(buf1) } // out.close } in.close } }catch{ case e: ZipException => flag = false case e: FileNotFoundException => flag = false case e: IOException => flag = false } println("******************解壓完畢********************") flag } /** * 上傳 * @param localDir * @param cloudDir */ def upload(localDir:String,cloudDir:String): Unit ={ try { println("hadoop上傳檔案開始..."); // 獲取一個conf物件 val conf = new Configuration(); val fileDir = new File(localDir); val files = fileDir.listFiles(); var in:InputStream = null var fs:FileSystem=null var out:OutputStream=null for(file <- files){ // 本地檔案存取的位置 val LOCAL_SRC = file.getAbsolutePath // 存放到雲端HDFS的位置 val CLOUD_DEST = cloudDir+file.getName in = new BufferedInputStream(new FileInputStream(LOCAL_SRC)) // 檔案系統 fs = FileSystem.get(URI.create(CLOUD_DEST), conf) // 輸出流 out = fs.create(new Path(CLOUD_DEST)) // 連線兩個流,形成通道,使輸入流向輸出流傳輸資料 IOUtils.copyBytes(in, out, 1024, true) } in.close fs.close out.close println("hadoop上傳檔案結束..."); } catch { case e: FileNotFoundException => e.printStackTrace case e: IllegalArgumentException => e.printStackTrace case e: IOException => e.printStackTrace } } /** * 刪除 * @param file */ def deleteFile(file:File ):Unit={ if(file.exists()){ if(file.isFile()){ file.delete() }else if(file.isDirectory()){ val files = file.listFiles() for(file <- files){ deleteFile(file) } } } } }
在這裡主要說scala程式碼編寫過程中出現的問題:
scala可以自動解析物件的型別,所以不需要我們自己宣告物件的型別,一般都用val來宣告,除非是一定要改變的值,就用var,必須初始化。
在scala中一開始這句話是報錯的,提示不能解析,也就是找不到entries找不到nextElement 這個方法。
val entry = entries.nextElement我們需要做如下的裝換:
val entry = entries.nextElement.asInstanceOf[ZipFile]
上面這個原因是因為我使用了一個ant1.8.jar的jar包,最後發現是這個包的原因,在scala下執行還有警告。用ant.jar替換ant1.8.jar包,就可以了。
在宣告物件的時候,物件是必須初始化的,如果可以為null,但是有時候你必須宣告它的型別,可以如下宣告,加上它的型別即可。
var in:InputStream = null
往hdfs寫檔案時可以用方法即可
// 本地檔案存取的位置 val LOCAL_SRC = file.getAbsolutePath // 存放到雲端HDFS的位置 val CLOUD_DEST = cloudDir+file.getName in = new BufferedInputStream(new FileInputStream(LOCAL_SRC)) // 檔案系統 fs = FileSystem.get(URI.create(CLOUD_DEST), conf) // 輸出流 out = fs.create(new Path(CLOUD_DEST)) // 連線兩個流,形成通道,使輸入流向輸出流傳輸資料 IOUtils.copyBytes(in, out, 1024, true)