Mercurial Hosting > traffic-intelligence
changeset 1266:ebb18043616e
work in progress on categorization
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 28 May 2024 17:16:41 -0400 |
parents | 0f5bebd62a55 |
children | ad60e5adf084 |
files | trafficintelligence/events.py |
diffstat | 1 files changed, 25 insertions(+), 21 deletions(-) [+] |
line wrap: on
line diff
--- a/trafficintelligence/events.py Fri May 24 16:15:38 2024 -0400 +++ b/trafficintelligence/events.py Tue May 28 17:16:41 2024 -0400 @@ -53,7 +53,8 @@ categories = {'headon': 0, 'rearend': 1, 'side': 2, - 'parallel': 3} + 'parallel': 3, + 'stationary': 4} indicatorNames = ['Collision Course Dot Product', 'Collision Course Angle', @@ -108,7 +109,6 @@ else: self.roadUserNumbers = None self.indicators = {} - self.interactionInterval = None # list for collison points and crossing zones self.collisionPoints = None self.crossingZones = None @@ -202,7 +202,6 @@ velocityAngles = {} distances = {} speedDifferentials = {} - interactionInstants = [] for instant in self.timeInterval: deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) v1 = self.roadUser1.getVelocityAtInstant(instant) @@ -215,15 +214,9 @@ collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) distances[instant] = deltap.norm2() speedDifferentials[instant] = deltav.norm2() - if collisionCourseDotProducts[instant] > 0: - interactionInstants.append(instant) if distances[instant] != 0 and speedDifferentials[instant] != 0: collisionCourseAngles[instant] = np.arccos(max(-1, min(1, collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])))) # avoid values slightly higher than 1.0 - if len(interactionInstants) >= 2: - self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1]) - else: - self.interactionInterval = moving.TimeInterval() self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False)) @@ -237,7 +230,7 @@ minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) - def categorize(self, velocityAngleTolerance, parallelAngleTolerance, headonCollisionCourseAngleTolerance = None): + def categorize(self, velocityAngleTolerance, parallelAngleTolerance, headonCollisionCourseAngleTolerance = None, speedThreshold = 0.): '''Computes the interaction category by instant all 3 angle arguments in radian velocityAngleTolerance: indicates the angle threshold for rear and head on (180-velocityAngleTolerance), as well as the maximum collision course angle for head on (if headonCollisionCourseAngleTolerance is None) @@ -248,6 +241,7 @@ parallelAngleToleranceCosine = np.cos(parallelAngleTolerance) if headonCollisionCourseAngleTolerance is None: headonCollisionCourseAngleTolerance = velocityAngleTolerance + speedThreshold2 = speedThreshold**2 self.categories = {} collisionCourseDotProducts = self.getIndicator(Interaction.indicatorNames[0]) @@ -255,17 +249,27 @@ distances = self.getIndicator(Interaction.indicatorNames[2]) velocityAngles = self.getIndicator(Interaction.indicatorNames[4]) for instant in self.timeInterval: - if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end - midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant) - deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) - if abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < parallelAngleToleranceCosine: - self.categories[instant] = Interaction.categories["parallel"] - else: - self.categories[instant] = Interaction.categories["rearend"] - elif velocityAngles[instant] > np.pi - velocityAngleTolerance and collisionCourseAngles[instant] < headonCollisionCourseAngleTolerance: # head on - self.categories[instant] = Interaction.categories["headon"] - elif collisionCourseDotProducts[instant] > 0: - self.categories[instant] = Interaction.categories["side"] + if instant in velocityAngles: + if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end + midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant) + deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) + if abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < parallelAngleToleranceCosine: + self.categories[instant] = Interaction.categories["parallel"] + else: + self.categories[instant] = Interaction.categories["rearend"] + elif velocityAngles[instant] > np.pi - velocityAngleTolerance and collisionCourseAngles[instant] < headonCollisionCourseAngleTolerance: # head on + self.categories[instant] = Interaction.categories["headon"] + elif collisionCourseDotProducts[instant] > 0: + self.categories[instant] = Interaction.categories["side"] + # true stationary is when object does not move for the whole period of the interaction, otherwise get last (or next) velocity vector for user orientation + # if instant not in self.categories: # if it's none of the other categories (could be with almost stationary vehicle) and only one speed is 0 + # stationaryUser1 = self.roadUser1.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2 + # stationaryUser2 = self.roadUser2.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2 + # if stationaryUser1 != stationaryUser2 and collisionCourseDotProducts[instant] > 0: # only one is not moving + # self.categories[instant] = Interaction.categories["stationary"] + # leaving is not a good interaction category (issue in Etienne's 2022 paper): means we are past the situation in which users are approaching + # could try to predict what happened before, but it's not observed + def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): '''Computes all crossing and collision points at each common instant for two road users. '''