java實現對Hadoop的操作
基本操作
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.junit.Test; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; @RunWith(JUnit4.class) @DisplayName("Test using junit4") public class HadoopClientTest { private FileSystem fileSystem = null; @BeforeEach public void init() throws URISyntaxException, IOException, InterruptedException { Configuration configuration = new Configuration(); configuration.set("dfs.replication", "1"); configuration.set("dfs.blocksize", "64m"); fileSystem = FileSystem.get(new URI("hdfs://hd-even-01:9000"), configuration, "root"); } /** * 從本地復制文件到Hadoop * * @throws URISyntaxException * @throws IOException * @throws InterruptedException */ @Test public void copyFileFromLocal() throws URISyntaxException, IOException, InterruptedException { // 上傳文件 fileSystem.copyFromLocalFile(new Path("C:\\Users\\Administrator\\Desktop\\win10激活.txt"), new Path("/even1")); // 關閉流,報錯winUtils,因為使用瞭linux的tar包,如果windows要使用,則需要編譯好這個winUtils包才能使用 fileSystem.close(); } /** * 從Hadoop下載文件到本地,下載需要配置Hadoop環境,並添加winutils到bin目錄 * * @throws URISyntaxException * @throws IOException * @throws InterruptedException */ @Test public void copyFileToLocal() throws URISyntaxException, IOException, InterruptedException { // 下載文件 fileSystem.copyToLocalFile(new Path("/win10激活.txt"), new Path("E:/")); // 關閉流,報錯winUtils,因為使用瞭linux的tar包,如果windows要使用,則需要編譯好這個winUtils包才能使用 fileSystem.close(); } /** * 創建文件夾 * * @throws IOException */ @Test public void hdfsMkdir() throws IOException { // 調用創建文件夾方法 fileSystem.mkdirs(new Path("/even1")); // 關閉方法 fileSystem.close(); } /** * 移動文件/修改文件名 */ public void hdfsRename() throws IOException { fileSystem.rename(new Path(""), new Path("")); fileSystem.close(); } /** * 刪除文件/文件夾 * * @throws IOException */ @Test public void hdfsRm() throws IOException { // fileSystem.delete(new Path("")); // 第二個參數表示遞歸刪除 fileSystem.delete(new Path(""), true); fileSystem.close(); } /** * 查看hdfs指定目錄的信息 * * @throws IOException */ @Test public void hdfsLs() throws IOException { // 調用方法返回遠程迭代器,第二個參數是把目錄文件夾內的文件也列出來 RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true); while (listFiles.hasNext()) { LocatedFileStatus locatedFileStatus = listFiles.next(); System.out.println("文件路徑:" + locatedFileStatus.getPath()); System.out.println("塊大小:" + locatedFileStatus.getBlockSize()); System.out.println("文件長度:" + locatedFileStatus.getLen()); System.out.println("副本數量:" + locatedFileStatus.getReplication()); System.out.println("塊信息:" + Arrays.toString(locatedFileStatus.getBlockLocations())); } fileSystem.close(); } /** * 判斷是文件還是文件夾 */ @Test public void findHdfs() throws IOException { // 1,展示狀態信息 FileStatus[] listStatus = fileSystem.listStatus(new Path("/")); // 2,遍歷所有文件 for (FileStatus fileStatus : listStatus) { if (fileStatus.isFile()) System.out.println("是文件:" + fileStatus.getPath().getName()); else if (fileStatus.isDirectory()) System.out.println("是文件夾:" + fileStatus.getPath().getName()); } fileSystem.close(); } }
文件讀寫
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.junit.Before; import org.junit.Test; import org.junit.jupiter.api.DisplayName; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.*; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.Arrays; @RunWith(JUnit4.class) @DisplayName("this is read write test!") public class HadoopReadWriteTest { FileSystem fileSystem = null; Configuration configuration = null; @Before public void init() throws URISyntaxException, IOException, InterruptedException { // 1,加載配置 configuration = new Configuration(); // 2,構建客戶端 fileSystem = FileSystem.get(new URI("hdfs://hd-even-01:9000/"), configuration, "root"); } @Test public void testReadData() throws IOException { // 1,獲取hdfs文件流 FSDataInputStream open = fileSystem.open(new Path("/win10激活.txt")); // 2,設置一次獲取的大小 byte[] bytes = new byte[1024]; // 3,讀取數據 while (open.read(bytes) != -1) System.out.println(Arrays.toString(bytes)); open.close(); fileSystem.close(); } /** * 使用緩存流 * * @throws IOException */ @Test public void testReadData1() throws IOException { FSDataInputStream open = fileSystem.open(new Path("/win10激活.txt")); // 使用緩沖流會快點 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open, StandardCharsets.UTF_8)); String line = ""; while ((line = bufferedReader.readLine()) != null) { System.out.println(line); } bufferedReader.close(); open.close(); fileSystem.close(); } /** * 指定偏移量來實現隻讀部分內容 */ @Test public void readSomeData() throws IOException { FSDataInputStream open = fileSystem.open(new Path("/win10激活.txt")); // 指定開始的index open.seek(14); // 指定讀的多少 byte[] bytes = new byte[5]; while (open.read(bytes) != -1) System.out.println(new String(bytes)); open.close(); fileSystem.close(); } /** * 流方式寫數據 * @throws IOException */ @Test public void writeData() throws IOException { // 1,獲取輸出流 FSDataOutputStream out = fileSystem.create(new Path("/win11.txt"), false); // 2,獲取需要寫的文件輸入流 FileInputStream in = new FileInputStream(new File("C:\\Users\\Administrator\\Desktop\\xixi.txt")); byte[] b = new byte[1024]; int read = 0; while ((read = in.read(b)) != -1) { out.write(b, 0, read); } in.close(); out.close(); fileSystem.close(); } /** * 直接寫字符串 */ @Test public void writeData1() throws IOException { // 1,創建輸出流 FSDataOutputStream out = fileSystem.create(new Path("/aibaobao.txt"), false); // 2,寫數據 out.write("wochaoaibaobao".getBytes()); // 3,關閉流 IOUtils.closeStream(out); fileSystem.close(); } /** * IOUtils方式上傳 * * @throws IOException */ @Test public void putToHdfs() throws IOException { // 1,獲取輸入流 FileInputStream in = new FileInputStream(new File("C:\\Users\\Administrator\\Desktop\\xixi.txt")); // 2,獲取輸出流 FSDataOutputStream out = fileSystem.create(new Path("/haddopPut.txt"), false); // 3,拷貝 IOUtils.copyBytes(in, out, configuration); // 4,關閉流 IOUtils.closeStream(in); IOUtils.closeStream(out); fileSystem.close(); } /** * IOUtils方式下載 * @throws IOException */ @Test public void getFromHdfs() throws IOException { // 1,獲取輸入流 FSDataInputStream open = fileSystem.open(new Path("/haddopPut.txt")); // 2,獲取輸出流 FileOutputStream out = new FileOutputStream(new File("C:\\Users\\Administrator\\Desktop\\haddopPut.txt")); // 3,拷貝 IOUtils.copyBytes(open, out, configuration); // 4,關閉流 IOUtils.closeStream(open); IOUtils.closeStream(out); fileSystem.close(); } }
到此這篇關於java實現對Hadoop的操作的文章就介紹到這瞭,更多相關Java Hadoop內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Hadoop集成Spring的使用詳細教程(快速入門大數據)
- 教你怎麼使用hadoop來提取文件中的指定內容
- 雲計算實驗:Java MapReduce編程
- Hadoop中的壓縮與解壓縮案例詳解
- Windows下使用IDEA搭建Hadoop開發環境的詳細方法