使用Hadoop的Java API操作HDFS
本文介紹Java API訪問HDFS,實現檔案的讀寫,檔案系統的操作等。開發環境為eclipse,開發時所依賴的jar包,可在Hadoop安裝目錄下找到。
Demo
package com.test.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache .hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class TestHdfs {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
//write
Path path = new Path("/import/tmp/Wtest.txt" );
FSDataOutputStream fout = fs.create(path);
byte[] bWrite = "hello hadoop distribute file system \n".getBytes();
fout.write(bWrite); //寫入位元組陣列
fout.flush(); //flush提供了一種將緩衝區的資料強制重新整理到檔案系統的方法
fout.close(); //關閉寫出流
fout = fs.append(path);
fout.write ("append: the append method of java API \n".getBytes());
fout.close(); //關閉寫出流
//read
FSDataInputStream fin = fs.open(path);
byte[] buff = new byte[128];
int len = 0 ;
while( (len = fin.read(buff,0,128)) != -1 )
{
System.out.print(new String(buff,0,len));
}
//建立目錄
if(fs.mkdirs(new Path("/import/test")))
{
System.out.println("mkdir /import/test success ");
}
//列出目錄
FileStatus[] paths = fs.listStatus(new Path("/import"));
for(int i = 0 ; i < paths.length ;++i)
{
System.out.println(paths[i].toString());
System.out.println(paths[i].getLen());
System.out.println(paths[i].isDirectory());
System.out.println(paths[i].getPath().getParent());
System.out.println(paths[i].getPath());
System.out.println(paths[i].getPath().getName());
}
//刪除
if(fs.delete(new Path("/import"), true))
{
System.out.println("delete directory /import ");
}
fin.close();
fs.close();
}
}
Explain
使用HDFS的JAVA API操作HDFS的檔案系統,首先要獲取用於操作檔案系統的例項,而檔案系統的又是與當前的系統的環境變數息息相關。對於操作HDFS來說,環境配置主要是core-site.xml中的相關配置。
1.獲取檔案系統訪問例項
Configuration conf = new Configuration(); //獲取當前的預設環境配置
FileSystem fs = FileSystem.get(conf); //根據當前環境配置,獲取檔案系統訪問例項
Configuration還用提供了用於增加/修改當前環境配置的相關方法,如addResource(Path file)可以增加xml格式的配置,set(String name,String value)以鍵值對的形式新增/修改配置項。
獲取檔案系統訪問例項方法有:
public static FileSystem get(Configuration conf) throws IOException;
public static FileSystem get(URI uri,Configuration conf) throws IOException;
第一個方法是使用預設的URI地址(core-site.xml中配置)獲取當前環境變數來載入檔案系統,第二個方法則傳入指定的URI路徑來獲取例項。
2.向HDFS中寫入資料
public FSDataOutputStream create(Path f) throws IOException;
public FSDataOutputStream append(Path f) throws IOException;
public void write(byte b[]) throws IOException;
public final void writeBytes(String s) throws IOException
public final void writeUTF(String str) throws IOException
首先要根據當前的環境,獲取寫出資料了物件,create方法根據路徑建立資料流物件,如果path目錄的檔案已經存在,則會覆蓋原檔案的內容。append方法則在原路徑的檔案上追加寫入。都返回了FSDataOutputStream物件,其繼承至DataOutputStream,提供了標準I/O的操作。FSDataOutputStream提供了很多寫出資料流的方法如過載的write,writeBytes,writeUTF等。flush提供了一種將緩衝區的資料強制重新整理到檔案系統的方法。此外,write()提供了一種帶有回撥方法的引數,回去在每次寫出快取時,提供進度。
OutputStream out = fs.create(new Path(dst), new Progressable() {
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
3.讀取HDFS檔案系統的資料
public FSDataInputStream open(Path f) throws IOException;
public final int read(byte b[], int off, int len) throws IOException;
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer ;
open()方法根據傳進來的path路徑,獲取環境變數,並設定讀取的緩衝區大小(預設為4096),然後返回FSDataInputStream例項,FSDataInputStream繼承至DataInputStream,並實現了Seekable等介面。DataInputStream繼承至標準I/O類,Seekable介面實現對資料的重定位,PositionedReadable介面實現從指定偏移量處讀取檔案。
read()方法從指定off位置讀取len長度的位元組存入byte陣列。如果到達檔案尾則返回-1,否則返回讀取的實際長度。
4.檔案/目錄操作
public boolean mkdirs(Path f) throws IOException;
提供遞迴的建立path目錄功能,mkdirs還有帶許可權的過載版本
public abstract boolean delete(Path paramPath, boolean paramBoolean) throws IOException;
如果paramBoolean為false,則不能遞迴的刪除子目錄,如果此時目錄非空,將丟擲異常Directory is not empty
public abstract FileStatus[] listStatus(Path paramPath) throws FileNotFoundException, IOException;
private void listStatus(ArrayList<FileStatus> results, Path f, PathFilter filter) throws FileNotFoundException, IOException;
listStatus方法可以列出指定目錄下的檔案或者資料夾(不能遞迴列出),具有PathFilter過濾的過載版本
FileStatus物件描述了檔案的各種屬性,諸如檔案是否是資料夾,檔案的許可權,所有者等,isDirectory(),getLen(),getPath()...
copyFromLocalFile(src, dst); //從本地拷貝檔案到HDFS
copyToLocalFile(src, dst); //從HDFS直接拷貝檔案到本地
5.打包&執行
Eclipse編譯所有的jar包從hadoop安裝包中都能找到,不一一列出。
匯出jar包:Export->java JAR file
選擇相關程式碼目錄,依賴的jar包可以不用匯出,hadoop預設環境知道如何載入這些jar包
需要在 JAR Manifest Specification中為待執行的application選擇執行的main class
hello hadoop distribute file system
append: the append method of java API
mkdir /import/test success
FileStatus{path=hdfs://localhost:9000/import/test; isDirectory=true; modification_time=1501305873649; access_time=0; owner=root; group=supergroup; permission=rwxr-xr-x; isSymlink=false}
0
true
hdfs://localhost:9000/import
hdfs://localhost:9000/import/test
test