Mercurial Hosting > traffic-intelligence
comparison trafficintelligence/events.py @ 1266:ebb18043616e
work in progress on categorization
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 28 May 2024 17:16:41 -0400 |
parents | 39740c4668ac |
children | ad60e5adf084 |
comparison
equal
deleted
inserted
replaced
1265:0f5bebd62a55 | 1266:ebb18043616e |
---|---|
51 ''' | 51 ''' |
52 | 52 |
53 categories = {'headon': 0, | 53 categories = {'headon': 0, |
54 'rearend': 1, | 54 'rearend': 1, |
55 'side': 2, | 55 'side': 2, |
56 'parallel': 3} | 56 'parallel': 3, |
57 'stationary': 4} | |
57 | 58 |
58 indicatorNames = ['Collision Course Dot Product', | 59 indicatorNames = ['Collision Course Dot Product', |
59 'Collision Course Angle', | 60 'Collision Course Angle', |
60 'Distance', | 61 'Distance', |
61 'Minimum Distance', | 62 'Minimum Distance', |
106 elif roadUser1 is not None and roadUser2 is not None: | 107 elif roadUser1 is not None and roadUser2 is not None: |
107 self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()]) | 108 self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()]) |
108 else: | 109 else: |
109 self.roadUserNumbers = None | 110 self.roadUserNumbers = None |
110 self.indicators = {} | 111 self.indicators = {} |
111 self.interactionInterval = None | |
112 # list for collison points and crossing zones | 112 # list for collison points and crossing zones |
113 self.collisionPoints = None | 113 self.collisionPoints = None |
114 self.crossingZones = None | 114 self.crossingZones = None |
115 | 115 |
116 def getRoadUserNumbers(self): | 116 def getRoadUserNumbers(self): |
200 collisionCourseDotProducts = {} | 200 collisionCourseDotProducts = {} |
201 collisionCourseAngles = {} | 201 collisionCourseAngles = {} |
202 velocityAngles = {} | 202 velocityAngles = {} |
203 distances = {} | 203 distances = {} |
204 speedDifferentials = {} | 204 speedDifferentials = {} |
205 interactionInstants = [] | |
206 for instant in self.timeInterval: | 205 for instant in self.timeInterval: |
207 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) | 206 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) |
208 v1 = self.roadUser1.getVelocityAtInstant(instant) | 207 v1 = self.roadUser1.getVelocityAtInstant(instant) |
209 v2 = self.roadUser2.getVelocityAtInstant(instant) | 208 v2 = self.roadUser2.getVelocityAtInstant(instant) |
210 deltav = v2-v1 | 209 deltav = v2-v1 |
213 if v1Norm != 0. and v2Norm != 0.: | 212 if v1Norm != 0. and v2Norm != 0.: |
214 velocityAngles[instant] = np.arccos(max(-1, min(1, moving.Point.dot(v1, v2)/(v1Norm*v2Norm)))) | 213 velocityAngles[instant] = np.arccos(max(-1, min(1, moving.Point.dot(v1, v2)/(v1Norm*v2Norm)))) |
215 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) | 214 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) |
216 distances[instant] = deltap.norm2() | 215 distances[instant] = deltap.norm2() |
217 speedDifferentials[instant] = deltav.norm2() | 216 speedDifferentials[instant] = deltav.norm2() |
218 if collisionCourseDotProducts[instant] > 0: | |
219 interactionInstants.append(instant) | |
220 if distances[instant] != 0 and speedDifferentials[instant] != 0: | 217 if distances[instant] != 0 and speedDifferentials[instant] != 0: |
221 collisionCourseAngles[instant] = np.arccos(max(-1, min(1, collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])))) # avoid values slightly higher than 1.0 | 218 collisionCourseAngles[instant] = np.arccos(max(-1, min(1, collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])))) # avoid values slightly higher than 1.0 |
222 | 219 |
223 if len(interactionInstants) >= 2: | |
224 self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1]) | |
225 else: | |
226 self.interactionInterval = moving.TimeInterval() | |
227 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) | 220 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) |
228 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) | 221 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) |
229 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False)) | 222 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False)) |
230 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles)) | 223 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles)) |
231 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials)) | 224 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials)) |
235 minDistances={} | 228 minDistances={} |
236 for instant in self.timeInterval: | 229 for instant in self.timeInterval: |
237 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) | 230 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) |
238 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) | 231 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) |
239 | 232 |
240 def categorize(self, velocityAngleTolerance, parallelAngleTolerance, headonCollisionCourseAngleTolerance = None): | 233 def categorize(self, velocityAngleTolerance, parallelAngleTolerance, headonCollisionCourseAngleTolerance = None, speedThreshold = 0.): |
241 '''Computes the interaction category by instant | 234 '''Computes the interaction category by instant |
242 all 3 angle arguments in radian | 235 all 3 angle arguments in radian |
243 velocityAngleTolerance: indicates the angle threshold for rear and head on (180-velocityAngleTolerance), as well as the maximum collision course angle for head on (if headonCollisionCourseAngleTolerance is None) | 236 velocityAngleTolerance: indicates the angle threshold for rear and head on (180-velocityAngleTolerance), as well as the maximum collision course angle for head on (if headonCollisionCourseAngleTolerance is None) |
244 parallelAngleTolerance: indicates the angle between velocity vector (average for parallel) and position vector | 237 parallelAngleTolerance: indicates the angle between velocity vector (average for parallel) and position vector |
245 | 238 |
246 an instant may not be categorized if it matches the side definition (angle) | 239 an instant may not be categorized if it matches the side definition (angle) |
247 but the distance is growing (at least one user is probably past the point of trajectory crossing)''' | 240 but the distance is growing (at least one user is probably past the point of trajectory crossing)''' |
248 parallelAngleToleranceCosine = np.cos(parallelAngleTolerance) | 241 parallelAngleToleranceCosine = np.cos(parallelAngleTolerance) |
249 if headonCollisionCourseAngleTolerance is None: | 242 if headonCollisionCourseAngleTolerance is None: |
250 headonCollisionCourseAngleTolerance = velocityAngleTolerance | 243 headonCollisionCourseAngleTolerance = velocityAngleTolerance |
244 speedThreshold2 = speedThreshold**2 | |
251 | 245 |
252 self.categories = {} | 246 self.categories = {} |
253 collisionCourseDotProducts = self.getIndicator(Interaction.indicatorNames[0]) | 247 collisionCourseDotProducts = self.getIndicator(Interaction.indicatorNames[0]) |
254 collisionCourseAngles = self.getIndicator(Interaction.indicatorNames[1]) | 248 collisionCourseAngles = self.getIndicator(Interaction.indicatorNames[1]) |
255 distances = self.getIndicator(Interaction.indicatorNames[2]) | 249 distances = self.getIndicator(Interaction.indicatorNames[2]) |
256 velocityAngles = self.getIndicator(Interaction.indicatorNames[4]) | 250 velocityAngles = self.getIndicator(Interaction.indicatorNames[4]) |
257 for instant in self.timeInterval: | 251 for instant in self.timeInterval: |
258 if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end | 252 if instant in velocityAngles: |
259 midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant) | 253 if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end |
260 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) | 254 midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant) |
261 if abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < parallelAngleToleranceCosine: | 255 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) |
262 self.categories[instant] = Interaction.categories["parallel"] | 256 if abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < parallelAngleToleranceCosine: |
263 else: | 257 self.categories[instant] = Interaction.categories["parallel"] |
264 self.categories[instant] = Interaction.categories["rearend"] | 258 else: |
265 elif velocityAngles[instant] > np.pi - velocityAngleTolerance and collisionCourseAngles[instant] < headonCollisionCourseAngleTolerance: # head on | 259 self.categories[instant] = Interaction.categories["rearend"] |
266 self.categories[instant] = Interaction.categories["headon"] | 260 elif velocityAngles[instant] > np.pi - velocityAngleTolerance and collisionCourseAngles[instant] < headonCollisionCourseAngleTolerance: # head on |
267 elif collisionCourseDotProducts[instant] > 0: | 261 self.categories[instant] = Interaction.categories["headon"] |
268 self.categories[instant] = Interaction.categories["side"] | 262 elif collisionCourseDotProducts[instant] > 0: |
263 self.categories[instant] = Interaction.categories["side"] | |
264 # true stationary is when object does not move for the whole period of the interaction, otherwise get last (or next) velocity vector for user orientation | |
265 # if instant not in self.categories: # if it's none of the other categories (could be with almost stationary vehicle) and only one speed is 0 | |
266 # stationaryUser1 = self.roadUser1.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2 | |
267 # stationaryUser2 = self.roadUser2.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2 | |
268 # if stationaryUser1 != stationaryUser2 and collisionCourseDotProducts[instant] > 0: # only one is not moving | |
269 # self.categories[instant] = Interaction.categories["stationary"] | |
270 # leaving is not a good interaction category (issue in Etienne's 2022 paper): means we are past the situation in which users are approaching | |
271 # could try to predict what happened before, but it's not observed | |
272 | |
269 | 273 |
270 def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | 274 def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): |
271 '''Computes all crossing and collision points at each common instant for two road users. ''' | 275 '''Computes all crossing and collision points at each common instant for two road users. ''' |
272 TTCs = {} | 276 TTCs = {} |
273 collisionProbabilities = {} | 277 collisionProbabilities = {} |