changeset 1252:fe35473acee3

adding method to compute PET using polygon for the outline of a vehicle (bird eye view of the vehicle)
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Fri, 22 Mar 2024 14:33:25 -0400
parents 2b1c8fe8f7e4
children ef68d4ba7dae
files scripts/safety-analysis.py trafficintelligence/events.py trafficintelligence/moving.py trafficintelligence/tests/moving.txt
diffstat 4 files changed, 34 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/scripts/safety-analysis.py	Fri Mar 15 17:05:54 2024 -0400
+++ b/scripts/safety-analysis.py	Fri Mar 22 14:33:25 2024 -0400
@@ -90,11 +90,11 @@
 
 interactions = events.createInteractions(objects)
 if args.nProcesses == 1:
-    processed = events.computeIndicators(interactions, not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, False, None)
+    processed = events.computeIndicators(interactions, not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, False, params.predictionTimeHorizon, params.crossingZones, False, None)
 else:
     pool = Pool(processes = args.nProcesses)
     nInteractionPerProcess = int(np.ceil(len(interactions)/float(args.nProcesses)))
-    jobs = [pool.apply_async(events.computeIndicators, args = (interactions[i*nInteractionPerProcess:(i+1)*nInteractionPerProcess], not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, False, None)) for i in range(args.nProcesses)]
+    jobs = [pool.apply_async(events.computeIndicators, args = (interactions[i*nInteractionPerProcess:(i+1)*nInteractionPerProcess], not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, False, params.predictionTimeHorizon, params.crossingZones, False, None)) for i in range(args.nProcesses)]
     processed = []
     for job in jobs:
         processed += job.get()
--- a/trafficintelligence/events.py	Fri Mar 15 17:05:54 2024 -0400
+++ b/trafficintelligence/events.py	Fri Mar 22 14:33:25 2024 -0400
@@ -277,8 +277,9 @@
             self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs, mostSevereIsMax=False))
         # TODO add probability of collision, and probability of successful evasive action
 
-    def computePET(self, collisionDistanceThreshold):
-        pet, t1, t2=  moving.MovingObject.computePET(self.roadUser1, self.roadUser2, collisionDistanceThreshold)
+    def computePET(self, collisionDistanceThreshold, computePetWithBoundingPoly):
+        'Warning: when computing PET from interactions, there could be PETs between objects that do not coexist and therefore are not considered interactions'
+        pet, t1, t2=  moving.MovingObject.computePET(self.roadUser1, self.roadUser2, collisionDistanceThreshold, computePetWithBoundingPoly)
         if pet is not None:
             self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[10], {min(t1, t2): pet}, mostSevereIsMax = False))
 
@@ -325,14 +326,14 @@
     else:
         return None
 
-def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
+def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, computePetWithBoundingPoly, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
     for inter in interactions:
         print('processing interaction {}'.format(inter.getNum())) # logging.debug('processing interaction {}'.format(inter.getNum()))
         inter.computeIndicators()
         if computeMotionPrediction:
             inter.computeCrossingsCollisions(predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval)
         if computePET:
-            inter.computePET(collisionDistanceThreshold)
+            inter.computePET(collisionDistanceThreshold, computePetWithBoundingPoly)
     return interactions
     
 def aggregateSafetyPoints(interactions, pointType = 'collision'):
--- a/trafficintelligence/moving.py	Fri Mar 15 17:05:54 2024 -0400
+++ b/trafficintelligence/moving.py	Fri Mar 22 14:33:25 2024 -0400
@@ -4,7 +4,7 @@
 import copy
 from math import sqrt, atan2, cos, sin, inf
 
-from numpy import median, mean, array, arange, zeros, ones, hypot, NaN, std, floor, ceil, float32, argwhere, minimum,  issubdtype, integer as npinteger, percentile
+from numpy import median, mean, array, arange, zeros, ones, hypot, NaN, std, floor, ceil, float32, argwhere, minimum,  issubdtype, integer as npinteger, percentile, full
 from matplotlib.pyplot import plot, text, arrow
 from scipy.spatial.distance import cdist
 from scipy.signal import savgol_filter
@@ -1938,21 +1938,35 @@
                 self.prototypeSimilarities.append(similarities/minimum(arange(1., len(similarities)+1), proto.getMovingObject().length()*ones(len(similarities))))
 
     @staticmethod
-    def computePET(obj1, obj2, collisionDistanceThreshold):
+    def computePET(obj1, obj2, collisionDistanceThreshold = None, useBoundingPoly = False):
         '''Post-encroachment time based on distance threshold
 
         Returns the smallest time difference when the object positions are within collisionDistanceThreshold
         and the instants at which each object is passing through its corresponding position'''
-        positions1 = [p.astuple() for p in obj1.getPositions()]
-        positions2 = [p.astuple() for p in obj2.getPositions()]
-        n1 = len(positions1)
-        n2 = len(positions2)
+        n1 = int(obj1.length())
+        n2 = int(obj2.length())
         pets = zeros((n1, n2))
         for i,t1 in enumerate(obj1.getTimeInterval()):
             for j,t2 in enumerate(obj2.getTimeInterval()):
                 pets[i,j] = abs(t1-t2)
-        distances = cdist(positions1, positions2, metric = 'euclidean')
-        smallDistances = (distances <= collisionDistanceThreshold)
+        if useBoundingPoly:
+            polygons1 = [pointsToShapely(obj1.getBoundingPolygon(i)) for i in obj1.getTimeInterval()]
+            polygons2 = [pointsToShapely(obj2.getBoundingPolygon(i)) for i in obj2.getTimeInterval()]
+            overlaps = []
+            for poly2 in polygons2:
+                prep(poly2)
+            for poly1 in polygons1:
+                prep(poly1)
+                overlaps.append([poly1.overlaps(poly2) for poly2 in polygons2])
+            smallDistances = array(overlaps)
+        elif collisionDistanceThreshold is not None:
+            positions1 = [p.astuple() for p in obj1.getPositions()]
+            positions2 = [p.astuple() for p in obj2.getPositions()]
+            distances = cdist(positions1, positions2, metric = 'euclidean')
+            smallDistances = (distances <= collisionDistanceThreshold)
+        else:
+            smallDistances = array([])
+        
         if smallDistances.any():
             smallPets = pets[smallDistances]
             petIdx = smallPets.argmin()
--- a/trafficintelligence/tests/moving.txt	Fri Mar 15 17:05:54 2024 -0400
+++ b/trafficintelligence/tests/moving.txt	Fri Mar 22 14:33:25 2024 -0400
@@ -261,6 +261,11 @@
 >>> MovingObject.computePET(o1, o2, 0.1)
 (15.0, 5, 20)
 
+>>> o1 = MovingObject(1, TimeInterval(0,10), features=[MovingObject.generate(1, Point(0., 3.), Point(1., 0.), TimeInterval(0,10)), MovingObject.generate(2, Point(2., 3.), Point(1., 0.), TimeInterval(0,10)), MovingObject.generate(3, Point(2., 4.), Point(1., 0.), TimeInterval(0,10)), MovingObject.generate(4, Point(0., 4.), Point(1., 0.), TimeInterval(0,10))])
+>>> o2 = MovingObject(2, TimeInterval(0,10), features=[MovingObject.generate(5, Point(6., 0.), Point(0., 1.), TimeInterval(0,10)), MovingObject.generate(6, Point(7., 0.), Point(0., 1.), TimeInterval(0,10)), MovingObject.generate(7, Point(7., 2.), Point(0., 1.), TimeInterval(0,10)), MovingObject.generate(8, Point(6., 2.), Point(0., 1.), TimeInterval(0,10))])
+>>> MovingObject.computePET(o1, o2, useBoundingPoly = True)
+(2.0, 5, 3)
+
 >>> t1 = CurvilinearTrajectory.generate(3, 1., 10, 'b')
 >>> t1.length()
 10