Mercurial Hosting > traffic-intelligence
comparison python/events.py @ 607:84690dfe5560
add some functions for behaviour analysis
author | MohamedGomaa |
---|---|
date | Tue, 25 Nov 2014 22:49:47 -0500 |
parents | 07b1bd0785cd |
children | 0dc36203973d |
comparison
equal
deleted
inserted
replaced
606:75ad9c0d6cc3 | 607:84690dfe5560 |
---|---|
6 from numpy import arccos | 6 from numpy import arccos |
7 | 7 |
8 import multiprocessing | 8 import multiprocessing |
9 import itertools | 9 import itertools |
10 | 10 |
11 import sys | |
11 import moving, prediction, indicators, utils | 12 import moving, prediction, indicators, utils |
12 | 13 sys.path.append("D:/behaviourAnalysis/libs") |
14 import trajLearning | |
13 __metaclass__ = type | 15 __metaclass__ = type |
14 | 16 |
15 class Interaction(moving.STObject): | 17 class Interaction(moving.STObject): |
16 '''Class for an interaction between two road users | 18 '''Class for an interaction between two road users |
17 or a road user and an obstacle | 19 or a road user and an obstacle |
145 for i in list(commonTimeInterval)[:-1]: | 147 for i in list(commonTimeInterval)[:-1]: |
146 if len(self.crossingZones[i]) > 0: | 148 if len(self.crossingZones[i]) > 0: |
147 pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) | 149 pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) |
148 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) | 150 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) |
149 | 151 |
def computeCrosssingCollisionsPrototypeAtInstant(self, instant, route1, route2, predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes, secondStepPrototypes, nMatching, objects, noiseEntryNums, noiseExitNums, minSimilarity=0.1, mostMatched=None, computeCZ = False, debug = False, useDestination=True, useSpeedPrototype=True):
    '''Computes the collision points and crossing zones of the two road users at one instant,
    using prototype trajectories matched against what each user has travelled so far

    For each road user, the positions from the start of its time interval up to instant
    are passed, with its route, to trajLearning.findPrototypesSpeed (if useSpeedPrototype)
    or trajLearning.findPrototypes (otherwise)
    The two results are then handed to predictionParameters.computeCrossingsCollisionsAtInstant

    Returns the pair (collisionPoints, crossingZones)
    If the prototype lookup of either road user returns an empty dict, a message
    naming that user is printed and ([], []) is returned without any prediction'''
    matchedPrototypes = []
    for roadUser, route in ((self.roadUser1, route1), (self.roadUser2, route2)):
        # only the part of the trajectory observed up to instant is used for matching
        observedInterval = moving.Interval(roadUser.timeInterval.first, instant)
        partialObjPositions = roadUser.getObjectInTimeInterval(observedInterval).positions
        if useSpeedPrototype:
            matched = trajLearning.findPrototypesSpeed(prototypes, secondStepPrototypes, nMatching, objects, route, partialObjPositions, noiseEntryNums, noiseExitNums, minSimilarity, mostMatched, useDestination)
        else:
            matched = trajLearning.findPrototypes(prototypes, nMatching, objects, route, partialObjPositions, noiseEntryNums, noiseExitNums, minSimilarity, mostMatched)
        matchedPrototypes.append((roadUser, matched))

    # both lookups are done before testing, road user 1 is reported first
    for roadUser, matched in matchedPrototypes:
        if matched == {}:
            # single-argument print call: same output with the Python 2 statement and the Python 3 function
            print('%s is abnormal at instant %s' % (roadUser.num, instant))
            return [], []

    prototypeTrajectories1 = matchedPrototypes[0][1]
    prototypeTrajectories2 = matchedPrototypes[1][1]
    # the first returned element is not needed here
    _, collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisionsAtInstant(instant, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, prototypeTrajectories1, prototypeTrajectories2)
    return collisionPoints, crossingZones
172 | |
def computeCrossingsCollisionsPrototype2stages(self, predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes, secondStepPrototypes, nMatching, objects, noiseEntryNums, noiseExitNums, step=10, minSimilarity=0.1, mostMatched=None, computeCZ = False, debug = False, timeInterval = None, acceptPartialLength=30, useDestination=True, useSpeedPrototype=True):
    '''Computes all crossing and collision points at each common instant for two road users,
    in two stages to save time

    Stage 1 evaluates one instant every step instants and stops at the first one
    yielding a collision point or a crossing zone
    Stage 2 runs only if stage 1 found something: every instant of the interval
    except the last one is then evaluated

    An instant is evaluated only if it lies at least acceptPartialLength after
    the first instant of both road users
    If timeInterval is not provided, self.timeInterval is used

    self.collisionPoints and self.crossingZones are reset and filled (keyed by instant)
    A SeverityIndicator named Interaction.indicatorNames[7] is added from the collision points,
    and, if computeCZ, one named Interaction.indicatorNames[9] from the crossing zones'''
    self.collisionPoints = {}
    self.crossingZones = {}
    TTCs = {}
    route1 = (self.roadUser1.startRouteID, self.roadUser1.endRouteID)
    route2 = (self.roadUser2.startRouteID, self.roadUser2.endRouteID)
    if useDestination:
        # direct membership test: .keys() builds a list in Python 2, scanned linearly
        if route1 not in prototypes:
            route1 = trajLearning.findRoute(prototypes, objects, route1, self.roadUser1.num, noiseEntryNums, noiseExitNums)
        if route2 not in prototypes:
            route2 = trajLearning.findRoute(prototypes, objects, route2, self.roadUser2.num, noiseEntryNums, noiseExitNums)

    if timeInterval:
        commonTimeInterval = timeInterval
    else:
        commonTimeInterval = self.timeInterval

    # i-first1 >= acceptPartialLength and i-first2 >= acceptPartialLength
    # is equivalent to i >= max(first1, first2)+acceptPartialLength, which does not depend on i
    firstUsableInstant = max(self.roadUser1.timeInterval.first, self.roadUser2.timeInterval.first) + acceptPartialLength

    def computeAtInstant(i):
        'Forwards to the single-instant computation with the arguments shared by both stages'
        return self.computeCrosssingCollisionsPrototypeAtInstant(i, route1, route2, predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes, secondStepPrototypes, nMatching, objects, noiseEntryNums, noiseExitNums, minSimilarity, mostMatched, computeCZ, debug, useDestination, useSpeedPrototype)

    reCompute = False
    for i in xrange(commonTimeInterval.first, commonTimeInterval.last, step): # incremental calculation of CP,CZ to save time
        if i >= firstUsableInstant:
            self.collisionPoints[i], self.crossingZones[i] = computeAtInstant(i)
            if len(self.collisionPoints[i]) > 0 or len(self.crossingZones[i]) > 0:
                reCompute = True
                break

    instants = list(commonTimeInterval)[:-1] # do not look at the 1 last position/velocities, often with errors
    if reCompute:
        for i in instants:
            if i >= firstUsableInstant:
                self.collisionPoints[i], self.crossingZones[i] = computeAtInstant(i)
                if len(self.collisionPoints[i]) > 0:
                    TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.collisionPoints[i])
    # add probability of collision, and probability of successful evasive action
    # NOTE(review): nesting was reconstructed from a view without indentation; the indicator is
    # added here even when stage 2 did not run (TTCs is then empty) - confirm this is intended
    self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs))

    if computeCZ:
        pPETs = {}
        for i in instants:
            # instants skipped above have no entry in self.crossingZones
            if i in self.crossingZones and len(self.crossingZones[i]) > 0:
                pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i])
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs))
212 | |
150 def addVideoFilename(self,videoFilename): | 213 def addVideoFilename(self,videoFilename): |
151 self.videoFilename= videoFilename | 214 self.videoFilename= videoFilename |
152 | 215 |
153 def addInteractionType(self,interactionType): | 216 def addInteractionType(self,interactionType): |
154 ''' interaction types: conflict or collision if they are known''' | 217 ''' interaction types: conflict or collision if they are known''' |