基於Pytorch實現的聲音分類實例代碼

前言

本章我們來介紹如何使用Pytorch訓練一個區分不同音頻的分類模型,例如你有這樣一個需求,需要根據不同的鳥叫聲識別是什麼種類的鳥,這時你就可以使用這個方法來實現你的需求瞭。

源碼地址:https://github.com/yeyupiaoling/AudioClassification-Pytorch

環境準備

主要介紹libsora,PyAudio,pydub的安裝,其他的依賴包根據需要自行安裝。

  • Python 3.7
  • Pytorch 1.10.0

安裝libsora

最簡單的方式就是使用pip命令安裝,如下:

pip install pytest-runner
pip install librosa==0.9.1

註意: 如果pip命令安裝不成功,那就使用源碼安裝,下載源碼:https://github.com/librosa/librosa/releases/, windows的可以下載zip壓縮包,方便解壓。

pip install pytest-runner
tar xzf librosa-<版本號>.tar.gz 或者 unzip librosa-<版本號>.tar.gz
cd librosa-<版本號>/
python setup.py install

如果出現 libsndfile64bit.dll': error 0x7e錯誤,請指定安裝版本0.6.3,如 pip install librosa==0.6.3

安裝ffmpeg, 下載地址:http://blog.gregzaal.com/how-to-install-ffmpeg-on-windows/,筆者下載的是64位,static版。
然後到C盤,筆者解壓,修改文件名為 ffmpeg,存放在 C:\Program Files\目錄下,並添加環境變量 C:\Program Files\ffmpeg\bin

最後修改源碼,路徑為 C:\Python3.7\Lib\site-packages\audioread\ffdec.py,修改32行代碼,如下:

COMMANDS = ('C:\\Program Files\\ffmpeg\\bin\\ffmpeg.exe', 'avconv')

安裝PyAudio

使用pip安裝命令,如下:

pip install pyaudio

在安裝的時候需要使用到C++庫進行編譯,如果讀者的系統是windows,Python是3.7,可以在這裡下載whl安裝包,下載地址:https://github.com/intxcc/pyaudio_portaudio/releases

安裝pydub

使用pip命令安裝,如下:

pip install pydub

訓練分類模型

把音頻轉換成訓練數據最重要的是使用瞭librosa,使用librosa可以很方便得到音頻的梅爾頻譜(Mel Spectrogram),使用的API為 librosa.feature.melspectrogram(),輸出的是numpy值。關於梅爾頻譜具體信息讀者可以自行瞭解,跟梅爾頻譜同樣很重要的梅爾倒譜(MFCCs)更多用於語音識別中,對應的API為 librosa.feature.mfcc()。同樣以下的代碼,就可以獲取到音頻的梅爾頻譜。

wav, sr = librosa.load(data_path, sr=16000)
features = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=400, n_mels=80, hop_length=160, win_length=400)
features = librosa.power_to_db(features, ref=1.0, amin=1e-10, top_db=None)

生成數據列表

生成數據列表,用於下一步的讀取需要,audio_path為音頻文件路徑,用戶需要提前把音頻數據集存放在dataset/audio目錄下,每個文件夾存放一個類別的音頻數據,每條音頻數據長度在3秒以上,如 dataset/audio/鳥叫聲/······audio是數據列表存放的位置,生成的數據類別的格式為 音頻路徑\t音頻對應的類別標簽,音頻路徑和標簽用制表符 \t分開。讀者也可以根據自己存放數據的方式修改以下函數。

Urbansound8K 是目前應用較為廣泛的用於自動城市環境聲分類研究的公共數據集,包含10個分類:空調聲、汽車鳴笛聲、兒童玩耍聲、狗叫聲、鉆孔聲、引擎空轉聲、槍聲、手提鉆、警笛聲和街道音樂聲。數據集下載地址:https://zenodo.org/record/1203745/files/UrbanSound8K.tar.gz。以下是針對Urbansound8K生成數據列表的函數。如果讀者想使用該數據集,請下載並解壓到 dataset目錄下,把生成數據列表代碼改為以下代碼。

# 生成數據列表
def get_data_list(audio_path, list_path):
    sound_sum = 0
    audios = os.listdir(audio_path)

    f_train = open(os.path.join(list_path, 'train_list.txt'), 'w')
    f_test = open(os.path.join(list_path, 'test_list.txt'), 'w')

    for i in range(len(audios)):
        sounds = os.listdir(os.path.join(audio_path, audios[i]))
        for sound in sounds:
            if '.wav' not in sound:continue
            sound_path = os.path.join(audio_path, audios[i], sound)
            t = librosa.get_duration(filename=sound_path)
            # 過濾小於2.1秒的音頻
            if t >= 2.1:
                if sound_sum % 100 == 0:
                    f_test.write('%s\t%d\n' % (sound_path, i))
                else:
                    f_train.write('%s\t%d\n' % (sound_path, i))
                sound_sum += 1
        print("Audio:%d/%d" % (i + 1, len(audios)))

    f_test.close()
    f_train.close()


if __name__ == '__main__':
    get_data_list('dataset/UrbanSound8K/audio', 'dataset')

創建 reader.py用於在訓練時讀取數據。編寫一個 CustomDataset類,用讀取上一步生成的數據列表。

class CustomDataset(Dataset):
    def __init__(self, data_list_path, model='train', sr=16000, chunk_duration=3):
        super(CustomDataset, self).__init__()
        with open(data_list_path, 'r') as f:
            self.lines = f.readlines()
        self.model = model
        self.sr = sr
        self.chunk_duration = chunk_duration

    def __getitem__(self, idx):
        try:
            audio_path, label = self.lines[idx].replace('\n', '').split('\t')
            spec_mag = load_audio(audio_path, mode=self.model, sr=self.sr, chunk_duration=self.chunk_duration)
            return spec_mag, np.array(int(label), dtype=np.int64)
        except Exception as ex:
            print(f"[{datetime.now()}] 數據: {self.lines[idx]} 出錯,錯誤信息: {ex}", file=sys.stderr)
            rnd_idx = np.random.randint(self.__len__())
            return self.__getitem__(rnd_idx)

    def __len__(self):
        return len(self.lines)

下面是在訓練時或者測試時讀取音頻數據,訓練時對轉換的梅爾頻譜數據隨機裁剪,如果是測試,就取前面的,最好要執行歸一化。

def load_audio(audio_path, mode='train', sr=16000, chunk_duration=3):
    # 讀取音頻數據
    wav, sr_ret = librosa.load(audio_path, sr=sr)
    if mode == 'train':
        # 隨機裁剪
        num_wav_samples = wav.shape[0]
        # 數據太短不利於訓練
        if num_wav_samples < sr:
            raise Exception(f'音頻長度不能小於1s,實際長度為:{(num_wav_samples / sr):.2f}s')
        num_chunk_samples = int(chunk_duration * sr)
        if num_wav_samples > num_chunk_samples + 1:
            start = random.randint(0, num_wav_samples - num_chunk_samples - 1)
            stop = start + num_chunk_samples
            wav = wav[start:stop]
            # 對每次都滿長度的再次裁剪
            if random.random() > 0.5:
                wav[:random.randint(1, sr // 2)] = 0
                wav = wav[:-random.randint(1, sr // 2)]
    elif mode == 'eval':
        # 為避免顯存溢出,隻裁剪指定長度
        num_wav_samples = wav.shape[0]
        num_chunk_samples = int(chunk_duration * sr)
        if num_wav_samples > num_chunk_samples + 1:
            wav = wav[:num_chunk_samples]
    features = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=400, n_mels=80, hop_length=160, win_length=400)
    features = librosa.power_to_db(features, ref=1.0, amin=1e-10, top_db=None)
    # 歸一化
    mean = np.mean(features, 0, keepdims=True)
    std = np.std(features, 0, keepdims=True)
    features = (features - mean) / (std + 1e-5)
    features = features.astype('float32')
    return features

訓練

接著就可以開始訓練模型瞭,創建 train.py。我們搭建簡單的卷積神經網絡,如果音頻種類非常多,可以適當使用更大的卷積神經網絡模型。通過把音頻數據轉換成梅爾頻譜。然後定義優化方法和獲取訓練和測試數據。要註意 args.num_classes參數的值,這個是類別的數量,要根據你數據集中的分類數量來修改。

def train(args):
    # 獲取數據
    train_dataset = CustomDataset(args.train_list_path, model='train')
    train_loader = DataLoader(dataset=train_dataset, batch_size=args.batch_size, shuffle=True, collate_fn=collate_fn, num_workers=args.num_workers)

    test_dataset = CustomDataset(args.test_list_path, model='eval')
    test_loader = DataLoader(dataset=test_dataset, batch_size=args.batch_size, collate_fn=collate_fn, num_workers=args.num_workers)
    # 獲取分類標簽
    with open(args.label_list_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
        class_labels = [l.replace('\n', '') for l in lines]
    # 獲取模型
    device = torch.device("cuda")
    model = EcapaTdnn(num_classes=args.num_classes)
    model.to(device)

    # 獲取優化方法
    optimizer = torch.optim.Adam(params=model.parameters(),
                                 lr=args.learning_rate,
                                 weight_decay=5e-4)
    # 獲取學習率衰減函數
    scheduler = CosineAnnealingLR(optimizer, T_max=args.num_epoch)

    # 恢復訓練
    if args.resume is not None:
        model.load_state_dict(torch.load(os.path.join(args.resume, 'model.pth')))
        state = torch.load(os.path.join(args.resume, 'model.state'))
        last_epoch = state['last_epoch']
        optimizer_state = torch.load(os.path.join(args.resume, 'optimizer.pth'))
        optimizer.load_state_dict(optimizer_state)
        print(f'成功加載第 {last_epoch} 輪的模型參數和優化方法參數')

    # 獲取損失函數
    loss = torch.nn.CrossEntropyLoss()

最後執行訓練,每100個batch打印一次訓練日志,訓練一輪之後執行測試和保存模型,在測試時,把每個batch的輸出都統計,最後求平均值。

    for epoch in range(args.num_epoch):
        loss_sum = []
        accuracies = []
        for batch_id, (spec_mag, label) in enumerate(train_loader):
            spec_mag = spec_mag.to(device)
            label = label.to(device).long()
            output = model(spec_mag)
            # 計算損失值
            los = loss(output, label)
            optimizer.zero_grad()
            los.backward()
            optimizer.step()

            # 計算準確率
            output = torch.nn.functional.softmax(output, dim=-1)
            output = output.data.cpu().numpy()
            output = np.argmax(output, axis=1)
            label = label.data.cpu().numpy()
            acc = np.mean((output == label).astype(int))
            accuracies.append(acc)
            loss_sum.append(los)
            if batch_id % 100 == 0:
                print(f'[{datetime.now()}] Train epoch [{epoch}/{args.num_epoch}], batch: {batch_id}/{len(train_loader)}, '
                      f'lr: {scheduler.get_last_lr()[0]:.8f}, loss: {sum(loss_sum) / len(loss_sum):.8f}, '
                      f'accuracy: {sum(accuracies) / len(accuracies):.8f}')
        scheduler.step()

每輪訓練結束之後都會執行一次評估,和保存模型。評估會出來輸出準確率,還保存瞭混合矩陣圖片,如下。

預測

在訓練結束之後,我們得到瞭一個模型參數文件,我們使用這個模型預測音頻,在執行預測之前,需要把音頻轉換為梅爾頻譜數據,最後輸出的結果即為預測概率最大的標簽。

parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser)
add_arg('audio_path',       str,    'dataset/UrbanSound8K/audio/fold5/156634-5-2-5.wav', '圖片路徑')
add_arg('num_classes',      int,    10,                        '分類的類別數量')
add_arg('label_list_path',  str,    'dataset/label_list.txt',  '標簽列表路徑')
add_arg('model_path',       str,    'models/model.pth',        '模型保存的路徑')
args = parser.parse_args()


# 獲取分類標簽
with open(args.label_list_path, 'r', encoding='utf-8') as f:
    lines = f.readlines()
class_labels = [l.replace('\n', '') for l in lines]
# 獲取模型
device = torch.device("cuda")
model = EcapaTdnn(num_classes=args.num_classes)
model.to(device)
model.load_state_dict(torch.load(args.model_path))
model.eval()


def infer():
    data = load_audio(args.audio_path, mode='infer')
    data = data[np.newaxis, :]
    data = torch.tensor(data, dtype=torch.float32, device=device)
    # 執行預測
    output = model(data)
    result = torch.nn.functional.softmax(output, dim=-1)
    result = result.data.cpu().numpy()
    # 顯示圖片並輸出結果最大的label
    lab = np.argsort(result)[0][-1]
    print(f'音頻:{args.audio_path} 的預測結果標簽為:{class_labels[lab]}')


if __name__ == '__main__':
    infer()

其他

為瞭方便讀取錄制數據和制作數據集,這裡提供瞭兩個程序,首先是 record_audio.py,這個用於錄制音頻,錄制的音頻幀率為44100,通道為1,16bit。

import pyaudio
import wave
import uuid
from tqdm import tqdm
import os

s = input('請輸入你計劃錄音多少秒:')

CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
RECORD_SECONDS = int(s)
WAVE_OUTPUT_FILENAME = "save_audio/%s.wav" % str(uuid.uuid1()).replace('-', '')

p = pyaudio.PyAudio()

stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK)

print("開始錄音, 請說話......")

frames = []

for i in tqdm(range(0, int(RATE / CHUNK * RECORD_SECONDS))):
    data = stream.read(CHUNK)
    frames.append(data)

print("錄音已結束!")

stream.stop_stream()
stream.close()
p.terminate()

if not os.path.exists('save_audio'):
    os.makedirs('save_audio')

wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()

print('文件保存在:%s' % WAVE_OUTPUT_FILENAME)
os.system('pause')

創建 crop_audio.py,在訓練是隻是裁剪前面的3秒的音頻,所以我們要把錄制的硬盤安裝每3秒裁剪一段,把裁剪後音頻存放在音頻名稱命名的文件夾中。最後把這些文件按照訓練數據的要求創建數據列表和訓練數據。

import os
import uuid
import wave
from pydub import AudioSegment


# 按秒截取音頻
def get_part_wav(sound, start_time, end_time, part_wav_path):
    save_path = os.path.dirname(part_wav_path)
    if not os.path.exists(save_path):
        os.makedirs(save_path)
    start_time = int(start_time) * 1000
    end_time = int(end_time) * 1000
    word = sound[start_time:end_time]
    word.export(part_wav_path, format="wav")


def crop_wav(path, crop_len):
    for src_wav_path in os.listdir(path):
        wave_path = os.path.join(path, src_wav_path)
        print(wave_path[-4:])
        if wave_path[-4:] != '.wav':
            continue
        file = wave.open(wave_path)
        # 幀總數
        a = file.getparams().nframes
        # 采樣頻率
        f = file.getparams().framerate
        # 獲取音頻時間長度
        t = int(a / f)
        print('總時長為 %d s' % t)
        # 讀取語音
        sound = AudioSegment.from_wav(wave_path)
        for start_time in range(0, t, crop_len):
            save_path = os.path.join(path, os.path.basename(wave_path)[:-4], str(uuid.uuid1()) + '.wav')
            get_part_wav(sound, start_time, start_time + crop_len, save_path)


if __name__ == '__main__':
    crop_len = 3
    crop_wav('save_audio', crop_len)

創建 infer_record.py,這個程序是用來不斷進行錄音識別,錄音時間之所以設置為6秒,所以我們可以大致理解為這個程序在實時錄音識別。通過這個應該我們可以做一些比較有趣的事情,比如把麥克風放在小鳥經常來的地方,通過實時錄音識別,一旦識別到有鳥叫的聲音,如果你的數據集足夠強大,有每種鳥叫的聲音數據集,這樣你還能準確識別是那種鳥叫。如果識別到目標鳥類,就啟動程序,例如拍照等等。

# 錄音參數
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
RECORD_SECONDS = 6
WAVE_OUTPUT_FILENAME = "infer_audio.wav"

# 打開錄音
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK)

# 獲取錄音數據
def record_audio():
    print("開始錄音......")

    frames = []
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        data = stream.read(CHUNK)
        frames.append(data)

    print("錄音已結束!")

    wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
    return WAVE_OUTPUT_FILENAME


# 預測
def infer(audio_path):
    data = load_audio(audio_path, mode='infer')
    data = data[np.newaxis, :]
    data = torch.tensor(data, dtype=torch.float32, device=device)
    # 執行預測
    output = model(data)
    result = torch.nn.functional.softmax(output, dim=-1)
    result = result.data.cpu().numpy()
    # 顯示圖片並輸出結果最大的label
    lab = np.argsort(result)[0][-1]
    return class_labels[lab]


if __name__ == '__main__':
    try:
        while True:
            # 加載數據
            audio_path = record_audio()
            # 獲取預測結果
            label = infer(audio_path)
            print(f'預測的標簽為:{label}')
    except Exception as e:
        print(e)
        stream.stop_stream()
        stream.close()
        p.terminate()

總結

到此這篇關於基於Pytorch實現聲音分類的文章就介紹到這瞭,更多相關Pytorch實現聲音分類內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: