Mercurial Hosting > traffic-intelligence
diff python/events.py @ 607:84690dfe5560
add some functions for behaviour analysis
author | MohamedGomaa |
---|---|
date | Tue, 25 Nov 2014 22:49:47 -0500 |
parents | 07b1bd0785cd |
children | 0dc36203973d |
line wrap: on
line diff
--- a/python/events.py Mon Nov 24 13:02:10 2014 -0500 +++ b/python/events.py Tue Nov 25 22:49:47 2014 -0500 @@ -8,8 +8,10 @@ import multiprocessing import itertools +import sys import moving, prediction, indicators, utils - +sys.path.append("D:/behaviourAnalysis/libs") +import trajLearning __metaclass__ = type class Interaction(moving.STObject): @@ -147,6 +149,67 @@ pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) + def computeCrosssingCollisionsPrototypeAtInstant(self, instant,route1,route2,predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None, computeCZ = False, debug = False,useDestination=True,useSpeedPrototype=True): + inter1=moving.Interval(self.roadUser1.timeInterval.first,instant) + inter2=moving.Interval(self.roadUser2.timeInterval.first,instant) + partialObjPositions1= self.roadUser1.getObjectInTimeInterval(inter1).positions + partialObjPositions2= self.roadUser2.getObjectInTimeInterval(inter2).positions + if useSpeedPrototype: + prototypeTrajectories1=trajLearning.findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route1,partialObjPositions1,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination) + prototypeTrajectories2=trajLearning.findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route2,partialObjPositions2,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination) + else: + prototypeTrajectories1=trajLearning.findPrototypes(prototypes,nMatching,objects,route1,partialObjPositions1,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched) + prototypeTrajectories2=trajLearning.findPrototypes(prototypes,nMatching,objects,route2,partialObjPositions2,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched) + if prototypeTrajectories1=={}: + print self.roadUser1.num, 'is abnormal at instant', str(instant) + return [],[] + elif prototypeTrajectories2=={}: + print self.roadUser2.num, 'is abnormal at instant', str(instant) + return [],[] + else: + currentInstant,collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisionsAtInstant(instant, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) + return collisionPoints,crossingZones + + def computeCrossingsCollisionsPrototype2stages(self, predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,step=10,minSimilarity=0.1,mostMatched=None, computeCZ = False, debug = False, timeInterval = None,acceptPartialLength=30,useDestination=True,useSpeedPrototype=True): + '''Computes all crossing and collision points at each common instant for two road users. ''' + self.collisionPoints={} + self.crossingZones={} + TTCs = {} + route1=(self.roadUser1.startRouteID,self.roadUser1.endRouteID) + route2=(self.roadUser2.startRouteID,self.roadUser2.endRouteID) + if useDestination: + if route1 not in prototypes.keys(): + route1= trajLearning.findRoute(prototypes,objects,route1,self.roadUser1.num,noiseEntryNums,noiseExitNums) + if route2 not in prototypes.keys(): + route2= trajLearning.findRoute(prototypes,objects,route2,self.roadUser2.num,noiseEntryNums,noiseExitNums) + + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = self.timeInterval + reCompute=False + for i in xrange(commonTimeInterval.first,commonTimeInterval.last,step): # incremental calculation of CP,CZ to save time + if i-self.roadUser1.timeInterval.first >= acceptPartialLength and i-self.roadUser2.timeInterval.first >= acceptPartialLength: + self.collisionPoints[i], self.crossingZones[i] = self.computeCrosssingCollisionsPrototypeAtInstant(i,route1,route2,predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched, computeCZ, debug,useDestination,useSpeedPrototype) + if len(self.collisionPoints[i]) >0 or len(self.crossingZones[i])>0: + reCompute=True + break + if reCompute: + for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors + if i-self.roadUser1.timeInterval.first >= acceptPartialLength and i-self.roadUser2.timeInterval.first >= acceptPartialLength: + self.collisionPoints[i], self.crossingZones[i] = self.computeCrosssingCollisionsPrototypeAtInstant(i,route1,route2,predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched, computeCZ, debug,useDestination,useSpeedPrototype) + if len(self.collisionPoints[i]) > 0: + TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.collisionPoints[i]) + # add probability of collision, and probability of successful evasive action + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs)) + + if computeCZ: + pPETs = {} + for i in list(commonTimeInterval)[:-1]: + if i in self.crossingZones.keys() and len(self.crossingZones[i]) > 0: + pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) + def addVideoFilename(self,videoFilename): self.videoFilename= videoFilename