1. 程式人生 > >scala和java解壓zip的檔案,並上傳到hdfs伺服器

scala和java解壓zip的檔案,並上傳到hdfs伺服器

之前一直使用的事java開發的,後來學習hadoop,spark以後,需要做一個解壓檔案到伺服器的例子,由於學習scala時間不長,故先用java程式碼寫出來,在改成scala的程式碼,過程中出現了很多問題,我將會在最後說明,請大家注意不要少了包。

環境宣告:windows8.1,eclipse,idea14,hadoop2.6,scala2.11,請自行配置好hadoop的環境。

java程式碼如下:

<span style="font-size:12px;">public class TestFile {
public static void main(String[] args) {
String zipDir = "G:/zip";
File zipFile = new File(zipDir);
File[] zipFiles = zipFile.listFiles();
for(File zip:zipFiles)
{
if(zip.getName().endsWith(".zip"))
{
System.out.println("name is "+zip.getName()+" , path = "+zip.getAbsolutePath());
//解壓zip壓縮檔案
boolean result = unZipFile(zip,zipDir);
if(result)
{
String filedir = zip.getName().substring(0, zip.getName().indexOf("."));
String localDir = zipDir+filedir;
System.out.println("檔案f.getName()解壓成功 filedir = "+filedir);
//上傳檔案到HDFS
String cloudDir = "hdfs://10.132.10.235:9000/zcd/";
upload(localDir,cloudDir);
//刪除本地目錄
File fileDir = new File(localDir);
deleteFile(fileDir);
fileDir.delete();
zip.delete();

}
else
{
System.out.println("檔案f.getName()解壓失敗");
}
}
}
}

private static boolean unZipFile(File zipFile, String descDir)  
    {  
boolean flag = true;

try {
File pathFile = new File(descDir);  
if(!pathFile.exists()){  
   pathFile.mkdirs();  
}  
ZipFile zip = new ZipFile(zipFile);  
for(Enumeration entries = zip.getEntries();entries.hasMoreElements();){  
   ZipEntry entry = (ZipEntry)entries.nextElement();  
   String zipEntryName = entry.getName();  
   InputStream in = zip.getInputStream(entry);  
   String outPath = (descDir+zipEntryName).replaceAll("\\*", "/");;  
   //判斷路徑是否存在,不存在則建立檔案路徑  
   File file = new File(outPath.substring(0, outPath.lastIndexOf('/')));  
   if(!file.exists()){  
       file.mkdirs();  
   }  
   //判斷檔案全路徑是否為資料夾,如果是上面已經上傳,不需要解壓  
   if(new File(outPath).isDirectory()){  
       continue;  
   }  
   //輸出檔案路徑資訊  
   System.out.println(outPath);  
     
   OutputStream out = new FileOutputStream(outPath);  
   byte[] buf1 = new byte[1024];  
   int len;  
   while((len=in.read(buf1))>0){  
       out.write(buf1,0,len);  
   }  
   in.close();  
   out.close();  
   }
} catch (ZipException e) {
flag = false;
e.printStackTrace();
} catch (FileNotFoundException e) {
flag = false;
e.printStackTrace();
} catch (IOException e) {
flag = false;
e.printStackTrace();
}  
        System.out.println("******************解壓完畢********************");   
        return flag;
    }  

private static void upload(String localDir,String cloudDir)
{
try {
System.out.println("hadoop上傳檔案開始...");
// 獲取一個conf物件
Configuration conf = new Configuration();
File fileDir = new File(localDir);
File[] files = fileDir.listFiles();
InputStream in = null;
FileSystem fs = null;
OutputStream out = null;
for(File file:files)
{
// 本地檔案存取的位置
String LOCAL_SRC = file.getAbsolutePath();
// 存放到雲端HDFS的位置
String CLOUD_DEST = cloudDir+file.getName();
in = new BufferedInputStream(new FileInputStream(LOCAL_SRC));
// 檔案系統
fs = FileSystem.get(URI.create(CLOUD_DEST), conf);
// 輸出流
out = fs.create(new Path(CLOUD_DEST));
// 連線兩個流,形成通道,使輸入流向輸出流傳輸資料
IOUtils.copyBytes(in, out, 1024, true);
}
in.close();
fs.close();
out.close();
System.out.println("hadoop上傳檔案結束...");
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

private static void deleteFile(File file)
{
if(file.exists())
{                    
   if(file.isFile())
   {                   
      file.delete();
   }
   else if(file.isDirectory())
   {              
     File files[] = file.listFiles();               
     for(int i=0;i<files.length;i++)
     {            
         deleteFile(files[i]);
     } 
   } 
} 
}
}</span>


scala程式碼 如下:

object zip {
  def main(args: Array[String]) {
    val zipDir = "G:/zip"
val zipFile = new File(zipDir)
    val zipFiles = zipFile.listFiles()
    for(zip <- zipFiles)
    {
      if(zip.getName.endsWith(".zip"))
      {
        println("name is "+zip.getName+" , path = "+zip.getAbsolutePath)
        //解壓zip壓縮檔案
val result = unZipFile(zip,zipDir) if(result) { val filedir = zip.getName.substring(0, zip.getName.indexOf(".")) val localDir = zipDir+filedir println("檔案f.getName()解壓成功 filedir = = "+filedir) //上傳檔案到HDFS val cloudDir = "hdfs://10.132.10.235:9000/zcd/" upload
(localDir,cloudDir); //刪除本地目錄 val fileDir = new File(localDir) deleteFile(fileDir) fileDir.delete() zip.delete() } else{ println("檔案f.getName()解壓失敗") } } } } /** * 解壓 * @param zipFile * @param descDir * @return */ def unZipFile(zipFile:File , descDir:String ):Boolean ={ var flag = true try { val pathFile:File = new File(descDir) if (!pathFile.exists) { pathFile.mkdirs } val zip = new ZipFile(zipFile) val entries = zip.getEntries while (entries.hasMoreElements){ // val entry = entries.nextElement.asInstanceOf[ZipFile] val entry = entries.nextElement val zipEntryName = entry.getName val in = zip.getInputStream(entry) val outPath = (descDir + zipEntryName).replaceAll("\\*", "/") //判斷路徑是否存在,不存在則建立檔案路徑 val file = new File(outPath.substring(0, outPath.lastIndexOf('/'))) if (!file.exists) { file.mkdirs } //判斷檔案全路徑是否為資料夾,如果是上面已經上傳,不需要解壓 if (!new File(outPath).isDirectory()) { //輸出檔案路徑資訊 println(outPath) val out = new FileOutputStream(outPath) //val writer = new PrintWriter(new File(outPath)) val buf1 = Array[Byte]() while((in.read(buf1))>0){ in.read out.write(buf1) } // out.close } in.close } }catch{ case e: ZipException => flag = false case e: FileNotFoundException => flag = false case e: IOException => flag = false } println("******************解壓完畢********************") flag } /** * 上傳 * @param localDir * @param cloudDir */ def upload(localDir:String,cloudDir:String): Unit ={ try { println("hadoop上傳檔案開始..."); // 獲取一個conf物件 val conf = new Configuration(); val fileDir = new File(localDir); val files = fileDir.listFiles(); var in:InputStream = null var fs:FileSystem=null var out:OutputStream=null for(file <- files){ // 本地檔案存取的位置 val LOCAL_SRC = file.getAbsolutePath // 存放到雲端HDFS的位置 val CLOUD_DEST = cloudDir+file.getName in = new BufferedInputStream(new FileInputStream(LOCAL_SRC)) // 檔案系統 fs = FileSystem.get(URI.create(CLOUD_DEST), conf) // 輸出流 out = fs.create(new Path(CLOUD_DEST)) // 連線兩個流,形成通道,使輸入流向輸出流傳輸資料 IOUtils.copyBytes(in, out, 1024, true) } in.close fs.close out.close println("hadoop上傳檔案結束..."); } catch { case e: FileNotFoundException => e.printStackTrace case e: IllegalArgumentException => e.printStackTrace case e: IOException => e.printStackTrace } } /** * 刪除 * @param file */ def deleteFile(file:File ):Unit={ if(file.exists()){ if(file.isFile()){ file.delete() }else if(file.isDirectory()){ val files = file.listFiles() for(file <- files){ deleteFile(file) } } } } }

在這裡主要說scala程式碼編寫過程中出現的問題:

scala可以自動解析物件的型別,所以不需要我們自己宣告物件的型別,一般都用val來宣告,除非是一定要改變的值,就用var,必須初始化。

在scala中一開始這句話是報錯的,提示不能解析,也就是找不到entries找不到nextElement 這個方法。

val entry = entries.nextElement
我們需要做如下的裝換:
val entry = entries.nextElement.asInstanceOf[ZipFile]
上面這個原因是因為我使用了一個ant1.8.jar的jar包,最後發現是這個包的原因,在scala下執行還有警告。用ant.jar替換ant1.8.jar包,就可以了。

在宣告物件的時候,物件是必須初始化的,如果可以為null,但是有時候你必須宣告它的型別,可以如下宣告,加上它的型別即可。

var in:InputStream = null

往hdfs寫檔案時可以用方法即可

// 本地檔案存取的位置
val LOCAL_SRC = file.getAbsolutePath
// 存放到雲端HDFS的位置
val CLOUD_DEST = cloudDir+file.getName
in = new BufferedInputStream(new FileInputStream(LOCAL_SRC))
// 檔案系統
fs = FileSystem.get(URI.create(CLOUD_DEST), conf)
// 輸出流
out = fs.create(new Path(CLOUD_DEST))
// 連線兩個流,形成通道,使輸入流向輸出流傳輸資料
IOUtils.copyBytes(in, out, 1024, true)