Golang 實現超大文件讀取的兩種方法

Golang超大文件讀取的兩個方案

流處理方式

分片處理

去年的面試中我被問到超大文件你怎麼處理,這個問題確實當時沒多想,回來之後仔細研究和討論瞭下這個問題,對大文件讀取做瞭一個分析

比如我們有一個log文件,運行瞭幾年,有100G之大。按照我們之前的操作可能代碼會這樣寫:

func ReadFile(filePath string) []byte{
    content, err := ioutil.ReadFile(filePath)
    if err != nil {
        log.Println("Read error")
    }
    return content
} 

上面的代碼讀取幾兆的文件可以,但是如果大於你本身及其內存,那就直接翻車瞭。因為上面的代碼,是把文件所有的內容全部都讀取到內存之後返回,幾兆的文件,你內存夠大可以處理,但是一旦上幾百兆的文件,就沒那麼好處理瞭。

那麼,正確的方法有兩種

第一個是使用流處理方式代碼如下

func ReadFile(filePath string, handle func(string)) error {
    f, err := os.Open(filePath)
    defer f.Close()
    if err != nil {
        return err
    }
    buf := bufio.NewReader(f)
 
    for {
        line, err := buf.ReadLine("\n")
        line = strings.TrimSpace(line)
        handle(line)
        if err != nil {
            if err == io.EOF{
                return nil
            }
            return err
        }
        return nil
    }
}

第二個方案就是分片處理

當讀取的是二進制文件,沒有換行符的時候,使用下面的方案一樣處理大文件

func ReadBigFile(fileName string, handle func([]byte)) error {
    f, err := os.Open(fileName)
    if err != nil {
        fmt.Println("can't opened this file")
        return err
    }
    defer f.Close()
    s := make([]byte, 4096)
    for {
        switch nr, err := f.Read(s[:]); true {
        case nr < 0:
            fmt.Fprintf(os.Stderr, "cat: error reading: %s\n

補充:golang 讀取大文件處理sync.pool + bufio.NewReader(f)

看代碼吧~

文件大小

在這裡插入圖片描述

package main
import (
	"bufio"
	"fmt"
	"io"
	//"math"
	"os"
	"strings"
	"sync"
	"time"
)
func main() {
	/*
	文件數據樣例
	{"remark": "來電時間:  2021/04/15 13:52:07客戶電話:13913xx39xx ", "no": "600020510132021101310210547639", "title": "b-ae0e-0242ac100907", "call_in_date": "2021-04-15 13:52:12", "name": "張三", "_date": "2021-06-15", "name": "張三", "meet": "1"}
	1、我們取出 call_in_date": "2021-04-15 13:52:1的數據寫入另一個文件
	*/
	var (
		s time.Time //當前時間
		file *os.File
		fileStat os.FileInfo
		err error
		lastLineSize int64
	)
	s = time.Now()
	if file, err = os.Open("/Users/zhangsan/Downloads/log.txt");err != nil{
		fmt.Println(err)
	}
	defer func() {
		err = file.Close() //close after checking err
	}()
	//queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
	//if err != nil {
	//	fmt.Println("Could not able to parse the start time", startTimeArg)
	//	return
	//}
	//
	//queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
	//if err != nil {
	//	fmt.Println("Could not able to parse the finish time", finishTimeArg)
	//	return
	//}
	/**
	* {name:"log.log", size:911100961, mode:0x1a4,
	modTime:time.Time{wall:0x656c25c, ext:63742660691,
	loc:(*time.Location)(0x1192c80)}, sys:syscall.Stat_t{Dev:16777220,
	Mode:0x81a4, Nlink:0x1, Ino:0x118cba7, Uid:0x1f5, Gid:0x14, Rdev:0,
	Pad_cgo_0:[4]uint8{0x0, 0x0, 0x0, 0x0}, Atimespec:syscall.Timespec{Sec:1607063899, Nsec:977970393},
	Mtimespec:syscall.Timespec{Sec:1607063891, Nsec:106349148}, Ctimespec:syscall.Timespec{Sec:1607063891,
	Nsec:258847043}, Birthtimespec:syscall.Timespec{Sec:1607063883, Nsec:425808150},
	Size:911100961, Blocks:1784104, Blksize:4096, Flags:0x0, Gen:0x0, Lspare:0, Qspare:[2]int64{0, 0}}
	*
	*/
	if fileStat, err = file.Stat();err != nil {
		return
	}
	fileSize := fileStat.Size()//72849354767
	offset := fileSize - 1
	//檢測是不是都是空行 隻有\n
	for {
		var (
			b []byte
			n int
			char string
		)
		b = make([]byte, 1)
		//從指定位置讀取
		if n, err = file.ReadAt(b, offset);err != nil {
			fmt.Println("Error reading file ", err)
			break
		}
		char = string(b[0])
		if char == "\n" {
			break
		}
		offset--
		//獲取一行的大小
		lastLineSize += int64(n)
	}
	var (
		lastLine []byte
		logSlice []string
		logSlice1 []string
	)
	//初始化一行大小的空間
	lastLine = make([]byte, lastLineSize)
	_, err = file.ReadAt(lastLine, offset)
	if err != nil {
		fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
		return
	}
	//根據條件進行區分
	logSlice = strings.Split(strings.Trim(string(lastLine),"\n"),"next_pay_date")
	logSlice1  = strings.Split(logSlice[1],"\"")
	if logSlice1[2] == "2021-06-15"{
		Process(file)
	}
	fmt.Println("\nTime taken - ", time.Since(s))
		fmt.Println(err)
}
func Process(f *os.File) error {
	//讀取數據的key,減小gc壓力
	linesPool := sync.Pool{New: func() interface{} {
		lines := make([]byte, 250*1024)
		return lines
	}}
	//讀取回來的數據池
	stringPool := sync.Pool{New: func() interface{} {
		lines := ""
		return lines
	}}
	//一個文件對象本身是實現瞭io.Reader的 使用bufio.NewReader去初始化一個Reader對象,存在buffer中的,讀取一次就會被清空
	r := bufio.NewReader(f) //
	//設置讀取緩沖池大小 默認16
	r = bufio.NewReaderSize(r,250 *1024)
	var wg sync.WaitGroup
	for {
		buf := linesPool.Get().([]byte)
		//讀取Reader對象中的內容到[]byte類型的buf中
		n, err := r.Read(buf)
		buf = buf[:n]
		if n == 0 {
			if err != nil {
				fmt.Println(err)
				break
			}
			if err == io.EOF {
				break
			}
			return err
		}
		//補齊剩下沒滿足的剩餘
		nextUntillNewline, err := r.ReadBytes('\n')
		//fmt.Println(string(nextUntillNewline))
		if err != io.EOF {
			buf = append(buf, nextUntillNewline...)
		}
		wg.Add(1)
		go func() {
			ProcessChunk(buf, &linesPool, &stringPool)
			wg.Done()
		}()
	}
	wg.Wait()
	return nil
}
func ProcessChunk(chunk []byte, linesPool *sync.Pool,stringPool *sync.Pool) {
//做相應的處理
}

執行

go run test2.go "2020-01-01T00:00:00.0000Z" "2020-02-02T00:00:00.0000Z" /Users/zhangsan/go/src/workspace/test/log.log
EOF
Time taken -  20.023517675s
<nil>

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。

推薦閱讀: