mirror of
https://github.com/kerberos-io/heatmap.git
synced 2026-04-22 15:57:03 +08:00
mvp of a heatmap
This commit is contained in:
Binary file not shown.
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,171 @@
|
||||
# import the necessary packages
|
||||
from scipy.spatial import distance as dist
|
||||
from collections import OrderedDict
|
||||
import numpy as np
|
||||
|
||||
|
||||
class CentroidTracker:
|
||||
def __init__(self, maxDisappeared=50, maxDistance=50):
|
||||
# initialize the next unique object ID along with two ordered
|
||||
# dictionaries used to keep track of mapping a given object
|
||||
# ID to its centroid and number of consecutive frames it has
|
||||
# been marked as "disappeared", respectively
|
||||
self.nextObjectID = 0
|
||||
self.objects = OrderedDict()
|
||||
self.disappeared = OrderedDict()
|
||||
self.bbox = OrderedDict() # CHANGE
|
||||
|
||||
# store the number of maximum consecutive frames a given
|
||||
# object is allowed to be marked as "disappeared" until we
|
||||
# need to deregister the object from tracking
|
||||
self.maxDisappeared = maxDisappeared
|
||||
|
||||
# store the maximum distance between centroids to associate
|
||||
# an object -- if the distance is larger than this maximum
|
||||
# distance we'll start to mark the object as "disappeared"
|
||||
self.maxDistance = maxDistance
|
||||
|
||||
def register(self, centroid, inputRect):
|
||||
# when registering an object we use the next available object
|
||||
# ID to store the centroid
|
||||
self.objects[self.nextObjectID] = centroid
|
||||
self.bbox[self.nextObjectID] = inputRect # CHANGE
|
||||
self.disappeared[self.nextObjectID] = 0
|
||||
self.nextObjectID += 1
|
||||
|
||||
def deregister(self, objectID):
|
||||
# to deregister an object ID we delete the object ID from
|
||||
# both of our respective dictionaries
|
||||
del self.objects[objectID]
|
||||
del self.disappeared[objectID]
|
||||
del self.bbox[objectID] # CHANGE
|
||||
|
||||
def update(self, rects):
|
||||
# check to see if the list of input bounding box rectangles
|
||||
# is empty
|
||||
if len(rects) == 0:
|
||||
# loop over any existing tracked objects and mark them
|
||||
# as disappeared
|
||||
for objectID in list(self.disappeared.keys()):
|
||||
self.disappeared[objectID] += 1
|
||||
|
||||
# if we have reached a maximum number of consecutive
|
||||
# frames where a given object has been marked as
|
||||
# missing, deregister it
|
||||
if self.disappeared[objectID] > self.maxDisappeared:
|
||||
self.deregister(objectID)
|
||||
|
||||
# return early as there are no centroids or tracking info
|
||||
# to update
|
||||
# return self.objects
|
||||
return self.bbox
|
||||
|
||||
# initialize an array of input centroids for the current frame
|
||||
inputCentroids = np.zeros((len(rects), 2), dtype="int")
|
||||
inputRects = []
|
||||
# loop over the bounding box rectangles
|
||||
for (i, (startX, startY, endX, endY)) in enumerate(rects):
|
||||
# use the bounding box coordinates to derive the centroid
|
||||
cX = int((startX + endX) / 2.0)
|
||||
cY = int((startY + endY) / 2.0)
|
||||
inputCentroids[i] = (cX, cY)
|
||||
inputRects.append(rects[i]) # CHANGE
|
||||
|
||||
# if we are currently not tracking any objects take the input
|
||||
# centroids and register each of them
|
||||
if len(self.objects) == 0:
|
||||
for i in range(0, len(inputCentroids)):
|
||||
self.register(inputCentroids[i], inputRects[i]) # CHANGE
|
||||
|
||||
# otherwise, are are currently tracking objects so we need to
|
||||
# try to match the input centroids to existing object
|
||||
# centroids
|
||||
else:
|
||||
# grab the set of object IDs and corresponding centroids
|
||||
objectIDs = list(self.objects.keys())
|
||||
objectCentroids = list(self.objects.values())
|
||||
|
||||
# compute the distance between each pair of object
|
||||
# centroids and input centroids, respectively -- our
|
||||
# goal will be to match an input centroid to an existing
|
||||
# object centroid
|
||||
D = dist.cdist(np.array(objectCentroids), inputCentroids)
|
||||
|
||||
# in order to perform this matching we must (1) find the
|
||||
# smallest value in each row and then (2) sort the row
|
||||
# indexes based on their minimum values so that the row
|
||||
# with the smallest value as at the *front* of the index
|
||||
# list
|
||||
rows = D.min(axis=1).argsort()
|
||||
|
||||
# next, we perform a similar process on the columns by
|
||||
# finding the smallest value in each column and then
|
||||
# sorting using the previously computed row index list
|
||||
cols = D.argmin(axis=1)[rows]
|
||||
|
||||
# in order to determine if we need to update, register,
|
||||
# or deregister an object we need to keep track of which
|
||||
# of the rows and column indexes we have already examined
|
||||
usedRows = set()
|
||||
usedCols = set()
|
||||
|
||||
# loop over the combination of the (row, column) index
|
||||
# tuples
|
||||
for (row, col) in zip(rows, cols):
|
||||
# if we have already examined either the row or
|
||||
# column value before, ignore it
|
||||
if row in usedRows or col in usedCols:
|
||||
continue
|
||||
|
||||
# if the distance between centroids is greater than
|
||||
# the maximum distance, do not associate the two
|
||||
# centroids to the same object
|
||||
if D[row, col] > self.maxDistance:
|
||||
continue
|
||||
|
||||
# otherwise, grab the object ID for the current row,
|
||||
# set its new centroid, and reset the disappeared
|
||||
# counter
|
||||
objectID = objectIDs[row]
|
||||
self.objects[objectID] = inputCentroids[col]
|
||||
self.bbox[objectID] = inputRects[col] # CHANGE
|
||||
self.disappeared[objectID] = 0
|
||||
|
||||
# indicate that we have examined each of the row and
|
||||
# column indexes, respectively
|
||||
usedRows.add(row)
|
||||
usedCols.add(col)
|
||||
|
||||
# compute both the row and column index we have NOT yet
|
||||
# examined
|
||||
unusedRows = set(range(0, D.shape[0])).difference(usedRows)
|
||||
unusedCols = set(range(0, D.shape[1])).difference(usedCols)
|
||||
|
||||
# in the event that the number of object centroids is
|
||||
# equal or greater than the number of input centroids
|
||||
# we need to check and see if some of these objects have
|
||||
# potentially disappeared
|
||||
if D.shape[0] >= D.shape[1]:
|
||||
# loop over the unused row indexes
|
||||
for row in unusedRows:
|
||||
# grab the object ID for the corresponding row
|
||||
# index and increment the disappeared counter
|
||||
objectID = objectIDs[row]
|
||||
self.disappeared[objectID] += 1
|
||||
|
||||
# check to see if the number of consecutive
|
||||
# frames the object has been marked "disappeared"
|
||||
# for warrants deregistering the object
|
||||
if self.disappeared[objectID] > self.maxDisappeared:
|
||||
self.deregister(objectID)
|
||||
|
||||
# otherwise, if the number of input centroids is greater
|
||||
# than the number of existing object centroids we need to
|
||||
# register each new input centroid as a trackable object
|
||||
else:
|
||||
for col in unusedCols:
|
||||
self.register(inputCentroids[col], inputRects[col])
|
||||
|
||||
# return the set of trackable objects
|
||||
# return self.objects
|
||||
return self.bbox
|
||||
@@ -0,0 +1,155 @@
|
||||
import cv2
|
||||
import datetime
|
||||
import imutils
|
||||
import numpy as np
|
||||
from collections import defaultdict
|
||||
from centroidtracker import CentroidTracker
|
||||
|
||||
protopath = "MobileNetSSD_deploy.prototxt"
|
||||
modelpath = "MobileNetSSD_deploy.caffemodel"
|
||||
detector = cv2.dnn.readNetFromCaffe(prototxt=protopath, caffeModel=modelpath)
|
||||
|
||||
# Only enable it if you are using OpenVino environment
|
||||
# detector.setPreferableBackend(cv2.dnn.DNN_BACKEND_INFERENCE_ENGINE)
|
||||
# detector.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
|
||||
|
||||
|
||||
CLASSES = ["background", "aeroplane", "bicycle", "bird", "boat",
|
||||
"bottle", "bus", "car", "cat", "chair", "cow", "diningtable",
|
||||
"dog", "horse", "motorbike", "person", "pottedplant", "sheep",
|
||||
"sofa", "train", "tvmonitor"]
|
||||
|
||||
# maxDisappeared, time wait when object moves out of frame
|
||||
tracker = CentroidTracker(maxDisappeared=700, maxDistance=220)
|
||||
|
||||
|
||||
def non_max_suppression_fast(boxes, overlapThresh):
|
||||
try:
|
||||
if len(boxes) == 0:
|
||||
return []
|
||||
|
||||
if boxes.dtype.kind == "i":
|
||||
boxes = boxes.astype("float")
|
||||
|
||||
pick = []
|
||||
|
||||
x1 = boxes[:, 0]
|
||||
y1 = boxes[:, 1]
|
||||
x2 = boxes[:, 2]
|
||||
y2 = boxes[:, 3]
|
||||
|
||||
|
||||
area = (x2 - x1 + 1) * (y2 - y1 + 1)
|
||||
idxs = np.argsort(y2)
|
||||
|
||||
while len(idxs) > 0:
|
||||
last = len(idxs) - 1
|
||||
i = idxs[last]
|
||||
pick.append(i)
|
||||
|
||||
xx1 = np.maximum(x1[i], x1[idxs[:last]])
|
||||
yy1 = np.maximum(y1[i], y1[idxs[:last]])
|
||||
xx2 = np.minimum(x2[i], x2[idxs[:last]])
|
||||
yy2 = np.minimum(y2[i], y2[idxs[:last]])
|
||||
|
||||
w = np.maximum(0, xx2 - xx1 + 1)
|
||||
h = np.maximum(0, yy2 - yy1 + 1)
|
||||
|
||||
overlap = (w * h) / area[idxs[:last]]
|
||||
|
||||
idxs = np.delete(idxs, np.concatenate(([last],
|
||||
np.where(overlap > overlapThresh)[0])))
|
||||
|
||||
return boxes[pick].astype("int")
|
||||
except Exception as e:
|
||||
print("Exception occurred in non_max_suppression : {}".format(e))
|
||||
|
||||
|
||||
def main():
|
||||
cap = cv2.VideoCapture('1639943552_6-967003_camera1_200-200-400-400_24_769.mp4')
|
||||
|
||||
fps_start_time = datetime.datetime.now()
|
||||
fps = 0
|
||||
total_frames = 0
|
||||
centroid_dict = defaultdict(list)
|
||||
object_id_list = []
|
||||
|
||||
while True:
|
||||
ret, frame = cap.read()
|
||||
frame = imutils.resize(frame, width=600)
|
||||
total_frames = total_frames + 1
|
||||
(height, width) = frame.shape[:2]
|
||||
|
||||
blob = cv2.dnn.blobFromImage(frame, 0.007843, (width, height), 127.5)
|
||||
detector.setInput(blob)
|
||||
person_detections = detector.forward()
|
||||
|
||||
rects = []
|
||||
for i in np.arange(0, person_detections.shape[2]):
|
||||
confidence = person_detections[0, 0, i, 2]
|
||||
if confidence > 0.5:
|
||||
idx = int(person_detections[0, 0, i, 1])
|
||||
|
||||
if CLASSES[idx] != "person":
|
||||
continue
|
||||
|
||||
person_box = person_detections[0, 0, i, 3:7] * np.array([width, height, width, height])
|
||||
(startX, startY, endX, endY) = person_box.astype("int")
|
||||
rects.append(person_box)
|
||||
|
||||
boundingboxes = np.array(rects)
|
||||
boundingboxes = boundingboxes.astype(int)
|
||||
rects = non_max_suppression_fast(boundingboxes, 0.4)
|
||||
objects = tracker.update(rects)
|
||||
|
||||
for (objectId, bbox) in objects.items():
|
||||
x1, y1, x2, y2 = bbox
|
||||
x1 = int(x1)
|
||||
y1 = int(y1)
|
||||
x2 = int(x2)
|
||||
y2 = int(y2)
|
||||
|
||||
xCenter = int((x1 + x2) / 2)
|
||||
yCenter = int((y1 + y2) / 2)
|
||||
|
||||
cv2.circle(frame, (xCenter, y2), 5, (0, 255, 0), -1)
|
||||
|
||||
centroid_dict[objectId].append((xCenter, y2))
|
||||
|
||||
if objectId not in object_id_list:
|
||||
object_id_list.append(objectId)
|
||||
start_pt = (xCenter, y2)
|
||||
end_pt = (xCenter, y2)
|
||||
cv2.line(frame, start_pt, end_pt, (0, 255, 0), 2)
|
||||
else:
|
||||
L = len(centroid_dict[objectId])
|
||||
for pt in range(len(centroid_dict[objectId])):
|
||||
if not pt + 1 == L:
|
||||
start_pt = (centroid_dict[objectId][pt][0], centroid_dict[objectId][pt][1])
|
||||
end_pt = (centroid_dict[objectId][pt + 1][0], centroid_dict[objectId][pt + 1][1])
|
||||
cv2.line(frame, start_pt, end_pt, (0, 255, 0), 1)
|
||||
|
||||
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
|
||||
text = "ID: {}".format(objectId)
|
||||
cv2.putText(frame, text, (x1, y1-5), cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 1)
|
||||
|
||||
fps_end_time = datetime.datetime.now()
|
||||
time_diff = fps_end_time - fps_start_time
|
||||
if time_diff.seconds == 0:
|
||||
fps = 0.0
|
||||
else:
|
||||
fps = (total_frames / time_diff.seconds)
|
||||
fps_text = "FPS: {:.2f}".format(fps)
|
||||
cv2.putText(frame, fps_text, (5, 30), cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 1)
|
||||
|
||||
cv2.imshow("Application", frame)
|
||||
# cv2.VideoWriter_fourcc("new_vid.mp4", cv2.VideoWriter_fourcc(*'mp4v'), 20, (width, height))
|
||||
key = cv2.waitKey(1)
|
||||
if key == ord('q'):
|
||||
break
|
||||
cap.release()
|
||||
cv2.destroyAllWindows()
|
||||
|
||||
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user