基於OpenCV目標跟蹤實現人員計數器

在本教程中,您將學習如何使用 OpenCV 和 Python 構建人員計數器。使用 OpenCV,我們將實時計算進或出百貨商店的人數。

在今天博客文章的第一部分,我們將討論如何利用兩者來創建更準確的人員計數器。之後,我們將查看項目的目錄結構,然後實施整個人員計數項目。最後,我們將檢查將 OpenCV 的人數統計應用到實際視頻中的結果。

1.瞭解對象檢測與對象跟蹤

在繼續本教程的其餘部分之前,您必須瞭解對象檢測和對象跟蹤之間的根本區別。

當我們應用對象檢測時,我們是在確定一個對象在圖像/幀中的位置。與目標跟蹤算法相比,目標檢測器通常在計算上更昂貴,因此也更慢。目標檢測算法的例子包括Haar級聯、HOG +線性支持向量機(HOG + Linear SVM)和基於深度學習的目標檢測器,如Faster R-CNN、YOLO和Single Shot檢測器(SSD)。

另一方面,對象跟蹤器將接受對象在圖像中位置的輸入 (x, y) 坐標,並將:

1.為該特定對象分配唯一 ID

2.在對象圍繞視頻流移動時跟蹤對象,根據幀的各種屬性(梯度、光流等)預測下一幀中的新對象位置

對象跟蹤算法的示例包括 MedianFlow、MOSSE、GOTURN、核化相關濾波器和判別相關濾波器等。

2.結合對象檢測和對象跟蹤

高精度目標跟蹤器將目標檢測和目標跟蹤的概念結合到一個算法中,通常分為兩個階段:

1.階段1 檢測:在檢測階段,我們正在運行計算成本更高的對象跟蹤器,以 (1) 檢測是否有新對象進入我們的視野,以及 (2) 看看我們是否可以找到在跟蹤階段“丟失”的對象。對於每個檢測到的對象,我們使用新的邊界框坐標創建或更新對象跟蹤器。由於我們的目標檢測器的計算成本更高,我們每 N 幀隻運行一次此階段。

2.階段2 跟蹤:當我們不處於“檢測”階段時,我們處於“跟蹤”階段。對於我們檢測到的每個對象,我們創建一個對象跟蹤器來跟蹤對象在框架周圍的移動。我們的目標跟蹤器應該比目標檢測器更快、更高效。我們將繼續跟蹤,直到我們到達第 N 幀,然後重新運行我們的目標檢測器。然後重復整個過程。

這種混合方法的好處是我們可以應用高度準確的對象檢測方法,而無需太多的計算負擔。我們將實施這樣一個跟蹤系統來建立我們的人員計數器。

3.項目結構

讓我們回顧一下今天博客文章的項目結構。獲取代碼後,您可以使用 tree 命令檢查目錄結構:

最重要的兩個目錄:

1.pyimagesearch/:該模塊包含質心跟蹤算法。 “組合對象跟蹤算法”部分介紹瞭質心跟蹤算法。

2.mobilenet_ssd/:包含 Caffe 深度學習模型文件。

今天項目的核心包含在 people_counter.py 腳本中——這是我們將花費大部分時間的地方。今天我們還將回顧 trackableobject.py 腳本。

4.結合對象跟蹤算法

為瞭實現我們的人員計數器,我們將同時使用 OpenCV 和 dlib。我們將 OpenCV 用於標準的計算機視覺/圖像處理功能,以及用於人數統計的深度學習對象檢測器。

然後我們將使用 dlib 來實現相關過濾器。我們也可以在這裡使用 OpenCV;但是,對於這個項目,dlib 對象跟蹤實現更容易使用。

除瞭 dlib 的對象跟蹤實現,我們還將使用質心跟蹤實現。回顧整個質心跟蹤算法超出瞭這篇博文的范圍,但我在下面提供瞭一個簡短的概述。

在步驟#1,我們接受一組邊界框並計算它們對應的質心(即邊界框的中心):

要使用 Python 通過質心腳本構建簡單的對象跟蹤,第一步是接受邊界框坐標並使用它們來計算質心。

邊界框本身可以由以下任一方式提供:

1.目標檢測器(如 HOG + Linear SVM、Faster R-CNN、SSDs 等)

2.或對象跟蹤器(例如相關過濾器)

在上圖中,您可以看到我們在算法的初始迭代中有兩個對象要跟蹤。

在步驟#2中,我們計算任何新質心(黃色)和現有質心(紫色)之間的歐幾裡得距離:

此圖像中存在三個對象。我們需要計算每對原始質心(紫色)和新質心(黃色)之間的歐幾裡得距離。

質心跟蹤算法假設它們之間具有最小歐幾裡德距離的質心對必須是相同的對象 ID。

在上面的示例圖像中,我們有兩個現有的質心(紫色)和三個新的質心(黃色),這意味著已經檢測到一個新對象(因為與舊質心相比,還有一個新質心)。

然後箭頭表示計算所有紫色質心和所有黃色質心之間的歐幾裡得距離。一旦我們有瞭歐幾裡得距離,我們就會在步驟#3 中嘗試關聯對象 ID:

您可以看到我們的質心跟蹤器已選擇關聯使它們各自的歐幾裡得距離最小化的質心。但是左下角的點呢?它沒有與任何東西相關聯——我們該怎麼辦? 要回答這個問題,我們需要執行步驟#4,註冊新對象:

註冊意味著我們通過以下方式將新對象添加到我們的跟蹤對象列表中:

1.為其分配一個新的對象 ID

2.存儲新對象的邊界框坐標的質心

如果對象丟失或離開視野,我們可以簡單地取消註冊對象(步驟#5)。

5.創建可追蹤對象

為瞭跟蹤和計算視頻流中的對象,我們需要一種簡單的方法來存儲有關對象本身的信息,包括:

  • 對象ID
  • 以前的質心(所以我們可以很容易地計算出物體移動的方向)
  • 對象是否已被計數

為瞭實現所有這些目標,我們可以定義一個 TrackableObject 實例——打開 trackableobject.py 文件並插入以下代碼:

class TrackableObject:
    def __init__(self, objectID, centroid):
        # store the object ID, then initialize a list of centroids
        # using the current centroid
        self.objectID = objectID
        self.centroids = [centroid]
        # initialize a boolean used to indicate if the object has
        # already been counted or not
        self.counted = False

TrackableObject 構造函數接受 objectID + centroid 並存儲它們。 centroids 變量是一個列表,因為它將包含對象的質心位置歷史記錄。 構造函數還將 counted 初始化為 False ,表示該對象還沒有被計數。

6.使用 OpenCV + Python 實現我們的人員計數器

# import the necessary packages
from pyimagesearch.centroidtracker import CentroidTracker
from pyimagesearch.trackableobject import TrackableObject
from imutils.video import VideoStream
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import time
import dlib
import cv2

我們首先導入必要的包:

  • 從 pyimagesearch 模塊,我們導入自定義的 CentroidTracker 和 TrackableObject 類。
  • imutils.video 中的 VideoStream 和 FPS 模塊將幫助我們使用網絡攝像頭並計算估計的每秒幀數 (FPS) 吞吐率。
  • 我們需要 imutils 的 OpenCV 便利功能。
  • dlib 庫將用於其相關跟蹤器實現。
  • OpenCV 將用於深度神經網絡推理、打開視頻文件、寫入視頻文件以及在我們的屏幕上顯示輸出幀。

現在所有工具都觸手可及,讓我們解析命令行參數:

# construct the argument parse and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
    help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
    help="path to Caffe pre-trained model")
ap.add_argument("-i", "--input", type=str,
    help="path to optional input video file")
ap.add_argument("-o", "--output", type=str,
    help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.4,
    help="minimum probability to filter weak detections")
ap.add_argument("-s", "--skip-frames", type=int, default=30,
    help="# of skip frames between detections")
args = vars(ap.parse_args())

我們有六個命令行參數,它們允許我們在運行時從終端將信息傳遞給我們的人員計數器腳本:

  • –prototxt :Caffe 部署 prototxt 文件的路徑。
  • –model :Caffe 預訓練 CNN 模型的路徑。
  • –input : 可選的輸入視頻文件路徑。如果未指定路徑,將使用您的網絡攝像頭。
  • –output :可選的輸出視頻路徑。如果未指定路徑,則不會錄制視頻。
  • –confidence :默認值為 0.4 ,這是有助於過濾掉弱檢測的最小概率閾值。
  • –skip-frames :在跟蹤對象上再次運行我們的 DNN 檢測器之前要跳過的幀數。請記住,對象檢測的計算成本很高,但它確實有助於我們的跟蹤器重新評估幀中的

對象。默認情況下,我們在使用 OpenCV DNN 模塊和我們的 CNN 單次檢測器模型檢測對象之間跳過 30 幀。

現在我們的腳本可以在運行時動態處理命令行參數,讓我們準備我們的 SSD:

# initialize the list of class labels MobileNet SSD was trained to detect
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
    "bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
    "dog", "horse", "motorbike", "person", "pottedplant", "sheep",
    "sofa", "train", "tvmonitor"]
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

首先,我們將初始化 CLASSES——SSD 支持的類列表。我們隻對“人”類感興趣,但您也可以計算其他移動對象。

我們加載用於檢測對象的預訓練 MobileNet SSD(但同樣,我們隻對檢測和跟蹤人感興趣,而不是任何其他類)。

我們可以初始化我們的視頻流:

# if a video path was not supplied, grab a reference to the webcam
if not args.get("input", False):
    print("[INFO] starting video stream...")
    vs = VideoStream(src=0).start()
    time.sleep(2.0)
# otherwise, grab a reference to the video file
else:
    print("[INFO] opening video file...")
    vs = cv2.VideoCapture(args["input"])

首先,我們處理使用網絡攝像頭視頻流的情況。否則,我們將從視頻文件中捕獲幀。在開始循環幀之前,我們還有一些初始化要執行:

# initialize the video writer (we'll instantiate later if need be)
writer = None
# initialize the frame dimensions (we'll set them as soon as we read
# the first frame from the video)
W = None
H = None
# instantiate our centroid tracker, then initialize a list to store
# each of our dlib correlation trackers, followed by a dictionary to
# map each unique object ID to a TrackableObject
ct = CentroidTracker(maxDisappeared=40, maxDistance=50)
trackers = []
trackableObjects = {}
# initialize the total number of frames processed thus far, along
# with the total number of objects that have moved either up or down
totalFrames = 0
totalDown = 0
totalUp = 0
# start the frames per second throughput estimator
fps = FPS().start()

其餘的初始化包括:

  • writer:我們的視頻寫入器。如果我們正在寫入視頻,我們稍後會實例化這個對象。
  • W 和 H:我們的幀尺寸。我們需要將這些插入到 cv2.VideoWriter 中。
  • ct:我們的 CentroidTracker。
  • trackers :存儲 dlib 相關跟蹤器的列表。
  • trackableObjects :將 objectID 映射到 TrackableObject 的字典。
  • totalFrames :處理的幀總數。
  • totalDown 和 totalUp :向下或向上移動的對象/人的總數。
  • fps :我們用於基準測試的每秒幀數估計器。

現在我們所有的初始化都處理好瞭,讓我們循環傳入的幀:

# loop over frames from the video stream
while True:
    # grab the next frame and handle if we are reading from either
    # VideoCapture or VideoStream
    frame = vs.read()
    frame = frame[1] if args.get("input", False) else frame
    # if we are viewing a video and we did not grab a frame then we
    # have reached the end of the video
    if args["input"] is not None and frame is None:
        break
    # resize the frame to have a maximum width of 500 pixels (the
    # less data we have, the faster we can process it), then convert
    # the frame from BGR to RGB for dlib
    frame = imutils.resize(frame, width=500)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # if the frame dimensions are empty, set them
    if W is None or H is None:
        (H, W) = frame.shape[:2]
    # if we are supposed to be writing a video to disk, initialize
    # the writer
    if args["output"] is not None and writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        writer = cv2.VideoWriter(args["output"], fourcc, 30,
            (W, H), True)

我們開始循環。在循環的頂部,我們抓取下一幀。如果我們已經到達視頻的結尾,我們將跳出循環。

幀進行預處理。這包括調整大小和交換顏色通道,因為 dlib 需要 rgb 圖像。我們為視頻編寫器獲取幀的尺寸。 如果通過命令行參數提供瞭輸出路徑,我們將從那裡實例化視頻編寫器。

現在讓我們使用 SSD檢測人:

    # initialize the current status along with our list of bounding
    # box rectangles returned by either (1) our object detector or
    # (2) the correlation trackers
    status = "Waiting"
    rects = []
    # check to see if we should run a more computationally expensive
    # object detection method to aid our tracker
    if totalFrames % args["skip_frames"] == 0:
        # set the status and initialize our new set of object trackers
        status = "Detecting"
        trackers = []
        # convert the frame to a blob and pass the blob through the
        # network and obtain the detections
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (W, H), 127.5)
        net.setInput(blob)
        detections = net.forward()

我們將狀態初始化為Waiting。可能的狀態包括:

  • Waiting:在這種狀態下,我們正在等待檢測和跟蹤人員。
  • Detecting:我們正在使用 MobileNet SSD 檢測人員。
  • Tracking:人們在幀中被跟蹤,我們正在計算 totalUp 和 totalDown 。

我們的 rects 列表將通過檢測或跟蹤來填充。我們繼續初始化rects 。

重要的是要瞭解深度學習對象檢測器的計算成本非常高,尤其是當您在 CPU 上運行它們時。

為瞭避免在每一幀上運行我們的目標檢測器,並加快我們的跟蹤管道,我們將跳過 N 幀(由命令行參數設置 –skip-frames ,其中 30 是默認值)。隻有每 N 幀,我們才會使用 SSD 進行對象檢測。否則,我們將隻是跟蹤中間的移動對象。

使用模運算符,我們確保每 N 幀執行一次 if 語句中的代碼。 進入if語句後,我們會將狀態更新為Detecting。 然後我們初始化新的跟蹤器列表。

接下來,我們將通過對象檢測進行推理。我們首先從圖像中創建一個 blob,然後將該 blob 通過網絡傳遞以獲得檢測。 現在我們將遍歷每個檢測,希望找到屬於person類的對象:

        # loop over the detections
        for i in np.arange(0, detections.shape[2]):
            # extract the confidence (i.e., probability) associated
            # with the prediction
            confidence = detections[0, 0, i, 2]
            # filter out weak detections by requiring a minimum
            # confidence
            if confidence > args["confidence"]:
                # extract the index of the class label from the
                # detections list
                idx = int(detections[0, 0, i, 1])
                # if the class label is not a person, ignore it
                if CLASSES[idx] != "person":
                    continue

循環檢測,我們繼續獲取置信度並過濾掉那些不屬於人類的結果。

現在我們可以為每個人計算一個邊界框並開始相關性跟蹤:

                # compute the (x, y)-coordinates of the bounding box
                # for the object
                box = detections[0, 0, i, 3:7] * np.array([W, H, W, H])
                (startX, startY, endX, endY) = box.astype("int")
                # construct a dlib rectangle object from the bounding
                # box coordinates and then start the dlib correlation
                # tracker
                tracker = dlib.correlation_tracker()
                rect = dlib.rectangle(startX, startY, endX, endY)
                tracker.start_track(rgb, rect)
                # add the tracker to our list of trackers so we can
                # utilize it during skip frames
                trackers.append(tracker)

計算我們的box。 然後實例化我們的 dlib 相關跟蹤器,然後將對象的邊界框坐標傳遞給 dlib.rectangle,將結果存儲為 rect。 隨後,我們開始跟蹤,並將跟蹤器附加到跟蹤器列表中。 這是我們每 N 個跳幀執行的所有操作的封裝! 讓我們處理在 else 塊中進行跟蹤的典型操作:

    # otherwise, we should utilize our object *trackers* rather than
    # object *detectors* to obtain a higher frame processing throughput
    else:
        # loop over the trackers
        for tracker in trackers:
            # set the status of our system to be 'tracking' rather
            # than 'waiting' or 'detecting'
            status = "Tracking"
            # update the tracker and grab the updated position
            tracker.update(rgb)
            pos = tracker.get_position()
            # unpack the position object
            startX = int(pos.left())
            startY = int(pos.top())
            endX = int(pos.right())
            endY = int(pos.bottom())
            # add the bounding box coordinates to the rectangles list
            rects.append((startX, startY, endX, endY))

大多數時候,並沒有發生在跳幀倍數上。在此期間,我們將利用跟蹤器來跟蹤對象,而不是應用檢測。 我們開始遍歷可用跟蹤器。 我們繼續將狀態更新為Tracking並獲取對象位置。 我們提取位置坐標,然後在我們的 rects 列表中填充信息。 現在讓我們畫一條水平可視化線(人們必須穿過它才能被跟蹤)並使用質心跟蹤器來更新我們的對象質心:

    # draw a horizontal line in the center of the frame -- once an
    # object crosses this line we will determine whether they were
    # moving 'up' or 'down'
    cv2.line(frame, (0, H // 2), (W, H // 2), (0, 255, 255), 2)
    # use the centroid tracker to associate the (1) old object
    # centroids with (2) the newly computed object centroids
    objects = ct.update(rects)

我們畫一條水平線,我們將用它來可視化人們“越過”——一旦人們越過這條線,我們將增加各自的計數器 然後,我們利用 CentroidTracker 實例化來接受 rects 列表,無論它們是通過對象檢測還是對象跟蹤生成的。我們的質心跟蹤器會將對象 ID 與對象位置相關聯。 在下一個代碼塊中,我們將回顧一個人在幀中向上或向下移動的邏輯:

    # loop over the tracked objects
    for (objectID, centroid) in objects.items():
        # check to see if a trackable object exists for the current
        # object ID
        to = trackableObjects.get(objectID, None)
        # if there is no existing trackable object, create one
        if to is None:
            to = TrackableObject(objectID, centroid)
        # otherwise, there is a trackable object so we can utilize it
        # to determine direction
        else:
            # the difference between the y-coordinate of the *current*
            # centroid and the mean of *previous* centroids will tell
            # us in which direction the object is moving (negative for
            # 'up' and positive for 'down')
            y = [c[1] for c in to.centroids]
            direction = centroid[1] - np.mean(y)
            to.centroids.append(centroid)
            # check to see if the object has been counted or not
            if not to.counted:
                # if the direction is negative (indicating the object
                # is moving up) AND the centroid is above the center
                # line, count the object
                if direction < 0 and centroid[1] < H // 2:
                    totalUp += 1
                    to.counted = True
                # if the direction is positive (indicating the object
                # is moving down) AND the centroid is below the
                # center line, count the object
                elif direction > 0 and centroid[1] > H // 2:
                    totalDown += 1
                    to.counted = True
        # store the trackable object in our dictionary
        trackableObjects[objectID] = to

我們首先遍歷更新後的對象id的邊界框坐標。我們嘗試為當前的objectID獲取TrackableObject。如果objectID的TrackableObject不存在,我們就創建一個。否則,已經存在一個 TrackableObject ,所以我們需要弄清楚對象(人)是向上還是向下移動。

為此,我們獲取給定對象之前所有質心位置的y坐標值。然後,通過取當前質心位置與之前所有質心位置的平均值之間的差來計算方向。

我們取均值的原因是為瞭確保我們的方向跟蹤更穩定。如果我們隻存儲這個人之前的質心位置,我們就有可能出現錯誤的方向計數。記住,目標檢測和目標跟蹤算法不是“魔術”——有時它們會預測出可能稍微偏離你預期的邊界盒;因此,通過取均值,我們可以讓我們的人計算得更準確。

如果TrackableObject還沒有被計數,我們需要確定它是否已經準備好被計數,通過:

1.檢查direction是否為負(表示對象向上移動)並且質心在中心線上方。在這種情況下,我們增加 totalUp。

2.或者檢查direction是否為正(表示物體正在向下移動)且質心在中心線以下。如果這是真的,我們增加totalDown。

最後,我們將TrackableObject存儲在trackableObjects字典中,這樣我們就可以在捕獲下一幀時獲取並更新它。

接下來的三個代碼塊處理:

  • 顯示(繪圖並向幀寫入文本)
  • 將幀寫入磁盤上的視頻文件(如果存在–output命令行參數)
  • 捕獲按鍵
  • 清理

首先,我們將在框架上繪制一些信息以進行可視化:

        # draw both the ID of the object and the centroid of the
        # object on the output frame
        text = "ID {}".format(objectID)
        cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10),
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        cv2.circle(frame, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)
    # construct a tuple of information we will be displaying on the
    # frame
    info = [
        ("Up", totalUp),
        ("Down", totalDown),
        ("Status", status),
    ]
    # loop over the info tuples and draw them on our frame
    for (i, (k, v)) in enumerate(info):
        text = "{}: {}".format(k, v)
        cv2.putText(frame, text, (10, H - ((i * 20) + 20)),
            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

在這裡,我們在幀上覆蓋以下數據:

  • ObjectID :每個對象的ID。
  • centroid :對象的中心將由一個點表示,該點是通過填充一個圓圈而創建的。
  • info : 包括 totalUp 、 totalDown 和 status

然後我們將把幀寫入視頻文件(如果需要的話)並處理按鍵:

    # check to see if we should write the frame to disk
    if writer is not None:
        writer.write(frame)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    # increment the total number of frames processed thus far and
    # then update the FPS counter
    totalFrames += 1
    fps.update()

在這個代碼塊中我們:

  • 如果需要,將幀寫入輸出視頻文件
  • 顯示幀並處理按鍵。如果q被按下,我們將跳出幀處理循環。
  • 更新我們的fps計數器

現在是時候清理瞭:

# stop the timer and display FPS information
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# check to see if we need to release the video writer pointer
if writer is not None:
    writer.release()
# if we are not using a video file, stop the camera video stream
if not args.get("input", False):
    vs.stop()
# otherwise, release the video file pointer
else:
    vs.release()
# close any open windows
cv2.destroyAllWindows()

為瞭完成腳本,我們向終端顯示 FPS 信息,釋放所有指針,並關閉所有打開的窗口。

7.完整代碼

people_counter.py

from pyimagesearch.centroidtracker import CentroidTracker
from pyimagesearch.trackableobject import TrackableObject
from imutils.video import VideoStream
from imutils.video import FPS
import numpy as np
import argparse
import imutils
import time
import dlib
import cv2


# 構造參數解析並解析參數
ap = argparse.ArgumentParser()
ap.add_argument("-p", "--prototxt", required=True,
	help="path to Caffe 'deploy' prototxt file")
ap.add_argument("-m", "--model", required=True,
	help="path to Caffe pre-trained model")
ap.add_argument("-i", "--input", type=str,
	help="path to optional input video file")
ap.add_argument("-o", "--output", type=str,
	help="path to optional output video file")
ap.add_argument("-c", "--confidence", type=float, default=0.4,
	help="minimum probability to filter weak detections")
ap.add_argument("-s", "--skip-frames", type=int, default=30,
	help="# of skip frames between detections")
args = vars(ap.parse_args())

# 初始化類標簽列表
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
	"bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
	"dog", "horse", "motorbike", "person", "pottedplant", "sheep",
	"sofa", "train", "tvmonitor"]
# 從磁盤加載我們的序列化模型
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(args["prototxt"], args["model"])

# 如果未提供視頻路徑,請獲取網絡攝像頭的引用
if not args.get("input", False):
	print("[INFO] starting video stream...")
	vs = VideoStream(src=0).start()
	time.sleep(2.0)
# 否則,獲取對視頻文件的引用
else:
	print("[INFO] opening video file...")
	vs = cv2.VideoCapture(args["input"])

# 初始化視頻寫入器(如果需要,我們稍後將進行實例化)
writer = None
# 初始化幀尺寸(我們將在從視頻中讀取第一幀後立即設置它們)
W = None
H = None
# 實例化我們的質心跟蹤器,然後初始化一個列表來存儲每個dlib相關跟蹤器,
# 然後是一個字典來將每個唯一的對象ID映射到TrackableObject
ct = CentroidTracker(maxDisappeared=40, maxDistance=50)
trackers = []
trackableObjects = {}
# 初始化到目前為止處理的幀總數,以及向上或向下移動的對象總數
totalFrames = 0
totalDown = 0
totalUp = 0
# 啟動FPS評估器
fps = FPS().start()

# 循環視頻流中的幀
while True:
	# 如果我們正在從 VideoCapture 或 VideoStream 讀取數據,則抓取下一幀並處理
	frame = vs.read()
	frame = frame[1] if args.get("input", False) else frame
	# 如果我們正在觀看視頻並且我們沒有抓取幀,那麼我們已經到瞭視頻的結尾
	if args["input"] is not None and frame is None:
		break
	# 調整幀的最大寬度為 500 像素(我們擁有的數據越少,我們處理它的速度就越快),
	# 然後將幀從 BGR 轉換為 RGB 用於 dlib
	frame = imutils.resize(frame, width=500)
	rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
	# 如果幀尺寸為空,則設置它們
	if W is None or H is None:
		(H, W) = frame.shape[:2]
	# 如果我們應該將視頻寫入磁盤,請初始化寫入器
	if args["output"] is not None and writer is None:
		fourcc = cv2.VideoWriter_fourcc(*"MJPG")
		writer = cv2.VideoWriter(args["output"], fourcc, 30,
			(W, H), True)


	# 初始化當前狀態以及由(1)我們的對象檢測器或(2)相關跟蹤器返回的邊界框矩形列表
	status = "Waiting"
	rects = []
	# 檢查我們是否應該運行計算量更大的目標檢測方法來幫助我們的跟蹤器
	if totalFrames % args["skip_frames"] == 0:
		# 設置狀態並初始化我們的新對象跟蹤器集
		status = "Detecting"
		trackers = []
		# 將幀轉換為 blob 並通過網絡傳遞 blob 並獲得檢測結果
		blob = cv2.dnn.blobFromImage(frame, 0.007843, (W, H), 127.5)
		net.setInput(blob)
		detections = net.forward()

		# 循環檢測結果
		for i in np.arange(0, detections.shape[2]):
			# 提取與預測相關的置信度(即概率)
			confidence = detections[0, 0, i, 2]
			# 通過要求最小置信度過濾掉弱檢測
			if confidence > args["confidence"]:
				# 從檢測列表中提取類標簽的索引
				idx = int(detections[0, 0, i, 1])
				# 如果類標簽不是人,則忽略它
				if CLASSES[idx] != "person":
					continue
				# 計算對象邊界框的 (x, y) 坐標
				box = detections[0, 0, i, 3:7] * np.array([W, H, W, H])
				(startX, startY, endX, endY) = box.astype("int")
				# 利用邊界框坐標構造一個 dlib 矩形對象,然後啟動 dlib 相關跟蹤器
				tracker = dlib.correlation_tracker()
				rect = dlib.rectangle(startX, startY, endX, endY)
				tracker.start_track(rgb, rect)
				# 將跟蹤器添加到我們的跟蹤器列表中,以便我們可以在跳幀期間使用它
				trackers.append(tracker)

	# 否則,我們應該利用目標跟蹤器而不是目標檢測器來獲得更高的FPS
	else:
		# 遍歷跟蹤器
		for tracker in trackers:
			# 將系統的狀態設置為“跟蹤”而不是“等待”或“檢測”
			status = "Tracking"
			# 更新跟蹤器並獲取更新的位置
			tracker.update(rgb)
			pos = tracker.get_position()
			# 解包位置對象
			startX = int(pos.left())
			startY = int(pos.top())
			endX = int(pos.right())
			endY = int(pos.bottom())
			# 將邊界框坐標添加到矩形列表
			rects.append((startX, startY, endX, endY))

	# 在幀中心畫一條水平線——一旦一個物體穿過這條線,我們將確定他們是在“向上”還是“向下”移動。
	cv2.line(frame, (0, H // 2), (W, H // 2), (0, 255, 255), 2)
	# 使用質心跟蹤器將 (1) 舊對象質心與 (2) 新計算的對象質心相關聯
	objects = ct.update(rects)

	# 循環遍歷被跟蹤的對象
	for (objectID, centroid) in objects.items():
		# 檢查當前對象 ID 是否存在可跟蹤對象
		to = trackableObjects.get(objectID, None)
		# 如果沒有現有的可跟蹤對象,則創建一個
		if to is None:
			to = TrackableObject(objectID, centroid)
		# 否則,有一個可追蹤的物體,所以我們可以利用它來確定方向
		else:
			# *當前*質心的 y 坐標與 *previous* 質心的平均值之間的差異
			# 將告訴我們物體在哪個方向移動(“向上”為負,“向下”為正)
			y = [c[1] for c in to.centroids]
			direction = centroid[1] - np.mean(y)
			to.centroids.append(centroid)
			# 檢查對象是否已被計數
			if not to.counted:
				# 如果方向為負(表示物體向上移動)且質心在中線以上,則計算物體
				if direction < 0 and centroid[1] < H // 2:
					totalUp += 1
					to.counted = True
				# 如果方向為正(表示物體正在向下移動)並且質心低於中心線,則計算物體
				elif direction > 0 and centroid[1] > H // 2:
					totalDown += 1
					to.counted = True
		# 將可跟蹤對象存儲在我們的字典中
		trackableObjects[objectID] = to

		# 在輸出幀上繪制對象的 ID 和對象的質心
		text = "ID {}".format(objectID)
		cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10),
			cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
		cv2.circle(frame, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)
	# 構建我們將在幀上顯示的信息元組
	info = [
		("Up", totalUp),
		("Down", totalDown),
		("Status", status),
	]
	# 遍歷信息元組並將它們繪制在我們的幀上
	for (i, (k, v)) in enumerate(info):
		text = "{}: {}".format(k, v)
		cv2.putText(frame, text, (10, H - ((i * 20) + 20)),
			cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)



	# 檢查我們是否應該將幀寫入磁盤
	if writer is not None:
		writer.write(frame)
	# 顯示輸出幀
	cv2.imshow("Frame", frame)
	key = cv2.waitKey(1) & 0xFF
	# 如果' q '鍵被按下,中斷循環
	if key == ord("q"):
		break
	# 增加到目前為止處理的幀總數,然後更新 FPS 計數器
	totalFrames += 1
	fps.update()

# 停止定時器,顯示FPS信息
fps.stop()
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))
# 檢查我們是否需要釋放視頻寫入器指針
if writer is not None:
	writer.release()
# 如果我們不使用視頻文件,請停止攝像頭視頻流
if not args.get("input", False):
	vs.stop()
# 否則,釋放視頻文件指針
else:
	vs.release()
# 關閉所有打開的窗口
cv2.destroyAllWindows()

centroidtracker.py

(1)質心跟蹤器是最可靠的跟蹤器之一。

(2)為瞭簡單起見,質心跟蹤器計算包圍框的質心。

(3)也就是說,邊界框是圖像中對象的(x, y)坐標。

(4)一旦我們的SSD獲得瞭坐標,跟蹤器就會計算包圍框的質心(中心)。換句話說,就是物體的中心。

(5)然後為每一個被檢測到的特定對象分配一個唯一的ID,用於跟蹤幀序列。

from scipy.spatial import distance as dist
from collections import OrderedDict
import numpy as np

class CentroidTracker:
	def __init__(self, maxDisappeared=50, maxDistance=50):
		# 初始化下一個唯一的對象ID,並使用兩個有序字典來跟蹤給定對象ID到其質心的映射,
		# 以及它被標記為“消失”的連續幀數
		self.nextObjectID = 0
		self.objects = OrderedDict()
		self.disappeared = OrderedDict()

		# 存儲一個給定對象允許被標記為“消失”的最大連續幀數,直到我們需要從跟蹤中註銷該對象
		self.maxDisappeared = maxDisappeared

		# 存儲質心之間的最大距離以關聯對象——如果距離大於這個最大距離,我們開始將對象標記為“消失”
		self.maxDistance = maxDistance

	def register(self, centroid):
		# 註冊對象時,我們使用下一個可用的對象 ID 來存儲質心
		self.objects[self.nextObjectID] = centroid
		self.disappeared[self.nextObjectID] = 0
		self.nextObjectID += 1

	def deregister(self, objectID):
		# 要註銷對象 ID,我們從各自的字典中刪除對象 ID
		del self.objects[objectID]
		del self.disappeared[objectID]

	def update(self, rects):
		# 檢查輸入邊界框矩形列表是否為空
		if len(rects) == 0:
			# 循環遍歷任何現有的跟蹤對象並將它們標記為消失
			for objectID in list(self.disappeared.keys()):
				self.disappeared[objectID] += 1

				# 如果我們已經達到給定對象被標記為消失的最大連續幀數,則取消註冊它
				if self.disappeared[objectID] > self.maxDisappeared:
					self.deregister(objectID)

			# 早點返回,因為沒有要更新的質心或跟蹤信息
			return self.objects

		# 初始化當前幀的輸入質心數組
		inputCentroids = np.zeros((len(rects), 2), dtype="int")

		# 循環邊界框矩形
		for (i, (startX, startY, endX, endY)) in enumerate(rects):
			# 使用邊界框坐標推導出質心
			cX = int((startX + endX) / 2.0)
			cY = int((startY + endY) / 2.0)
			inputCentroids[i] = (cX, cY)

		# 如果我們當前沒有跟蹤任何對象,則獲取輸入質心並註冊它們中的每一個
		if len(self.objects) == 0:
			for i in range(0, len(inputCentroids)):
				self.register(inputCentroids[i])

		# 否則,我們目前正在跟蹤對象,因此我們需要嘗試將輸入質心與現有對象質心匹配
		else:
			# 獲取一組對象 ID 和相應的質心
			objectIDs = list(self.objects.keys())
			objectCentroids = list(self.objects.values())

			# 分別計算每對對象質心和輸入質心之間的距離——我們的目標是將輸入質心與現有對象質心匹配
			D = dist.cdist(np.array(objectCentroids), inputCentroids)

			# 為瞭執行這種匹配,我們必須 (1) 找到每一行中的最小值,
			# 然後 (2) 根據它們的最小值對行索引進行排序,以便具有最小值的行位於索引列表的 *front*
			rows = D.min(axis=1).argsort()

			# 接下來,我們對列執行類似的處理,方法是在每個列中找到最小的值,
			# 然後使用之前計算的行索引列表進行排序
			cols = D.argmin(axis=1)[rows]

			# 為瞭確定我們是否需要更新、註冊或取消註冊一個對象,我們需要跟蹤我們已經檢查過的行和列索引
			usedRows = set()
			usedCols = set()

			# 循環遍歷(行,列)索引元組的組合
			for (row, col) in zip(rows, cols):
				# 如果我們之前已經檢查過行值或列值,請忽略它
				if row in usedRows or col in usedCols:
					continue

				# 如果質心之間的距離大於最大距離,則不要將兩個質心關聯到同一個對象
				if D[row, col] > self.maxDistance:
					continue

				# 否則,獲取當前行的對象 ID,設置其新質心,並重置消失的計數器
				objectID = objectIDs[row]
				self.objects[objectID] = inputCentroids[col]
				self.disappeared[objectID] = 0

				# 表明我們已經分別檢查瞭每個行和列索引
				usedRows.add(row)
				usedCols.add(col)

			# 計算我們尚未檢查的行和列索引
			unusedRows = set(range(0, D.shape[0])).difference(usedRows)
			unusedCols = set(range(0, D.shape[1])).difference(usedCols)

			# 如果對象質心的數量等於或大於輸入質心的數量,
			# 我們需要檢查並查看其中一些對象是否可能已經消失
			if D.shape[0] >= D.shape[1]:
				# 循環未使用的行索引
				for row in unusedRows:
					# 獲取相應行索引的對象 ID 並增加消失的計數器
					objectID = objectIDs[row]
					self.disappeared[objectID] += 1

					# 檢查對象的連續幀數是否被標記為“消失”,以註銷該對象
					if self.disappeared[objectID] > self.maxDisappeared:
						self.deregister(objectID)

			# 否則,如果輸入質心的數量大於現有對象質心的數量,我們需要將每個新的輸入質心註冊為可跟蹤對象
			else:
				for col in unusedCols:
					self.register(inputCentroids[col])

		# 返回可跟蹤對象的集合
		return self.objects

trackableobject.py

class TrackableObject:
	def __init__(self, objectID, centroid):
		# 存儲對象 ID,然後使用當前質心初始化質心列表
		self.objectID = objectID
		self.centroids = [centroid]

		# 初始化一個佈爾值,用於指示對象是否已被計數
		self.counted = False

8.運行結果

打開終端,執行以下命令:

python people_counter.py --prototxt mobilenet_ssd/MobileNetSSD_deploy.prototxt \
    --model mobilenet_ssd/MobileNetSSD_deploy.caffemodel \
    --input videos/example_01.mp4 --output output/output_01.avi

我們的人員計數正在計算以下人數:

  • 正進入百貨商店(下)
  • 離開的人數(上)

在第一個視頻的最後,你會看到有7個人進入,3個人離開。

此外,檢查終端輸出,你會發現我們的人計數器能夠實時運行,達到34幀每秒。盡管我們正在使用深度學習對象檢測器來更準確地檢測人。

我們的 34 FPS 幀率是通過我們的兩個階段過程實現的: 每 30 幀檢測一次人 然後在其間的所有幀中應用更快、更有效的對象跟蹤算法。

9.改進我們的人員計數器應用程序

為瞭構建我們的 OpenCV 人員計數器,我們使用瞭 dlib 的相關性跟蹤器。此方法易於使用,並且隻需要很少的代碼。

然而,我們的實現有點低效——為瞭跟蹤多個對象,我們需要創建關聯跟蹤器對象的多個實例。然後當我們需要在後續幀中計算對象的位置時,我們需要遍歷所有 N 個對象跟蹤器並獲取更新的位置。

所有這些計算都將發生在我們腳本的主執行線程中,從而降低瞭我們的 FPS 速率。

因此,提高性能的一種簡單方法是使用dlib的多對象跟蹤器,以使我們的 FPS 速率提高 45%! 註意:OpenCV 也實現瞭多對象跟蹤,但不是多進程(至少在撰寫本文時)。 OpenCV 的多對象方法當然更容易使用,但如果沒有多處理能力,在這種情況下它並沒有多大幫助。

最後,為瞭獲得更高的跟蹤精度(但在沒有快速 GPU 的情況下會犧牲速度),您可以研究基於深度學習的對象跟蹤器,例如 Deep SORT。

BONUS

前幾天在github上看見一個改進版:

主要目標是將項目用作業務視角,隨時可以擴展。

用例:實時統計商店/大樓/購物中心等的人數。

如果人數超過上限就會向工作人員發出警報。

自動化特性並優化實時流以獲得更好的性能(使用線程)。

作為一項措施,以進行足跡分析,並在某種程度上應對COVID-19。

到此這篇關於基於OpenCV目標跟蹤實現人員計數器的文章就介紹到這瞭,更多相關OpenCV人員計數內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: