Mercurial Hosting > traffic-intelligence
diff python/moving.py @ 680:da1352b89d02 dev
classification is working
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 05 Jun 2015 02:25:30 +0200 |
parents | ab3fdff42624 |
children | fbe29be25501 |
line wrap: on
line diff
--- a/python/moving.py Wed Jun 03 16:00:46 2015 +0200 +++ b/python/moving.py Fri Jun 05 02:25:30 2015 +0200 @@ -5,7 +5,7 @@ from base import VideoFilenameAddable from math import sqrt, atan2, cos, sin -from numpy import median, array, zeros, hypot, NaN, std +from numpy import median, array, zeros, hypot, NaN, std, floor, float32 from matplotlib.pyplot import plot from scipy.stats import scoreatpercentile from scipy.spatial.distance import cdist @@ -78,7 +78,6 @@ else: return None - def unionIntervals(intervals): 'returns the smallest interval containing all intervals' inter = intervals[0] @@ -1129,8 +1128,9 @@ print('Object {} has no features loaded.'.format(self.getNum())) return None - def getSpeeds(self): - return self.getVelocities().norm() + def getSpeeds(self, nInstantsIgnoredAtEnds = 0): + n = min(nInstantsIgnoredAtEnds, int(floor(self.length()/2.))) + return self.getVelocities().norm()[n:-n] def getSpeedIndicator(self): from indicators import SeverityIndicator @@ -1346,23 +1346,20 @@ ### # User Type Classification ### - def classifyUserTypeSpeedMotorized(self, threshold, aggregationFunc = median, ignoreNInstantsAtEnds = 0): + def classifyUserTypeSpeedMotorized(self, threshold, aggregationFunc = median, nInstantsIgnoredAtEnds = 0): '''Classifies slow and fast road users slow: non-motorized -> pedestrians fast: motorized -> cars aggregationFunc can be any function that can be applied to a vector of speeds, including percentile: aggregationFunc = lambda x: percentile(x, percentileFactor) # where percentileFactor is 85 for 85th percentile''' - if ignoreNInstantsAtEnds > 0: - speeds = self.getSpeeds()[ignoreNInstantsAtEnds:-ignoreNInstantsAtEnds] - else: - speeds = self.getSpeeds() + speeds = self.getSpeeds(nInstantsIgnoredAtEnds) if aggregationFunc(speeds) >= threshold: self.setUserType(userType2Num['car']) else: self.setUserType(userType2Num['pedestrian']) - def classifyUserTypeSpeed(self, speedProbabilities, aggregationFunc = median): + def classifyUserTypeSpeed(self, speedProbabilities, aggregationFunc = median, nInstantsIgnoredAtEnds = 0): '''Classifies road user per road user type speedProbabilities are functions return P(speed|class) in a dictionary indexed by user type names @@ -1375,62 +1372,67 @@ else: return x''' if not hasattr(self, 'aggregatedSpeed'): - self.aggregatedSpeed = aggregationFunc(self.getSpeeds()) + self.aggregatedSpeed = aggregationFunc(self.getSpeeds(nInstantsIgnoredAtEnds)) userTypeProbabilities = {} for userTypename in speedProbabilities: userTypeProbabilities[userType2Num[userTypename]] = speedProbabilities[userTypename](self.aggregatedSpeed) self.setUserType(utils.argmaxDict(userTypeProbabilities)) return userTypeProbabilities - def initClassifyUserTypeHoGSVM(self, aggregationFunc = median): + def initClassifyUserTypeHoGSVM(self, aggregationFunc, pedBikeCarSVM, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), nInstantsIgnoredAtEnds = 0): '''Initializes the data structures for classification - TODO? compute speed for longest feature? - Skip beginning and end of feature for speed? Offer options instead of median''' - self.aggregatedSpeed = aggregationFunc(self.getSpeeds()) + TODO? compute speed for longest feature?''' + self.aggregatedSpeed = aggregationFunc(self.getSpeeds(nInstantsIgnoredAtEnds)) + if self.aggregatedSpeed < pedBikeSpeedTreshold or bikeCarSVM is None: + self.appearanceClassifier = pedBikeCarSVM + elif self.aggregatedSpeed < bikeCarSpeedThreshold: + self.appearanceClassifier = bikeCarSVM + else: + class CarClassifier: + def predict(self, hog): + return userType2Num['car'] + self.appearanceClassifier = CarClassifier() + self.userTypes = {} - def classifyUserTypeHoGSVMAtInstant(self, img, pedBikeCarSVM, instant, homography, width, height, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), px = 0.2, py = 0.2, pixelThreshold = 800): + def classifyUserTypeHoGSVMAtInstant(self, img, instant, homography, width, height, px = 0.2, py = 0.2, minNPixels = 800): '''Extract the image box around the object and applies the SVM model on it''' - croppedImg, yCropMin, yCropMax, xCropMin, xCropMax = imageBox(img, self, instant, homography, width, height, px, py, pixelThreshold) - if len(croppedImg) > 0: # != [] - hog = array([cvutils.HOG(croppedImg)], dtype = np.float32) - if self.aggregatedSpeed < pedBikeSpeedTreshold or bikeCarSVM is None: - self.userTypes[instant] = int(pedBikeCarSVM.predict(hog)) - elif self.aggregatedSpeed < bikeCarSpeedTreshold: - self.userTypes[instant] = int(bikeCarSVM.predict(hog)) - else: - self.userTypes[instant] = userType2Num['car'] + croppedImg, yCropMin, yCropMax, xCropMin, xCropMax = cvutils.imageBox(img, self, instant, homography, width, height, px, py, minNPixels) + if len(croppedImg) > 0: + hog = cvutils.HOG(croppedImg)#HOG(image, rescaleSize = (64, 64), orientations=9, pixelsPerCell=(8, 8), cellsPerBlock=(2, 2), visualize=False, normalize=False) + self.userTypes[instant] = int(self.appearanceClassifier.predict(hog)) else: self.userTypes[instant] = userType2Num['unknown'] - def classifyUserTypeHoGSVM(self, images, pedBikeCarSVM, homography, width, height, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), speedProbabilities = None, aggregationFunc = median, px = 0.2, py = 0.2, pixelThreshold = 800): + def classifyUserTypeHoGSVM(self, pedBikeCarSVM = None, width = 0, height = 0, homography = None, images = None, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), minSpeedEquiprobable = -1, speedProbabilities = None, aggregationFunc = median, nInstantsIgnoredAtEnds = 0, px = 0.2, py = 0.2, minNPixels = 800): '''Agregates SVM detections in each image and returns probability (proportion of instants with classification in each category) - iamges is a dictionary of images indexed by instant + images is a dictionary of images indexed by instant With default parameters, the general (ped-bike-car) classifier will be used - TODO? consider all categories?''' - if not hasattr(self, aggregatedSpeed) or not hasattr(self, userTypes): + + Considered categories are the keys of speedProbabilities''' + if not hasattr(self, 'aggregatedSpeed') or not hasattr(self, 'userTypes'): print('Initilize the data structures for classification by HoG-SVM') - self.initClassifyUserTypeHoGSVM(aggregationFunc) + self.initClassifyUserTypeHoGSVM(aggregationFunc, pedBikeCarSVM, bikeCarSVM, pedBikeSpeedTreshold, bikeCarSpeedThreshold, nInstantsIgnoredAtEnds) - if len(self.userTypes) != self.length(): # if classification has not been done previously + if len(self.userTypes) != self.length() and images is not None: # if classification has not been done previously for t in self.getTimeInterval(): if t not in self.userTypes: - self.classifyUserTypeHoGSVMAtInstant(images[t], pedBikeCarSVM, t, homography, width, height, bikeCarSVM, pedBikeSpeedTreshold, bikeCarSpeedThreshold, px, py, pixelThreshold) + self.classifyUserTypeHoGSVMAtInstant(images[t], t, homography, width, height, px, py, minNPixels) # compute P(Speed|Class) - if speedProbabilities is None: # equiprobable information from speed + if speedProbabilities is None or self.aggregatedSpeed < minSpeedEquiprobable: # equiprobable information from speed userTypeProbabilities = {userType2Num['car']: 1., userType2Num['pedestrian']: 1., userType2Num['bicycle']: 1.} else: userTypeProbabilities = {userType2Num[userTypename]: speedProbabilities[userTypename](self.aggregatedSpeed) for userTypename in speedProbabilities} # result is P(Class|Appearance) x P(Speed|Class) - nInstantsUserType = {userType2Num[userTypename]: 0 for userTypename in userTypeProbabilities}# number of instants the object is classified as userTypename + nInstantsUserType = {userTypeNum: 0 for userTypeNum in userTypeProbabilities}# number of instants the object is classified as userTypename for t in self.userTypes: - nInstantsUserType[self.userTypes[t]] += 1 - for userTypename in userTypeProbabilities: - userTypeProbabilities[userTypename] *= nInstantsUserType[userTypename] + nInstantsUserType[self.userTypes[t]] = nInstantsUserType.get(self.userTypes[t], 0) + 1 + for userTypeNum in userTypeProbabilities: + userTypeProbabilities[userTypeNum] *= nInstantsUserType[userTypeNum] # class is the user type that maximizes usertype probabilities self.setUserType(utils.argmaxDict(userTypeProbabilities))