Mercurial Hosting > traffic-intelligence
diff python/events.py @ 619:dc2d0a0d7fe1
merged code from Mohamed Gomaa Mohamed for the use of points of interests in mation pattern learning and motion prediction (TRB 2015)
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Wed, 10 Dec 2014 15:27:08 -0500 |
parents | 306db0f3c7a2 |
children | 82e9f78a4714 |
line wrap: on
line diff
--- a/python/events.py Sun Dec 07 23:01:02 2014 -0500 +++ b/python/events.py Wed Dec 10 15:27:08 2014 -0500 @@ -9,8 +9,37 @@ import itertools import moving, prediction, indicators, utils +__metaclass__ = type -__metaclass__ = type +def findRoute(prototypes,objects,i,j,noiseEntryNums,noiseExitNums,minSimilarity= 0.3, spatialThreshold=1.0, delta=180): + if i[0] not in noiseEntryNums: + prototypesRoutes= [ x for x in sorted(prototypes.keys()) if i[0]==x[0]] + elif i[1] not in noiseExitNums: + prototypesRoutes=[ x for x in sorted(prototypes.keys()) if i[1]==x[1]] + else: + prototypesRoutes=[x for x in sorted(prototypes.keys())] + routeSim={} + lcss = utils.LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta) + for y in prototypesRoutes: + if y in prototypes.keys(): + prototypesIDs=prototypes[y] + similarity=[] + for x in prototypesIDs: + s=lcss.computeNormalized(objects[j].positions, objects[x].positions) + similarity.append(s) + routeSim[y]=max(similarity) + route=max(routeSim, key=routeSim.get) + if routeSim[route]>=minSimilarity: + return route + else: + return i + +def getRoute(obj,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination=True): + route=(obj.startRouteID,obj.endRouteID) + if useDestination: + if route not in prototypes.keys(): + route= findRoute(prototypes,objects,route,obj.getNum(),noiseEntryNums,noiseExitNums) + return route class Interaction(moving.STObject): '''Class for an interaction between two road users @@ -124,17 +153,20 @@ minDistance[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistance)) - def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1): + def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1,usePrototypes=True,route1= (-1,-1),route2=(-1,-1),prototypes={},secondStepPrototypes={},nMatching={},objects=[],noiseEntryNums=[],noiseExitNums=[],minSimilarity=0.1,mostMatched=None,useDestination=True,useSpeedPrototype=True,acceptPartialLength=30, step=1): '''Computes all crossing and collision points at each common instant for two road users. ''' self.collisionPoints={} self.crossingZones={} TTCs = {} + if usePrototypes: + route1= getRoute(self.roadUser1,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination) + route2= getRoute(self.roadUser2,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination) if timeInterval: commonTimeInterval = timeInterval else: commonTimeInterval = self.timeInterval - self.collisionPoints, self.crossingZones = prediction.computeCrossingsCollisions(predictionParameters, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval, nProcesses) + self.collisionPoints, self.crossingZones = prediction.computeCrossingsCollisions(predictionParameters, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval, nProcesses,usePrototypes,route1,route2,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype,acceptPartialLength, step) for i, cp in self.collisionPoints.iteritems(): TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cp) # add probability of collision, and probability of successful evasive action @@ -142,13 +174,12 @@ if computeCZ: pPETs = {} - for i in list(commonTimeInterval)[:-1]: - if len(self.crossingZones[i]) > 0: - pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) + for i, cz in self.crossingZones.iteritems(): + pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(cz) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) def addVideoFilename(self,videoFilename): - self.videoFilename= videoFilename + self.videoFilename= videoFilename def addInteractionType(self,interactionType): ''' interaction types: conflict or collision if they are known'''