HDFS Java API操作
阿新 • • 發佈:2018-12-15
1.
2.
以下是處於完全分散式
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org. apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop. hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
* Description: HDFSAPI<br/>
* Copyright (c) , 2018, xlj <br/>
* This program is protected by copyright laws. <br/>
* Program Name:HDFSAPI.java <br/>
* @version : 1.0
*/
public class HDFSAPI {
public static void main(String[] args) throws IOException {
//讀取HDFS的檔案到控制檯,引數是你上傳到HDFS的路徑
catFileToConsole("/data.txt");
//讀取本地檔案中的內容儲存到本地
catFileToLocal("/data.txt");
}
/**
* @param filePath
* @throws IOException
*/
public static void catFileToConsole(String filePath) throws IOException {
/*
* 任何檔案系統都是與當前的環境變數緊密聯絡,對於當前的HDFS來說
* 我們需要在建立當前檔案系統的例項之前,必須獲得當前環境變數
* Configuration
* 為使用者提供當前環境變數的一個例項。其中封裝了當前搭建環境的配置
* 有了這個配置例項,才能繼續呼叫FileSystem類
*/
//1.獲取Configuration物件
Configuration conf = new Configuration();
/*2.需要設定當前相關屬性---->設定core-site.xml對應相關屬性
* 因為現在是要連線HDFS分散式檔案系統,所以要配置一個連線屬性
* 9000--->是一個埠,這是一個內部通訊埠號
* 50070-->HDFS WebUI介面的埠號
* 50090-->SecondaryNamenode埠號
*
*/
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
//需要都檔案系統進行連線訪問,FileSystem這是一個抽象類
//提供N個get方法來獲取當前的連線
//當前會丟擲一個IOException的異常,就是一個流操作
FileSystem fs = FileSystem.get(conf);
//open方法連線HDFS 引數:一個path--》HDFS分散式檔案系統要訪問的具體的地址
FSDataInputStream fis = fs.open(new Path(filePath));
//推薦一個工具類IOUtils
IOUtils.copyBytes(fis, System.out, 4096, true);;
}
/**
* 讀取檔案中的內容儲存到本地
* @param filePath
* @throws IOException
*/
public static void catFileToLocal(String filePath) throws IOException {
//1.建立連線配置
Configuration conf = new Configuration();
//2.設計相關屬性
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
//3.獲取FileSystem物件
FileSystem fs = FileSystem.get(conf);
//4.獲取一個輸入流物件,讀取當前HDFS分散式檔案系統中資料夾的內容
FSDataInputStream fis = fs.open(new Path(filePath));
//5.建立一個輸出流將內容寫到本地檔案
OutputStream os = new FileOutputStream(new File("dir/file.txt"));
IOUtils.copyBytes(fis, os, 4096, true);
}
/**
* 在檔案系統中建立資料夾
* @param dirPath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void mkdir(String dirPath) throws IOException, InterruptedException, URISyntaxException {
//1.獲取configuration物件
Configuration conf = new Configuration();
//2.直接建立FileSystem
//三個引數 :
/*
* 第一個引數:URI --hdfs的內部通訊網站 URI ---net包
* 第二個引數:Configuration物件
* 第三個引數:使用者---root
*/
FileSystem fs = FileSystem.get(new URI("hdfs://hadoo1:9000"), conf,"root");
//建立當前路徑下的資料夾,這裡需要注意的是方法名多一個s
//就意味著即可以建立一個資料夾,也可以建立多資料夾
boolean result = fs.mkdirs(new Path(dirPath));
if(result) {
System.out.println("資料夾建立成功");
}else {
System.out.println("資料夾建立失敗");
}
}
/**
* 建立空檔案
* @param filePath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void touchFile(String filePath) throws IOException, InterruptedException, URISyntaxException {
//1.建立Configuration物件
Configuration conf = new Configuration();
//設定屬性
//conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
//有設定過副本數量 conf.set設定副本數量--在不設定副本數量的前提下預設是3
//所以這裡創建出來的檔案使用的是預設值
//若需要自己的副本數量 conf.set需要設定副本屬性
// <name>dfs.replication</name> <value>2</value>
// 3.獲取FileSystem物件
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),conf,"root");
//4.建立空檔案
fs.create(new Path(filePath));
System.out.println("建立成功");
}
/**
* 顯示檔案資訊
* @param dirPath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void list(String dirPath) throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), conf,"root");
/*
* FileSystem 類似於Java中的File
* 所有的FileSystem 中提供了一個遍歷資料夾的方式 listStatus 返回值是一個數組,陣列元素的資料型別是FileStatus
*/
FileStatus [] fss = fs.listStatus(new Path(dirPath));
for (FileStatus f : fss) {
System.out.println("檔名字:"+f.getPath().getName());
System.out.println("檔案的所屬者:"+f.getOwner());
System.out.println("檔案的所屬組: "+f.getGroup());
System.out.println("檔案的大小: "+f.getLen());
System.out.println("檔案的副本數:"+f.getReplication());
System.out.println("是否是目錄:"+f.isDir());
}
}
/**
* 獲取HDFS中資源情況
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void getSource() throws IOException, InterruptedException, URISyntaxException {
//通過FileSystem獲取連線
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root");
//通過其子類轉換成當前子類物件並代用其獲取狀態的方法getStatus
DistributedFileSystem dfs = (DistributedFileSystem)fs;
FsStatus fss = dfs.getStatus();
System.out.println("總量:"+(fss.getCapacity() / 1024 / 1024)+"GB");
System.out.println("使用的量:"+(fss.getUsed() / 1024 / 1024)+"MB");
System.out.println("w維持總量:"+(fss.getRemaining() / 1024 / 1024)+"GB");
}
/**
* 獲取單臺節點的資訊
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void getNodeInfos() throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root");
DistributedFileSystem dfs = (DistributedFileSystem)fs;
DatanodeInfo[] dis = dfs.getDataNodeStats();
for (DatanodeInfo datanodeInfo : dis) {
System.out.println("當前節點的總容量:"+(datanodeInfo.getCapacity()));
System.out.println("HostName:"+(datanodeInfo.getHostName()));
System.out.println("IP地址:"+(datanodeInfo.getIpAddr()));
System.out.println(datanodeInfo.getName());
}
}
/**
* 塊資訊
* @param filePath
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
public static void getBlockInfos(String filePath) throws IOException, InterruptedException, URISyntaxException {
//通過FileSystem獲取連線
FileSystem fs = FileSystem.get(new URI("hdfs://hadoo1:9000"), new Configuration(), "root");
//拿到當前檔案的描述資訊
FileStatus fss = fs.getFileStatus(new Path(filePath));
//獲取檔案中塊的資訊-->返回值是一個數組,為什麼
//因為只要檔案大於128M就會被切分為其他塊
BlockLocation[] bls = fs.getFileBlockLocations(fss, 0, fss.getLen());
for (BlockLocation bl : bls) {
//BlockLocation方法:返回值是字串陣列,將bl物件中的屬性值存在陣列中
for (int i = 0; i < bl.getHosts().length; i++) {
System.out.println(bl.getHosts()[i]);
}
}
}
/**
* 帶進度的檔案上傳
* @param filePath
* @param filepath2
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void upLoadProcess(String filePath,String filepath2) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root");
FSDataOutputStream fos = fs.create(new Path(filePath),new Progressable() {
//Progressable介面中提供了一個progress這個方法,每次在寫檔案的時候寫64K
@Override
public void progress() {
//迴圈 獲取檔案大小等等
System.out.println("*");
}
});
//獲取一個檔案
InputStream fis = new FileInputStream(new File(filepath2));
IOUtils.copyBytes(fis, fos, 1024,true);
}
/**
* 檔案的上傳 相當於 -put
* @param paths
* @param HDFSfilePath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void upload(String localFilePath,String HDFSfilePath) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(), "root");
/*
* 第一個 引數:本地檔案的位置
* 第二個引數:HDFS檔案系統中儲存檔案的位置
*/
fs.copyFromLocalFile(new Path(localFilePath), new Path(HDFSfilePath));
}
public static void uploads(Path []paths ,String HDFSfilePath) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(), "root");
/*1.上傳成功後是否要刪除本地檔案
*2.上傳後若有相同檔案是否要覆蓋
*這兩個引數都是boolean型別
*true---上傳成功後要刪除和覆蓋
*false--不刪除也不覆蓋
*3.刪除是多個檔案路徑
*4.HDFS檔案系統中要儲存的路徑
*/
fs.copyFromLocalFile(false, true,paths,new Path(HDFSfilePath));
}
public static void download(String HDFSfilePath,String localFilePath) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root");
/*
* 1.下載完後是否要刪除HDFS檔案 boolean
* 2.HDFS檔案系統的路徑
* 3.當前下載到本地的路徑
* 4.是否要使用本地的檔案系統,改用為java的IO流
*/
fs.copyToLocalFile(false, new Path(HDFSfilePath),new Path(localFilePath),true);
}
/**
* 刪除HDFS檔案系統中的檔案
* @param filePath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void deleteFile(String filePath) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(),"root");
Path path = new Path(filePath);
if(fs.exists(path)) {
//檢查是不是一個目錄,是就遞迴刪除,不是就直接刪除
if(fs.isDirectory(path)) {
fs.delete(path,true);
}else {
fs.delete(path,false);
}
}else {
System.out.println("刪除的檔案不存在");
}
}
/**
* 移動HDFS系統中的檔案,集移動剪下於一身
* @param src
* @param dsc
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public static void rename(String src,String dsc) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(),"root");
fs.rename(new Path(src), new Path(dsc));
}
}