1. 程式人生 > >使用Hadoop的Java API操作HDFS

使用Hadoop的Java API操作HDFS

本文介紹Java API訪問HDFS,實現檔案的讀寫,檔案系統的操作等。開發環境為eclipse,開發時所依賴的jar包,可在Hadoop安裝目錄下找到。

Demo

package com.test.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache
.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class TestHdfs { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); //write Path path = new Path("/import/tmp/Wtest.txt"
); FSDataOutputStream fout = fs.create(path); byte[] bWrite = "hello hadoop distribute file system \n".getBytes(); fout.write(bWrite); //寫入位元組陣列 fout.flush(); //flush提供了一種將緩衝區的資料強制重新整理到檔案系統的方法 fout.close(); //關閉寫出流 fout = fs.append(path); fout.write
("append: the append method of java API \n".getBytes()); fout.close(); //關閉寫出流 //read FSDataInputStream fin = fs.open(path); byte[] buff = new byte[128]; int len = 0 ; while( (len = fin.read(buff,0,128)) != -1 ) { System.out.print(new String(buff,0,len)); } //建立目錄 if(fs.mkdirs(new Path("/import/test"))) { System.out.println("mkdir /import/test success "); } //列出目錄 FileStatus[] paths = fs.listStatus(new Path("/import")); for(int i = 0 ; i < paths.length ;++i) { System.out.println(paths[i].toString()); System.out.println(paths[i].getLen()); System.out.println(paths[i].isDirectory()); System.out.println(paths[i].getPath().getParent()); System.out.println(paths[i].getPath()); System.out.println(paths[i].getPath().getName()); } //刪除 if(fs.delete(new Path("/import"), true)) { System.out.println("delete directory /import "); } fin.close(); fs.close(); } }

Explain
使用HDFS的JAVA API操作HDFS的檔案系統,首先要獲取用於操作檔案系統的例項,而檔案系統的又是與當前的系統的環境變數息息相關。對於操作HDFS來說,環境配置主要是core-site.xml中的相關配置。

1.獲取檔案系統訪問例項

Configuration conf = new Configuration();  //獲取當前的預設環境配置
FileSystem fs = FileSystem.get(conf);   //根據當前環境配置,獲取檔案系統訪問例項

Configuration還用提供了用於增加/修改當前環境配置的相關方法,如addResource(Path file)可以增加xml格式的配置,set(String name,String value)以鍵值對的形式新增/修改配置項。
獲取檔案系統訪問例項方法有:

public static FileSystem get(Configuration conf) throws IOException;
public static FileSystem get(URI uri,Configuration conf) throws IOException;

第一個方法是使用預設的URI地址(core-site.xml中配置)獲取當前環境變數來載入檔案系統,第二個方法則傳入指定的URI路徑來獲取例項。

2.向HDFS中寫入資料

public FSDataOutputStream create(Path f) throws IOException;
public FSDataOutputStream append(Path f) throws IOException;
public void write(byte b[]) throws IOException;
public final void writeBytes(String s) throws IOException
public final void writeUTF(String str) throws IOException

首先要根據當前的環境,獲取寫出資料了物件,create方法根據路徑建立資料流物件,如果path目錄的檔案已經存在,則會覆蓋原檔案的內容。append方法則在原路徑的檔案上追加寫入。都返回了FSDataOutputStream物件,其繼承至DataOutputStream,提供了標準I/O的操作。FSDataOutputStream提供了很多寫出資料流的方法如過載的write,writeBytes,writeUTF等。flush提供了一種將緩衝區的資料強制重新整理到檔案系統的方法。此外,write()提供了一種帶有回撥方法的引數,回去在每次寫出快取時,提供進度。

OutputStream out = fs.create(new Path(dst), new Progressable() {
    public void progress() {
        System.out.print(".");
    }
});
IOUtils.copyBytes(in, out, 4096, true);

3.讀取HDFS檔案系統的資料

public FSDataInputStream open(Path f) throws IOException;
public final int read(byte b[], int off, int len) throws IOException;

public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer ;

open()方法根據傳進來的path路徑,獲取環境變數,並設定讀取的緩衝區大小(預設為4096),然後返回FSDataInputStream例項,FSDataInputStream繼承至DataInputStream,並實現了Seekable等介面。DataInputStream繼承至標準I/O類,Seekable介面實現對資料的重定位,PositionedReadable介面實現從指定偏移量處讀取檔案。
read()方法從指定off位置讀取len長度的位元組存入byte陣列。如果到達檔案尾則返回-1,否則返回讀取的實際長度。

4.檔案/目錄操作

public boolean mkdirs(Path f) throws IOException;
提供遞迴的建立path目錄功能,mkdirs還有帶許可權的過載版本

public abstract boolean delete(Path paramPath, boolean paramBoolean) throws IOException;
如果paramBoolean為false,則不能遞迴的刪除子目錄,如果此時目錄非空,將丟擲異常Directory is not empty

public abstract FileStatus[] listStatus(Path paramPath) throws FileNotFoundException, IOException;
private void listStatus(ArrayList<FileStatus> results, Path f, PathFilter filter)  throws FileNotFoundException, IOException;

listStatus方法可以列出指定目錄下的檔案或者資料夾(不能遞迴列出),具有PathFilter過濾的過載版本
FileStatus物件描述了檔案的各種屬性,諸如檔案是否是資料夾,檔案的許可權,所有者等,isDirectory(),getLen(),getPath()...

copyFromLocalFile(src, dst);  //從本地拷貝檔案到HDFS
copyToLocalFile(src, dst);    //從HDFS直接拷貝檔案到本地

5.打包&執行
Eclipse編譯所有的jar包從hadoop安裝包中都能找到,不一一列出。
匯出jar包:Export->java JAR file
選擇相關程式碼目錄,依賴的jar包可以不用匯出,hadoop預設環境知道如何載入這些jar包
需要在 JAR Manifest Specification中為待執行的application選擇執行的main class
這裡寫圖片描述

hello hadoop distribute file system 
append: the append method of java API 
mkdir /import/test success 
FileStatus{path=hdfs://localhost:9000/import/test; isDirectory=true; modification_time=1501305873649; access_time=0; owner=root; group=supergroup; permission=rwxr-xr-x; isSymlink=false}
0
true
hdfs://localhost:9000/import
hdfs://localhost:9000/import/test
test