comparison trafficintelligence/events.py @ 1266:ebb18043616e

work in progress on categorization
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Tue, 28 May 2024 17:16:41 -0400
parents 39740c4668ac
children ad60e5adf084
comparison
equal deleted inserted replaced
1265:0f5bebd62a55 1266:ebb18043616e
51 ''' 51 '''
52 52
53 categories = {'headon': 0, 53 categories = {'headon': 0,
54 'rearend': 1, 54 'rearend': 1,
55 'side': 2, 55 'side': 2,
56 'parallel': 3} 56 'parallel': 3,
57 'stationary': 4}
57 58
58 indicatorNames = ['Collision Course Dot Product', 59 indicatorNames = ['Collision Course Dot Product',
59 'Collision Course Angle', 60 'Collision Course Angle',
60 'Distance', 61 'Distance',
61 'Minimum Distance', 62 'Minimum Distance',
106 elif roadUser1 is not None and roadUser2 is not None: 107 elif roadUser1 is not None and roadUser2 is not None:
107 self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()]) 108 self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()])
108 else: 109 else:
109 self.roadUserNumbers = None 110 self.roadUserNumbers = None
110 self.indicators = {} 111 self.indicators = {}
111 self.interactionInterval = None
112 # list for collison points and crossing zones 112 # list for collison points and crossing zones
113 self.collisionPoints = None 113 self.collisionPoints = None
114 self.crossingZones = None 114 self.crossingZones = None
115 115
116 def getRoadUserNumbers(self): 116 def getRoadUserNumbers(self):
200 collisionCourseDotProducts = {} 200 collisionCourseDotProducts = {}
201 collisionCourseAngles = {} 201 collisionCourseAngles = {}
202 velocityAngles = {} 202 velocityAngles = {}
203 distances = {} 203 distances = {}
204 speedDifferentials = {} 204 speedDifferentials = {}
205 interactionInstants = []
206 for instant in self.timeInterval: 205 for instant in self.timeInterval:
207 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) 206 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant)
208 v1 = self.roadUser1.getVelocityAtInstant(instant) 207 v1 = self.roadUser1.getVelocityAtInstant(instant)
209 v2 = self.roadUser2.getVelocityAtInstant(instant) 208 v2 = self.roadUser2.getVelocityAtInstant(instant)
210 deltav = v2-v1 209 deltav = v2-v1
213 if v1Norm != 0. and v2Norm != 0.: 212 if v1Norm != 0. and v2Norm != 0.:
214 velocityAngles[instant] = np.arccos(max(-1, min(1, moving.Point.dot(v1, v2)/(v1Norm*v2Norm)))) 213 velocityAngles[instant] = np.arccos(max(-1, min(1, moving.Point.dot(v1, v2)/(v1Norm*v2Norm))))
215 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) 214 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav)
216 distances[instant] = deltap.norm2() 215 distances[instant] = deltap.norm2()
217 speedDifferentials[instant] = deltav.norm2() 216 speedDifferentials[instant] = deltav.norm2()
218 if collisionCourseDotProducts[instant] > 0:
219 interactionInstants.append(instant)
220 if distances[instant] != 0 and speedDifferentials[instant] != 0: 217 if distances[instant] != 0 and speedDifferentials[instant] != 0:
221 collisionCourseAngles[instant] = np.arccos(max(-1, min(1, collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])))) # avoid values slightly higher than 1.0 218 collisionCourseAngles[instant] = np.arccos(max(-1, min(1, collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])))) # avoid values slightly higher than 1.0
222 219
223 if len(interactionInstants) >= 2:
224 self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1])
225 else:
226 self.interactionInterval = moving.TimeInterval()
227 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) 220 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts))
228 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) 221 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles))
229 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False)) 222 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False))
230 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles)) 223 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles))
231 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials)) 224 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials))
235 minDistances={} 228 minDistances={}
236 for instant in self.timeInterval: 229 for instant in self.timeInterval:
237 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) 230 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant)
238 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) 231 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False))
239 232
240 def categorize(self, velocityAngleTolerance, parallelAngleTolerance, headonCollisionCourseAngleTolerance = None): 233 def categorize(self, velocityAngleTolerance, parallelAngleTolerance, headonCollisionCourseAngleTolerance = None, speedThreshold = 0.):
241 '''Computes the interaction category by instant 234 '''Computes the interaction category by instant
242 all 3 angle arguments in radian 235 all 3 angle arguments in radian
243 velocityAngleTolerance: indicates the angle threshold for rear and head on (180-velocityAngleTolerance), as well as the maximum collision course angle for head on (if headonCollisionCourseAngleTolerance is None) 236 velocityAngleTolerance: indicates the angle threshold for rear and head on (180-velocityAngleTolerance), as well as the maximum collision course angle for head on (if headonCollisionCourseAngleTolerance is None)
244 parallelAngleTolerance: indicates the angle between velocity vector (average for parallel) and position vector 237 parallelAngleTolerance: indicates the angle between velocity vector (average for parallel) and position vector
245 238
246 an instant may not be categorized if it matches the side definition (angle) 239 an instant may not be categorized if it matches the side definition (angle)
247 but the distance is growing (at least one user is probably past the point of trajectory crossing)''' 240 but the distance is growing (at least one user is probably past the point of trajectory crossing)'''
248 parallelAngleToleranceCosine = np.cos(parallelAngleTolerance) 241 parallelAngleToleranceCosine = np.cos(parallelAngleTolerance)
249 if headonCollisionCourseAngleTolerance is None: 242 if headonCollisionCourseAngleTolerance is None:
250 headonCollisionCourseAngleTolerance = velocityAngleTolerance 243 headonCollisionCourseAngleTolerance = velocityAngleTolerance
244 speedThreshold2 = speedThreshold**2
251 245
252 self.categories = {} 246 self.categories = {}
253 collisionCourseDotProducts = self.getIndicator(Interaction.indicatorNames[0]) 247 collisionCourseDotProducts = self.getIndicator(Interaction.indicatorNames[0])
254 collisionCourseAngles = self.getIndicator(Interaction.indicatorNames[1]) 248 collisionCourseAngles = self.getIndicator(Interaction.indicatorNames[1])
255 distances = self.getIndicator(Interaction.indicatorNames[2]) 249 distances = self.getIndicator(Interaction.indicatorNames[2])
256 velocityAngles = self.getIndicator(Interaction.indicatorNames[4]) 250 velocityAngles = self.getIndicator(Interaction.indicatorNames[4])
257 for instant in self.timeInterval: 251 for instant in self.timeInterval:
258 if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end 252 if instant in velocityAngles:
259 midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant) 253 if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end
260 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) 254 midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant)
261 if abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < parallelAngleToleranceCosine: 255 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant)
262 self.categories[instant] = Interaction.categories["parallel"] 256 if abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < parallelAngleToleranceCosine:
263 else: 257 self.categories[instant] = Interaction.categories["parallel"]
264 self.categories[instant] = Interaction.categories["rearend"] 258 else:
265 elif velocityAngles[instant] > np.pi - velocityAngleTolerance and collisionCourseAngles[instant] < headonCollisionCourseAngleTolerance: # head on 259 self.categories[instant] = Interaction.categories["rearend"]
266 self.categories[instant] = Interaction.categories["headon"] 260 elif velocityAngles[instant] > np.pi - velocityAngleTolerance and collisionCourseAngles[instant] < headonCollisionCourseAngleTolerance: # head on
267 elif collisionCourseDotProducts[instant] > 0: 261 self.categories[instant] = Interaction.categories["headon"]
268 self.categories[instant] = Interaction.categories["side"] 262 elif collisionCourseDotProducts[instant] > 0:
263 self.categories[instant] = Interaction.categories["side"]
264 # true stationary is when object does not move for the whole period of the interaction, otherwise get last (or next) velocity vector for user orientation
265 # if instant not in self.categories: # if it's none of the other categories (could be with almost stationary vehicle) and only one speed is 0
266 # stationaryUser1 = self.roadUser1.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2
267 # stationaryUser2 = self.roadUser2.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2
268 # if stationaryUser1 != stationaryUser2 and collisionCourseDotProducts[instant] > 0: # only one is not moving
269 # self.categories[instant] = Interaction.categories["stationary"]
270 # leaving is not a good interaction category (issue in Etienne's 2022 paper): means we are past the situation in which users are approaching
271 # could try to predict what happened before, but it's not observed
272
269 273
270 def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): 274 def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
271 '''Computes all crossing and collision points at each common instant for two road users. ''' 275 '''Computes all crossing and collision points at each common instant for two road users. '''
272 TTCs = {} 276 TTCs = {}
273 collisionProbabilities = {} 277 collisionProbabilities = {}