基於Pytorch實現的聲音分類實例代碼
前言
本章我們來介紹如何使用Pytorch訓練一個區分不同音頻的分類模型,例如你有這樣一個需求,需要根據不同的鳥叫聲識別是什麼種類的鳥,這時你就可以使用這個方法來實現你的需求瞭。
源碼地址:https://github.com/yeyupiaoling/AudioClassification-Pytorch
環境準備
主要介紹libsora,PyAudio,pydub的安裝,其他的依賴包根據需要自行安裝。
- Python 3.7
- Pytorch 1.10.0
安裝libsora
最簡單的方式就是使用pip命令安裝,如下:
pip install pytest-runner pip install librosa==0.9.1
註意: 如果pip命令安裝不成功,那就使用源碼安裝,下載源碼:https://github.com/librosa/librosa/releases/, windows的可以下載zip壓縮包,方便解壓。
pip install pytest-runner tar xzf librosa-<版本號>.tar.gz 或者 unzip librosa-<版本號>.tar.gz cd librosa-<版本號>/ python setup.py install
如果出現 libsndfile64bit.dll': error 0x7e
錯誤,請指定安裝版本0.6.3,如 pip install librosa==0.6.3
安裝ffmpeg, 下載地址:http://blog.gregzaal.com/how-to-install-ffmpeg-on-windows/,筆者下載的是64位,static版。
然後到C盤,筆者解壓,修改文件名為 ffmpeg
,存放在 C:\Program Files\
目錄下,並添加環境變量 C:\Program Files\ffmpeg\bin
最後修改源碼,路徑為 C:\Python3.7\Lib\site-packages\audioread\ffdec.py
,修改32行代碼,如下:
COMMANDS = ('C:\\Program Files\\ffmpeg\\bin\\ffmpeg.exe', 'avconv')
安裝PyAudio
使用pip安裝命令,如下:
pip install pyaudio
在安裝的時候需要使用到C++庫進行編譯,如果讀者的系統是windows,Python是3.7,可以在這裡下載whl安裝包,下載地址:https://github.com/intxcc/pyaudio_portaudio/releases
安裝pydub
使用pip命令安裝,如下:
pip install pydub
訓練分類模型
把音頻轉換成訓練數據最重要的是使用瞭librosa,使用librosa可以很方便得到音頻的梅爾頻譜(Mel Spectrogram),使用的API為 librosa.feature.melspectrogram()
,輸出的是numpy值。關於梅爾頻譜具體信息讀者可以自行瞭解,跟梅爾頻譜同樣很重要的梅爾倒譜(MFCCs)更多用於語音識別中,對應的API為 librosa.feature.mfcc()
。同樣以下的代碼,就可以獲取到音頻的梅爾頻譜。
wav, sr = librosa.load(data_path, sr=16000) features = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=400, n_mels=80, hop_length=160, win_length=400) features = librosa.power_to_db(features, ref=1.0, amin=1e-10, top_db=None)
生成數據列表
生成數據列表,用於下一步的讀取需要,audio_path
為音頻文件路徑,用戶需要提前把音頻數據集存放在dataset/audio
目錄下,每個文件夾存放一個類別的音頻數據,每條音頻數據長度在3秒以上,如 dataset/audio/鳥叫聲/······
。audio
是數據列表存放的位置,生成的數據類別的格式為 音頻路徑\t音頻對應的類別標簽
,音頻路徑和標簽用制表符 \t
分開。讀者也可以根據自己存放數據的方式修改以下函數。
Urbansound8K 是目前應用較為廣泛的用於自動城市環境聲分類研究的公共數據集,包含10個分類:空調聲、汽車鳴笛聲、兒童玩耍聲、狗叫聲、鉆孔聲、引擎空轉聲、槍聲、手提鉆、警笛聲和街道音樂聲。數據集下載地址:https://zenodo.org/record/1203745/files/UrbanSound8K.tar.gz。以下是針對Urbansound8K生成數據列表的函數。如果讀者想使用該數據集,請下載並解壓到 dataset
目錄下,把生成數據列表代碼改為以下代碼。
# 生成數據列表 def get_data_list(audio_path, list_path): sound_sum = 0 audios = os.listdir(audio_path) f_train = open(os.path.join(list_path, 'train_list.txt'), 'w') f_test = open(os.path.join(list_path, 'test_list.txt'), 'w') for i in range(len(audios)): sounds = os.listdir(os.path.join(audio_path, audios[i])) for sound in sounds: if '.wav' not in sound:continue sound_path = os.path.join(audio_path, audios[i], sound) t = librosa.get_duration(filename=sound_path) # 過濾小於2.1秒的音頻 if t >= 2.1: if sound_sum % 100 == 0: f_test.write('%s\t%d\n' % (sound_path, i)) else: f_train.write('%s\t%d\n' % (sound_path, i)) sound_sum += 1 print("Audio:%d/%d" % (i + 1, len(audios))) f_test.close() f_train.close() if __name__ == '__main__': get_data_list('dataset/UrbanSound8K/audio', 'dataset')
創建 reader.py
用於在訓練時讀取數據。編寫一個 CustomDataset
類,用讀取上一步生成的數據列表。
class CustomDataset(Dataset): def __init__(self, data_list_path, model='train', sr=16000, chunk_duration=3): super(CustomDataset, self).__init__() with open(data_list_path, 'r') as f: self.lines = f.readlines() self.model = model self.sr = sr self.chunk_duration = chunk_duration def __getitem__(self, idx): try: audio_path, label = self.lines[idx].replace('\n', '').split('\t') spec_mag = load_audio(audio_path, mode=self.model, sr=self.sr, chunk_duration=self.chunk_duration) return spec_mag, np.array(int(label), dtype=np.int64) except Exception as ex: print(f"[{datetime.now()}] 數據: {self.lines[idx]} 出錯,錯誤信息: {ex}", file=sys.stderr) rnd_idx = np.random.randint(self.__len__()) return self.__getitem__(rnd_idx) def __len__(self): return len(self.lines)
下面是在訓練時或者測試時讀取音頻數據,訓練時對轉換的梅爾頻譜數據隨機裁剪,如果是測試,就取前面的,最好要執行歸一化。
def load_audio(audio_path, mode='train', sr=16000, chunk_duration=3): # 讀取音頻數據 wav, sr_ret = librosa.load(audio_path, sr=sr) if mode == 'train': # 隨機裁剪 num_wav_samples = wav.shape[0] # 數據太短不利於訓練 if num_wav_samples < sr: raise Exception(f'音頻長度不能小於1s,實際長度為:{(num_wav_samples / sr):.2f}s') num_chunk_samples = int(chunk_duration * sr) if num_wav_samples > num_chunk_samples + 1: start = random.randint(0, num_wav_samples - num_chunk_samples - 1) stop = start + num_chunk_samples wav = wav[start:stop] # 對每次都滿長度的再次裁剪 if random.random() > 0.5: wav[:random.randint(1, sr // 2)] = 0 wav = wav[:-random.randint(1, sr // 2)] elif mode == 'eval': # 為避免顯存溢出,隻裁剪指定長度 num_wav_samples = wav.shape[0] num_chunk_samples = int(chunk_duration * sr) if num_wav_samples > num_chunk_samples + 1: wav = wav[:num_chunk_samples] features = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=400, n_mels=80, hop_length=160, win_length=400) features = librosa.power_to_db(features, ref=1.0, amin=1e-10, top_db=None) # 歸一化 mean = np.mean(features, 0, keepdims=True) std = np.std(features, 0, keepdims=True) features = (features - mean) / (std + 1e-5) features = features.astype('float32') return features
訓練
接著就可以開始訓練模型瞭,創建 train.py
。我們搭建簡單的卷積神經網絡,如果音頻種類非常多,可以適當使用更大的卷積神經網絡模型。通過把音頻數據轉換成梅爾頻譜。然後定義優化方法和獲取訓練和測試數據。要註意 args.num_classes
參數的值,這個是類別的數量,要根據你數據集中的分類數量來修改。
def train(args): # 獲取數據 train_dataset = CustomDataset(args.train_list_path, model='train') train_loader = DataLoader(dataset=train_dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn, num_workers=args.num_workers) test_dataset = CustomDataset(args.test_list_path, model='eval') test_loader = DataLoader(dataset=test_dataset, batch_size=args.batch_size, collate_fn=collate_fn, num_workers=args.num_workers) # 獲取分類標簽 with open(args.label_list_path, 'r', encoding='utf-8') as f: lines = f.readlines() class_labels = [l.replace('\n', '') for l in lines] # 獲取模型 device = torch.device("cuda") model = EcapaTdnn(num_classes=args.num_classes) model.to(device) # 獲取優化方法 optimizer = torch.optim.Adam(params=model.parameters(), lr=args.learning_rate, weight_decay=5e-4) # 獲取學習率衰減函數 scheduler = CosineAnnealingLR(optimizer, T_max=args.num_epoch) # 恢復訓練 if args.resume is not None: model.load_state_dict(torch.load(os.path.join(args.resume, 'model.pth'))) state = torch.load(os.path.join(args.resume, 'model.state')) last_epoch = state['last_epoch'] optimizer_state = torch.load(os.path.join(args.resume, 'optimizer.pth')) optimizer.load_state_dict(optimizer_state) print(f'成功加載第 {last_epoch} 輪的模型參數和優化方法參數') # 獲取損失函數 loss = torch.nn.CrossEntropyLoss()
最後執行訓練,每100個batch打印一次訓練日志,訓練一輪之後執行測試和保存模型,在測試時,把每個batch的輸出都統計,最後求平均值。
for epoch in range(args.num_epoch): loss_sum = [] accuracies = [] for batch_id, (spec_mag, label) in enumerate(train_loader): spec_mag = spec_mag.to(device) label = label.to(device).long() output = model(spec_mag) # 計算損失值 los = loss(output, label) optimizer.zero_grad() los.backward() optimizer.step() # 計算準確率 output = torch.nn.functional.softmax(output, dim=-1) output = output.data.cpu().numpy() output = np.argmax(output, axis=1) label = label.data.cpu().numpy() acc = np.mean((output == label).astype(int)) accuracies.append(acc) loss_sum.append(los) if batch_id % 100 == 0: print(f'[{datetime.now()}] Train epoch [{epoch}/{args.num_epoch}], batch: {batch_id}/{len(train_loader)}, ' f'lr: {scheduler.get_last_lr()[0]:.8f}, loss: {sum(loss_sum) / len(loss_sum):.8f}, ' f'accuracy: {sum(accuracies) / len(accuracies):.8f}') scheduler.step()
每輪訓練結束之後都會執行一次評估,和保存模型。評估會出來輸出準確率,還保存瞭混合矩陣圖片,如下。
預測
在訓練結束之後,我們得到瞭一個模型參數文件,我們使用這個模型預測音頻,在執行預測之前,需要把音頻轉換為梅爾頻譜數據,最後輸出的結果即為預測概率最大的標簽。
parser = argparse.ArgumentParser(description=__doc__) add_arg = functools.partial(add_arguments, argparser=parser) add_arg('audio_path', str, 'dataset/UrbanSound8K/audio/fold5/156634-5-2-5.wav', '圖片路徑') add_arg('num_classes', int, 10, '分類的類別數量') add_arg('label_list_path', str, 'dataset/label_list.txt', '標簽列表路徑') add_arg('model_path', str, 'models/model.pth', '模型保存的路徑') args = parser.parse_args() # 獲取分類標簽 with open(args.label_list_path, 'r', encoding='utf-8') as f: lines = f.readlines() class_labels = [l.replace('\n', '') for l in lines] # 獲取模型 device = torch.device("cuda") model = EcapaTdnn(num_classes=args.num_classes) model.to(device) model.load_state_dict(torch.load(args.model_path)) model.eval() def infer(): data = load_audio(args.audio_path, mode='infer') data = data[np.newaxis, :] data = torch.tensor(data, dtype=torch.float32, device=device) # 執行預測 output = model(data) result = torch.nn.functional.softmax(output, dim=-1) result = result.data.cpu().numpy() # 顯示圖片並輸出結果最大的label lab = np.argsort(result)[0][-1] print(f'音頻:{args.audio_path} 的預測結果標簽為:{class_labels[lab]}') if __name__ == '__main__': infer()
其他
為瞭方便讀取錄制數據和制作數據集,這裡提供瞭兩個程序,首先是 record_audio.py
,這個用於錄制音頻,錄制的音頻幀率為44100,通道為1,16bit。
import pyaudio import wave import uuid from tqdm import tqdm import os s = input('請輸入你計劃錄音多少秒:') CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 44100 RECORD_SECONDS = int(s) WAVE_OUTPUT_FILENAME = "save_audio/%s.wav" % str(uuid.uuid1()).replace('-', '') p = pyaudio.PyAudio() stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) print("開始錄音, 請說話......") frames = [] for i in tqdm(range(0, int(RATE / CHUNK * RECORD_SECONDS))): data = stream.read(CHUNK) frames.append(data) print("錄音已結束!") stream.stop_stream() stream.close() p.terminate() if not os.path.exists('save_audio'): os.makedirs('save_audio') wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close() print('文件保存在:%s' % WAVE_OUTPUT_FILENAME) os.system('pause')
創建 crop_audio.py
,在訓練是隻是裁剪前面的3秒的音頻,所以我們要把錄制的硬盤安裝每3秒裁剪一段,把裁剪後音頻存放在音頻名稱命名的文件夾中。最後把這些文件按照訓練數據的要求創建數據列表和訓練數據。
import os import uuid import wave from pydub import AudioSegment # 按秒截取音頻 def get_part_wav(sound, start_time, end_time, part_wav_path): save_path = os.path.dirname(part_wav_path) if not os.path.exists(save_path): os.makedirs(save_path) start_time = int(start_time) * 1000 end_time = int(end_time) * 1000 word = sound[start_time:end_time] word.export(part_wav_path, format="wav") def crop_wav(path, crop_len): for src_wav_path in os.listdir(path): wave_path = os.path.join(path, src_wav_path) print(wave_path[-4:]) if wave_path[-4:] != '.wav': continue file = wave.open(wave_path) # 幀總數 a = file.getparams().nframes # 采樣頻率 f = file.getparams().framerate # 獲取音頻時間長度 t = int(a / f) print('總時長為 %d s' % t) # 讀取語音 sound = AudioSegment.from_wav(wave_path) for start_time in range(0, t, crop_len): save_path = os.path.join(path, os.path.basename(wave_path)[:-4], str(uuid.uuid1()) + '.wav') get_part_wav(sound, start_time, start_time + crop_len, save_path) if __name__ == '__main__': crop_len = 3 crop_wav('save_audio', crop_len)
創建 infer_record.py
,這個程序是用來不斷進行錄音識別,錄音時間之所以設置為6秒,所以我們可以大致理解為這個程序在實時錄音識別。通過這個應該我們可以做一些比較有趣的事情,比如把麥克風放在小鳥經常來的地方,通過實時錄音識別,一旦識別到有鳥叫的聲音,如果你的數據集足夠強大,有每種鳥叫的聲音數據集,這樣你還能準確識別是那種鳥叫。如果識別到目標鳥類,就啟動程序,例如拍照等等。
# 錄音參數 CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 44100 RECORD_SECONDS = 6 WAVE_OUTPUT_FILENAME = "infer_audio.wav" # 打開錄音 p = pyaudio.PyAudio() stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) # 獲取錄音數據 def record_audio(): print("開始錄音......") frames = [] for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): data = stream.read(CHUNK) frames.append(data) print("錄音已結束!") wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close() return WAVE_OUTPUT_FILENAME # 預測 def infer(audio_path): data = load_audio(audio_path, mode='infer') data = data[np.newaxis, :] data = torch.tensor(data, dtype=torch.float32, device=device) # 執行預測 output = model(data) result = torch.nn.functional.softmax(output, dim=-1) result = result.data.cpu().numpy() # 顯示圖片並輸出結果最大的label lab = np.argsort(result)[0][-1] return class_labels[lab] if __name__ == '__main__': try: while True: # 加載數據 audio_path = record_audio() # 獲取預測結果 label = infer(audio_path) print(f'預測的標簽為:{label}') except Exception as e: print(e) stream.stop_stream() stream.close() p.terminate()
總結
到此這篇關於基於Pytorch實現聲音分類的文章就介紹到這瞭,更多相關Pytorch實現聲音分類內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- python PaddleSpeech實現嬰兒啼哭識別
- python 錄制系統聲音的示例
- 詳解pytorch的多GPU訓練的兩種方式
- pytorch 實現L2和L1正則化regularization的操作
- pytorch中.to(device) 和.cuda()的區別說明