import json
import os
import shutil

import cv2
import imagehash
from loguru import logger
from PIL import Image
from PySimpleGUI import popup_get_folder


class VideoDuplicate:
    """Detect duplicate videos by comparing lists of per-frame image hashes.

    A video's fingerprint is the list of difference hashes (``imagehash.dhash``)
    of frames sampled from 1 s onwards, one frame every 2 s. Two videos are
    treated as duplicates when their fingerprint lists are equal.
    """

    # Position of the first sampled frame, in milliseconds.
    _FIRST_SAMPLE_MS = 1000
    # Distance between two sampled frames, in milliseconds.
    _SAMPLE_STEP_MS = 1000 * 2
    # Videos whose whole-second duration exceeds this value are skipped.
    _MAX_DURATION_S = 11

    def __init__(self):
        # Paths of videos skipped because they exceed the duration limit.
        self._over_length_video: list = []
        # Paths of videos that could not be opened or report no frame rate.
        self._no_video: list = []

    def _video_hash(self, video_path) -> list:
        """Return the list of frame hashes (hex strings) for one video.

        @param video_path -> absolute path of the video file.
        @return list of dhash strings, one per successfully read frame; an
                empty list when the video is unreadable or over-length.
        """
        hash_arr = []
        cap = cv2.VideoCapture(video_path)  # open the video file
        try:
            logger.info(f'开始抽帧【{video_path}】')
            n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))  # frame count
            logger.warning(f'视频帧数:{n_frames}')
            fps = cap.get(cv2.CAP_PROP_FPS)  # frame rate
            logger.warning(f'视频帧率:{fps}')

            # A file that cannot be opened reports a frame rate of 0, which
            # would otherwise crash the duration computation below.
            if not cap.isOpened() or fps <= 0:
                logger.error(f'视频无法打开或帧率无效;【{video_path}】;执行跳过;')
                self._no_video.append(video_path)
                return []

            dur = n_frames / fps * 1000  # approximate total length in ms
            logger.warning(f'视频大约总长:{dur / 1000}')
            if dur // 1000 > self._MAX_DURATION_S:
                logger.error(f'视频时长超出规定范围【6~10】;当前时长:【{dur // 1000}】;跳过该视频;')
                self._over_length_video.append(video_path)
                return []

            cap_set = self._FIRST_SAMPLE_MS
            while cap_set < dur:
                cap.set(cv2.CAP_PROP_POS_MSEC, cap_set)
                logger.debug(f'开始提取:【{cap_set // 1000}】/s的图片;')
                # read() returns a success flag and the frame as a numpy array.
                success, image_np = cap.read()
                if success:
                    # OpenCV decodes frames as BGR; convert to RGB before
                    # building the PIL image that imagehash works on.
                    img = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))
                    h = str(imagehash.dhash(img))
                    logger.success(f'【{cap_set}/s图像指纹:【{h}】')
                    hash_arr.append(h)
                else:
                    # The computed duration is only approximate, so a read
                    # near the end may fail; log the position and carry on.
                    logger.error(str(cap_set / 1000))
                cap_set += self._SAMPLE_STEP_MS
            return hash_arr
        finally:
            # Release the capture on every exit path, including early returns.
            cap.release()

    def start(self, base_dir):
        """Fingerprint every ``.mp4`` file directly inside ``base_dir``.

        @param base_dir -> directory that holds the videos.
        @return list of ``{'video_abs_path': ..., 'hash': [...]}`` dicts for
                the videos that produced a non-empty fingerprint. The same
                list is also written to ``log.txt``.
        """
        data: list = []
        # Sorted so that the processing order (and therefore which copy of a
        # duplicate is kept) does not depend on the file system's ordering.
        for video in sorted(os.listdir(base_dir)):
            logger.debug('-' * 80)
            _, ext = os.path.splitext(video)
            if ext.lower() != '.mp4':
                logger.error(f'视频文件格式不符;【{video}】;执行跳过;')
                continue
            abs_video_path = os.path.join(base_dir, video)
            video_hash_list = self._video_hash(abs_video_path)
            if video_hash_list:
                data.append({'video_abs_path': abs_video_path, 'hash': video_hash_list})
        self._write_log(data)
        return data

    @staticmethod
    def _write_log(data: list) -> None:
        """Write the fingerprint records to ``log.txt`` (working directory) as JSON."""
        with open('log.txt', 'w', encoding='utf-8') as f:
            f.write(json.dumps(data))

    def __call__(self, base_dir, *args, **kwargs):
        """Fingerprint ``base_dir`` and move duplicate videos out of it.

        A video is moved into ``<cwd>/dup_video`` when a later video in the
        list has an identical fingerprint, so the last copy of each group of
        duplicates stays in place. Over-length videos are listed at the end.

        @param base_dir -> directory that holds the videos.
        """
        data = self.start(base_dir)
        logger.debug(f'-----------------------------------开始比对关键帧差值感知余弦算法-----------------------------')
        dup_dir = os.path.join(os.getcwd(), 'dup_video')
        for index, entry in enumerate(data):
            has_later_twin = any(
                entry['hash'] == other['hash'] for other in data[index + 1:]
            )
            if not has_later_twin:
                continue
            _, filename = os.path.split(entry['video_abs_path'])
            logger.error(f'移动文件:【{filename}】')
            # Each file is moved at most once; the target directory is created
            # on demand so the first move cannot fail on a missing folder.
            os.makedirs(dup_dir, exist_ok=True)
            shutil.move(
                os.path.join(base_dir, filename),
                os.path.join(dup_dir, filename),
            )
        logger.warning('---------------------超长视频----------------------')
        for over_length_path in self._over_length_video:
            _, name = os.path.split(over_length_path)
            logger.error(name)


def main():
    # NOTE(review): only this first statement of main() is visible in the
    # reviewed excerpt; anything that followed it must be re-attached here.
    path = popup_get_folder('请选择[视频去重]文件夹')