基於Python實現視頻去重小工具

同級目錄下新建dup_video

import json
import os
import shutil

import cv2
import imagehash
from PIL import Image
from loguru import logger
from PySimpleGUI import popup_get_folder


class VideoDuplicate(object):
    '''
    返回整個視頻的圖片指紋列表
    從1秒開始,每3秒抽幀,計算一張圖像指紋
    '''

    def __init__(self):
        self._over_length_video: list = []
        self._no_video: list = []

    def _video_hash(self, video_path) -> list:
        '''
        @param video_path -> 視頻絕對路徑;
        '''
        hash_arr = []
        cap = cv2.VideoCapture(video_path)  ##打開視頻文件
        logger.info(f'開始抽幀【{video_path}】')

        n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))  # 視頻的幀數
        logger.warning(f'視頻幀數:{n_frames}')

        fps = cap.get(cv2.CAP_PROP_FPS)  # 視頻的幀率
        logger.warning(f'視頻幀率:{fps}')

        dur = n_frames / fps * 1000  # 視頻大致總長度
        cap_set = 1000
        logger.warning(f'視頻大約總長:{dur / 1000}')
        if dur // 1000 > 11:
            logger.error(f'視頻時長超出規定范圍【6~10】;當前時長:【{dur // 1000}】;跳過該視頻;')
            self._over_length_video.append(video_path)
            return []

        while cap_set < dur:  # 從3秒開始,每60秒抽幀,計算圖像指紋。總長度-3s,是因為有的時候計算出來的長度不準。
            cap.set(cv2.CAP_PROP_POS_MSEC, cap_set)
            logger.debug(f'開始提取:【{cap_set // 1000}】/s的圖片;')
            # 返回該時間點的,圖像(numpy數組),及讀取是否成功
            success, image_np = cap.read()
            if success:
                img = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))  # 轉成cv圖像格式
                h = str(imagehash.dhash(img))
                logger.success(f'【{cap_set}/s圖像指紋:【{h}】')
                hash_arr.append(h)  # 圖像指紋
            else:
                logger.error(str(cap_set / 1000))
            cap_set += 1000 * 2
        cap.release()  # 釋放視頻
        return hash_arr

    def start(self, base_dir):
        '''
        @param base_dir -> 主文件路徑;
        '''
        data: list = []
        for video in os.listdir(base_dir):
            logger.debug(f'-' * 80)
            name, ext = os.path.splitext(video)
            if ext not in ('.mp4', '.MP4'):
                logger.error(f'視頻文件格式不符;【{video}】;執行跳過;')
                continue

            abs_video_path = os.path.join(base_dir, video)
            video_hash_list = self._video_hash(abs_video_path)
            if video_hash_list:
                data.append({'video_abs_path': abs_video_path, 'hash': video_hash_list})

        self._write_log(data)
        return data

    @staticmethod
    def _write_log(data: list) -> None:
        '''視頻哈希後的值寫入日志文件'''
        with open(f'log.txt', 'w+', encoding='utf-8') as f:
            f.write(json.dumps(data))

    def __call__(self, base_dir, *args, **kwargs):
        self.start(base_dir)
        logger.debug(f'-----------------------------------開始比對關鍵幀差值感知餘弦算法-----------------------------')

        with open('log.txt') as f:
            data = json.loads(f.read())
            for i in range(0, len(data) - 1):
                for j in range(i + 1, len(data)):
                    if data[i]['hash'] == data[j]['hash']:
                        _, filename = os.path.split(data[i]['video_abs_path'])
                        logger.error(f'移動文件:【{filename}】')
                        shutil.move(
                            os.path.join(base_dir, filename),
                            os.path.join(os.path.join(os.getcwd(), 'dup_video'), filename)
                        )
        logger.warning('---------------------超長視頻----------------------')
        for i in self._over_length_video:
            _, name = os.path.split(i)
            logger.error(name)


def main():
    path = popup_get_folder('請選擇[視頻去重]文件夾')
    v = VideoDuplicate()
    v(path)


if __name__ == '__main__':
    main()

方法補充

除瞭上述代碼,小編還整理瞭其他可以實現視頻去除功能的方法,希望對大傢有所幫助

python+opencv抽取視頻幀並去重

import os 
import sys
import cv2
import glob
import json
import numpy as np
import skimage
from skimage import metrics
import hashlib
print(skimage.__version__)

def load_json(json_file):
    with open(json_file) as fp:
        data = json.load(fp)
    return data['outputs']


def ssim_dis(im1, im2):
    ssim = metrics.structural_similarity(im1, im2, data_range=255, multichannel=True)
    return ssim

# cv2
def isdarkOrBright(grayImg, thre_dark=10, thre_bright=230):
    mean = np.mean(grayImg)
    if mean < thre_dark or mean > thre_bright:
        return True 
    else:
        return False

def get_file_md5(file_name):
    """
    caculate md5
    : param file_name
    : return md5
    """
    m = hashlib.md5()
    with open(file_name, 'rb') as fobj:
        while True:
            data = fobj.read(4096)
            if not data:
                break
            m.update(data)
    return m.hexdigest()

def extract_frame(video_path, save_dir, prefix, ssim_thre=0.90):
    count = 0
    md5set = {}
    last_frame = None
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    index = 0
    tmp_frames = []
    while cap.isOpened():
        frameState, frame = cap.read()
        if not frameState or frame is None:
            break
        grayImg = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # if isdarkOrBright(grayImg):
        #     index += 1
        #     continue
        assert cv2.imwrite('tmp.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 100])
        md5 = get_file_md5('tmp.jpg')
        if md5 in md5set:
            md5set[md5] += 1
            index += 1
            continue
        md5set[md5] = 1
        
        save_path = os.path.join(save_dir, prefix+'_'+str(index).rjust(4, '0')+'.jpg')
        if last_frame is None:
            if cv2.imwrite(save_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):
                count += 1
                last_frame = frame
                tmp_frames.append(frame)
        else:
            dis = ssim_dis(last_frame, frame)
            if dis <= ssim_thre:
                save_frame = tmp_frames[len(tmp_frames)//2]
                if cv2.imwrite(save_path, save_frame, [cv2.IMWRITE_JPEG_QUALITY, 100]):
                    count += 1
                    last_frame = frame
                    tmp_frames = [frame]
            else:
                tmp_frames.append(frame)
        index += 1

    cap.release()
    return count
        
        

if __name__ == '__main__':
    import sys
    video_path = "videos/***.mp4"
    video_name = video_path.split("/")[-1]
    prefix = video_name[:-4]
    save_imgs_dir = prefix
    if not os.path.exists(save_imgs_dir):
        os.mkdir(save_imgs_dir)
    N = extract_frame(video_path, save_imgs_dir, prefix)
    print(video_name, N)

對圖片,視頻,文件進行去重

import os
from tkinter import *
from tkinter import messagebox
import tkinter.filedialog
root=Tk()
root.title("篩選重復的視頻和照片")
root.geometry("500x500+500+200")
def wbb():
      a=[]
      c={}
      filename=tkinter.filedialog.askopenfilenames()
            
      for i in filename:
            with open(i,'rb') as f:
                  a.append(f.read())
      for j in range(len(a)):
            c[a[j]]=filename[j]
      filename1=tkinter.filedialog.askdirectory()
     
      if filename1!="":
            p=1
            lb1.config(text=filename1+"下的文件為:")
            for h in c:
                k=c[h].split(".")[-1]
                with open(filename1+"/"+str(p)+"."+k,'wb') as f:
                      f.write(h)
                p=p+1      
            for g in os.listdir(filename1):
                  txt.insert(END,g+'\n')
                  
      else:
            messagebox.showinfo("提示",message ='請選擇路徑')
frame1=Frame(root,relief=RAISED)
frame1.place(relx=0.0)

frame2=Frame(root,relief=GROOVE)
frame2.place(relx=0.5)

lb1=Label(frame1,text="等等下面會有變化?",font=('華文新魏',13))
lb1.pack(fill=X)    

txt=Text(frame1,width=30,height=50,font=('華文新魏',10))
txt.pack(fill=X)        

lb=Label(frame2,text="點我選擇要進行篩選的文件:",font=('華文新魏',10))
lb.pack(fill=X)            
            
                  
btn=Button(frame2,text="請選擇要進行篩選的文件",fg='black',relief="raised",bd="9",command=wbb)
btn.pack(fill=X)
root.mainloop()

效果圖

到此這篇關於基於Python實現視頻去重小工具的文章就介紹到這瞭,更多相關Python視頻去重內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: