changeset 1269:ca70a79688ae

adding a speed threshold to avoid computing TTC at very low speeds
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Wed, 05 Jun 2024 10:12:43 -0400
parents 27b206d118b7
children 20a5e1292321
files trafficintelligence/events.py trafficintelligence/prediction.py trafficintelligence/tests/events.txt
diffstat 3 files changed, 32 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/trafficintelligence/events.py	Fri May 31 16:52:51 2024 -0400
+++ b/trafficintelligence/events.py	Wed Jun 05 10:12:43 2024 -0400
@@ -278,15 +278,17 @@
         # could try to predict what happened before, but it's not observed
         
 
-    def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
-        '''Computes all crossing and collision points at each common instant for two road users. '''
+    def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, speedThreshold = 0.):
+        '''Computes all crossing and collision points at each common instant for two road users.
+
+        speedThreshold defines when users are stationary: TTC is not computed when both users are stationary'''
         TTCs = {}
         collisionProbabilities = {}
         if timeInterval is not None:
             commonTimeInterval = timeInterval
         else:
             commonTimeInterval = self.timeInterval
-        self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval)
+        self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval, speedThreshold)
         for i, cps in self.collisionPoints.items():
             TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cps)
             collisionProbabilities[i] = sum([p.probability for p in cps])
--- a/trafficintelligence/prediction.py	Fri May 31 16:52:51 2024 -0400
+++ b/trafficintelligence/prediction.py	Wed Jun 05 10:12:43 2024 -0400
@@ -355,8 +355,9 @@
 
         return collisionPoints, crossingZones
 
-    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):#, nProcesses = 1):
-        '''Computes all crossing and collision points at each common instant for two road users. '''
+    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, speedThreshold = 0.):
+        '''Computes all crossing and collision points at each common instant for two road users. 
+        No movement prediction below a certain speedThreshold for both objects'''
         collisionPoints = {}
         if computeCZ:
             crossingZones = {}
@@ -366,13 +367,14 @@
             commonTimeInterval = timeInterval
         else:
             commonTimeInterval = obj1.commonTimeInterval(obj2)
-        #if nProcesses == 1:
-        for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
-            cp, cz = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
-            if len(cp) != 0:
-                collisionPoints[i] = cp
-            if computeCZ and len(cz) != 0:
-                crossingZones[i] = cz
+        speedThreshold2 = max(0,speedThreshold)**2
+        for t in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
+            if obj1.getVelocityAtInstant(t).norm2Squared() > speedThreshold2 and obj2.getVelocityAtInstant(t).norm2Squared() > speedThreshold2:
+                cp, cz = self.computeCrossingsCollisionsAtInstant(t, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
+                if len(cp) != 0:
+                    collisionPoints[t] = cp
+                if computeCZ and len(cz) != 0:
+                    crossingZones[t] = cz
         return collisionPoints, crossingZones
 
     def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None):
--- a/trafficintelligence/tests/events.txt	Fri May 31 16:52:51 2024 -0400
+++ b/trafficintelligence/tests/events.txt	Wed Jun 05 10:12:43 2024 -0400
@@ -52,6 +52,22 @@
 >>> inter.getIndicator(Interaction.indicatorNames[1])[6] # doctest:+ELLIPSIS
 3.1415...
 
+# test low speed
+>>> inter = Interaction(roadUser1 = o1, roadUser2 = o2)
+>>> inter.computeIndicators()
+>>> predictionParams = ConstantPredictionParameters()
+>>> inter.computeCrossingsCollisions(predictionParams, 0.1, 10, speedThreshold = 1.)
+>>> inter.getIndicator("Time to Collision") is None
+True
+
+# 0 speed users
+>>> inter = Interaction(roadUser1 = MovingObject.generate(1, Point(-5.,0.), Point(0.,0.), TimeInterval(0,10)), roadUser2 = MovingObject.generate(2, Point(0.,-5.), Point(0.,0.), TimeInterval(0,10)))
+>>> inter.computeIndicators()
+>>> predictionParams = ConstantPredictionParameters()
+>>> inter.computeCrossingsCollisions(predictionParams, 0.1, 10)
+>>> inter.getIndicator("Time to Collision") is None
+True
+
 # test categorize
 >>> from collections import Counter
 >>> from numpy import pi