changeset 1267:ad60e5adf084

cleaned interaction categorization and added stationary category
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Wed, 29 May 2024 09:52:42 -0400
parents ebb18043616e
children 27b206d118b7
files trafficintelligence/events.py trafficintelligence/indicators.py trafficintelligence/tests/events.txt trafficintelligence/utils.py
diffstat 4 files changed, 38 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/trafficintelligence/events.py	Tue May 28 17:16:41 2024 -0400
+++ b/trafficintelligence/events.py	Wed May 29 09:52:42 2024 -0400
@@ -196,7 +196,7 @@
             print('Please set the interaction road user attributes roadUser1 and roadUser1 through the method setRoadUsers')
 
     def computeIndicators(self):
-        '''Computes the collision course cosine only if the cosine is positive'''
+        '''Computes all cinematic indicators but the expensive safety indicators (TTC, PET)'''
         collisionCourseDotProducts = {}
         collisionCourseAngles = {}
         velocityAngles = {}
@@ -233,12 +233,17 @@
     def categorize(self, velocityAngleTolerance, parallelAngleTolerance, headonCollisionCourseAngleTolerance = None, speedThreshold = 0.):
         '''Computes the interaction category by instant
         all 3 angle arguments in radian
-        velocityAngleTolerance: indicates the angle threshold for rear and head on (180-velocityAngleTolerance), as well as the maximum collision course angle for head on (if headonCollisionCourseAngleTolerance is None)
-        parallelAngleTolerance: indicates the angle between velocity vector (average for parallel) and position vector
+        velocityAngleTolerance: indicates the angle threshold for rear and head on (180-velocityAngleTolerance),
+        as well as the maximum collision course angle for head on (if headonCollisionCourseAngleTolerance is None)
+        parallelAngleTolerance: indicates the tolerance on the expected 90 deg angle
+        between velocity vector (average for parallel) and position vector for a parallel interaction
+        speedThreshold defines stationary users: a stationary interaction is between one moving and
+        one stationary user, and their distance decreases
 
         an instant may not be categorized if it matches the side definition (angle)
         but the distance is growing (at least one user is probably past the point of trajectory crossing)'''
-        parallelAngleToleranceCosine = np.cos(parallelAngleTolerance)
+        minParallelAngleCosine = np.cos(np.pi/2+parallelAngleTolerance)
+        maxParallelAngleCosine = np.cos(np.pi/2-parallelAngleTolerance)
         if headonCollisionCourseAngleTolerance is None:
             headonCollisionCourseAngleTolerance = velocityAngleTolerance
         speedThreshold2 = speedThreshold**2
@@ -249,11 +254,17 @@
         distances = self.getIndicator(Interaction.indicatorNames[2])
         velocityAngles = self.getIndicator(Interaction.indicatorNames[4])
         for instant in self.timeInterval:
-            if instant in velocityAngles:
+            stationaryUser1 = self.roadUser1.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2
+            stationaryUser2 = self.roadUser2.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2
+            if stationaryUser1 != stationaryUser2 and collisionCourseDotProducts[instant] > 0: # only one is not moving and is getting closer
+                self.categories[instant] = Interaction.categories["stationary"]
+                # true stationary would be all the times (parked, difficult without semantic knowledge of the scene
+                # alternatively, one could get the previous or next non zero velocity to identify user orientation
+            elif velocityAngles.existsAtInstant(instant):
                 if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end
                     midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant)
                     deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant)
-                    if abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < parallelAngleToleranceCosine:
+                    if minParallelAngleCosine < abs(moving.Point.dot(midVelocity, deltap)/(midVelocity.norm2()*distances[instant])) < maxParallelAngleCosine:
                         self.categories[instant] = Interaction.categories["parallel"]
                     else:
                         self.categories[instant] = Interaction.categories["rearend"]
@@ -262,12 +273,8 @@
                 elif collisionCourseDotProducts[instant] > 0:
                     self.categories[instant] = Interaction.categories["side"]
             # true stationary is when object does not move for the whole period of the interaction, otherwise get last (or next) velocity vector for user orientation
-            # if instant not in self.categories: # if it's none of the other categories (could be with almost stationary vehicle) and only one speed is 0
-            #     stationaryUser1 = self.roadUser1.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2
-            #     stationaryUser2 = self.roadUser2.getVelocityAtInstant(instant).norm2Squared() <= speedThreshold2
-            #     if stationaryUser1 != stationaryUser2 and collisionCourseDotProducts[instant] > 0: # only one is not moving
-            #         self.categories[instant] = Interaction.categories["stationary"]
-        # leaving is not a good interaction category (issue in Etienne's 2022 paper): means we are past the situation in which users are approaching
+        # leaving is not a good interaction category (issue in Etienne's 2022 paper):
+        # means we are past the situation in which users are approaching
         # could try to predict what happened before, but it's not observed
         
 
--- a/trafficintelligence/indicators.py	Tue May 28 17:16:41 2024 -0400
+++ b/trafficintelligence/indicators.py	Wed May 29 09:52:42 2024 -0400
@@ -85,6 +85,9 @@
     def getInstants(self):
         return list(self.values.keys())
 
+    def existsAtInstant(self, instant):
+        return instant in self.values
+
     def plot(self, options = '', xfactor = 1., yfactor = 1., timeShift = 0, **kwargs):
         if self.getTimeInterval().length() == 1:
             marker = 'o'
--- a/trafficintelligence/tests/events.txt	Tue May 28 17:16:41 2024 -0400
+++ b/trafficintelligence/tests/events.txt	Wed May 29 09:52:42 2024 -0400
@@ -59,7 +59,7 @@
 >>> o2 = MovingObject.generate(0, Point(100,1), Point(-1,0), TimeInterval(0,100))
 >>> inter12 = Interaction(roadUser1 = o1, roadUser2 = o2)
 >>> inter12.computeIndicators()
->>> inter12.categorize(pi*20/180, pi*60/180)
+>>> inter12.categorize(pi*20/180, pi*45/180)
 >>> Counter(inter12.categories.values()).most_common()[0][0] # head on
 0
 >>> inter12.categories[max(inter12.categories.keys())] # then side
@@ -67,7 +67,7 @@
 >>> o3 = MovingObject.generate(0, Point(0,2), Point(1,0), TimeInterval(0,100))
 >>> inter13 = Interaction(roadUser1 = o1, roadUser2 = o3)
 >>> inter13.computeIndicators()
->>> inter13.categorize(pi*20/180, pi*60/180)
+>>> inter13.categorize(pi*20/180, pi*45/180)
 >>> Counter(inter13.categories.values()).most_common()[0][0] # parallel
 3
 >>> len(Counter(inter13.categories.values()))
@@ -75,18 +75,28 @@
 >>> o4 = MovingObject.generate(0, Point(100,20), Point(-1,0), TimeInterval(0,100))
 >>> inter14 = Interaction(roadUser1 = o1, roadUser2 = o4)
 >>> inter14.computeIndicators()
->>> inter14.categorize(pi*20/180, pi*60/180)
+>>> inter14.categorize(pi*20/180, pi*45/180)
 >>> Counter(inter14.categories.values()).most_common()[0][0] # side
 2
->>> inter12.categories[0] # first head one
+>>> inter12.categories[0] # first head on
 0
 >>> inter12.categories[max(inter12.categories.keys())] # then side
 2
 >>> o5 = MovingObject.generate(0, Point(50,50), Point(0,-1), TimeInterval(0,100))
 >>> inter15 = Interaction(roadUser1 = o1, roadUser2 = o5)
 >>> inter15.computeIndicators()
->>> inter15.categorize(pi*20/180, pi*60/180)
+>>> inter15.categorize(pi*20/180, pi*45/180)
 >>> Counter(inter15.categories.values()).most_common()[0][0] # side
 2
 >>> len(Counter(inter15.categories.values()))
 1
+>>> o6 = MovingObject.generate(0, Point(50,1), Point(0,0), TimeInterval(0,100))
+>>> inter16 = Interaction(roadUser1 = o1, roadUser2 = o6)
+>>> inter16.computeIndicators()
+>>> inter16.categorize(pi*20/180, pi*45/180, speedThreshold = 0.1)
+>>> Counter(inter16.categories.values()).most_common()[0][0] # stationary
+4
+>>> 49 in inter16.categories
+True
+>>> 51 in inter16.categories # not stationary interaction past the user
+False
--- a/trafficintelligence/utils.py	Tue May 28 17:16:41 2024 -0400
+++ b/trafficintelligence/utils.py	Wed May 29 09:52:42 2024 -0400
@@ -138,7 +138,7 @@
         k = round(t.ppf(0.5+percentConfidence/200., nSamples-1), 2)
     e = k*stdev/sqrt(nSamples)
     if printLatex:
-        print('${0} \pm {1}\\frac{{{2}}}{{\sqrt{{{3}}}}}$'.format(mean, k, stdev, nSamples))
+        print('${0} \\pm {1}\\frac{{{2}}}{{\\sqrt{{{3}}}}}$'.format(mean, k, stdev, nSamples))
     return mean-e, mean+e
 
 def computeChi2(expected, observed):