diff python/prediction.py @ 358:c41ff9f3c263

moved current method for collision points and crossing zones computation into prediction parameters (put expectedindicator in SafetyPoint)
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Thu, 11 Jul 2013 00:17:25 -0400
parents e5fe0e6d48a1
children 619ae9a9a788
line wrap: on
line diff
--- a/python/prediction.py	Thu Jul 11 00:07:47 2013 -0400
+++ b/python/prediction.py	Thu Jul 11 00:17:25 2013 -0400
@@ -63,6 +63,17 @@
     def getControl(self):
         return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution())
 
+def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon):
+    '''Computes the first instant at which two predicted trajectories are within some distance threshold'''
+    t = 1
+    p1 = predictedTrajectory1.predictPosition(t)
+    p2 = predictedTrajectory2.predictPosition(t)
+    while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold:
+        p1 = predictedTrajectory1.predictPosition(t)
+        p2 = predictedTrajectory2.predictPosition(t)
+        t += 1
+    return t, p1, p2
+
 class PredictionParameters:
     def __init__(self, name, maxSpeed):
         self.name = name
@@ -71,6 +82,110 @@
     def __str__(self):
         return '{0} {1}'.format(self.name, self.maxSpeed)
 
+    def generatePredictedTrajectories(self, obj, instant):
+        return []
+
+    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
+        '''returns the lists of collision points and crossing zones
+
+        Check: Predicting all the points together, as if they represent the whole vehicle'''
+        predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
+        predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)
+
+        collisionPoints = []
+        crossingZones = []
+        for et1 in predictedTrajectories1:
+            for et2 in predictedTrajectories2:
+                t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
+
+                if t <= timeHorizon:
+                    collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t))
+                elif computeCZ: # check if there is a crossing zone
+                    # TODO? zone should be around the points at which the traj are the closest
+                    # look for CZ at different times, otherwise it would be a collision
+                    # an approximation would be to look for close points at different times, ie the complementary of collision points
+                    cz = None
+                    t1 = 0
+                    while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1
+                        t2 = 0
+                        while not cz and t2 < timeHorizon:
+                            #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold:
+                            #    cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5)
+                            cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
+                            if cz:
+                                crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)))
+                            t2 += 1
+                        t1 += 1                        
+
+        if debug:
+            from matplotlib.pyplot import figure, axis, title
+            figure()
+            for et in predictedTrajectories1:
+                et.predictPosition(timeHorizon)
+                et.draw('rx')
+
+            for et in predictedTrajectories2:
+                et.predictPosition(timeHorizon)
+                et.draw('bx')
+            obj1.draw('r')
+            obj2.draw('b')
+            title('instant {0}'.format(currentInstant))
+            axis('equal')
+
+        return collisionPoints, crossingZones
+
+    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
+        '''Computes all crossing and collision points at each common instant for two road users. '''
+        collisionPoints={}
+        crossingZones={}
+        if timeInterval:
+            commonTimeInterval = timeInterval
+        else:
+            commonTimeInterval = obj1.commonTimeInterval(obj2)
+        for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
+            collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
+
+        return collisionPoints, crossingZones
+
+    def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None):
+        '''Computes only collision probabilities
+        Returns for each instant the collision probability and number of samples drawn'''
+        collisionProbabilities = {}
+        if timeInterval:
+            commonTimeInterval = timeInterval
+        else:
+            commonTimeInterval = obj1.commonTimeInterval(obj2)
+        for i in list(commonTimeInterval)[:-1]:
+            nCollisions = 0
+            print(obj1.num, obj2.num, i)
+            predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i)
+            predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i)
+            for et1 in predictedTrajectories1:
+                for et2 in predictedTrajectories2:
+                    t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
+                    if t <= timeHorizon:
+                        nCollisions += 1
+            # take into account probabilities ??
+            nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2))
+            collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples]
+
+            if debug:
+                from matplotlib.pyplot import figure, axis, title
+                figure()
+                for et in predictedTrajectories1:
+                    et.predictPosition(timeHorizon)
+                    et.draw('rx')
+
+                for et in predictedTrajectories2:
+                    et.predictPosition(timeHorizon)
+                    et.draw('bx')
+                obj1.draw('r')
+                obj2.draw('b')
+                title('instant {0}'.format(i))
+                axis('equal')
+
+        return collisionProbabilities
+
 class ConstantPredictionParameters(PredictionParameters):
     def __init__(self, maxSpeed):
         PredictionParameters.__init__(self, 'constant velocity', maxSpeed)
@@ -190,121 +305,11 @@
         for p in points:
             out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p))
 
-def computeExpectedIndicator(points):
-    from numpy import sum
-    return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])
-
-def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon):
-    '''Computes the first instant at which two predicted trajectories are within some distance threshold'''
-    t = 1
-    p1 = predictedTrajectory1.predictPosition(t)
-    p2 = predictedTrajectory2.predictPosition(t)
-    while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold:
-        p1 = predictedTrajectory1.predictPosition(t)
-        p2 = predictedTrajectory2.predictPosition(t)
-        t += 1
-    return t, p1, p2
-
-def computeCrossingsCollisionsAtInstant(currentInstant, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
-    '''returns the lists of collision points and crossing zones
-    
-    Check: Predicting all the points together, as if they represent the whole vehicle'''
-    predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, currentInstant)
-    predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, currentInstant)
-
-    collisionPoints = []
-    crossingZones = []
-    for et1 in predictedTrajectories1:
-        for et2 in predictedTrajectories2:
-            t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
-            
-            if t <= timeHorizon:
-                collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t))
-            elif computeCZ: # check if there is a crossing zone
-                # TODO? zone should be around the points at which the traj are the closest
-                # look for CZ at different times, otherwise it would be a collision
-                # an approximation would be to look for close points at different times, ie the complementary of collision points
-                cz = None
-                t1 = 0
-                while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1
-                    t2 = 0
-                    while not cz and t2 < timeHorizon:
-                        #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold:
-                        #    cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5)
-                        cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
-                        if cz:
-                            crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)))
-                        t2 += 1
-                    t1 += 1                        
-
-    if debug:
-        from matplotlib.pyplot import figure, axis, title
-        figure()
-        for et in predictedTrajectories1:
-            et.predictPosition(timeHorizon)
-            et.draw('rx')
+    @staticmethod
+    def computeExpectedIndicator(points):
+        from numpy import sum
+        return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])
 
-        for et in predictedTrajectories2:
-            et.predictPosition(timeHorizon)
-            et.draw('bx')
-        obj1.draw('r')
-        obj2.draw('b')
-        title('instant {0}'.format(currentInstant))
-        axis('equal')
-
-    return collisionPoints, crossingZones
-    
-def computeCrossingsCollisions(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
-    '''Computes all crossing and collision points at each common instant for two road users. '''
-    collisionPoints={}
-    crossingZones={}
-    if timeInterval:
-        commonTimeInterval = timeInterval
-    else:
-        commonTimeInterval = obj1.commonTimeInterval(obj2)
-    for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
-        collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
-
-    return collisionPoints, crossingZones
- 
-def computeCollisionProbability(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None):
-    '''Computes only collision probabilities
-    Returns for each instant the collision probability and number of samples drawn'''
-    collisionProbabilities = {}
-    if timeInterval:
-        commonTimeInterval = timeInterval
-    else:
-        commonTimeInterval = obj1.commonTimeInterval(obj2)
-    for i in list(commonTimeInterval)[:-1]:
-        nCollisions = 0
-        print(obj1.num, obj2.num, i)
-        predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i)
-        predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i)
-        for et1 in predictedTrajectories1:
-            for et2 in predictedTrajectories2:
-                t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
-                if t <= timeHorizon:
-                    nCollisions += 1
-        # take into account probabilities ??
-        nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2))
-        collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples]
-
-        if debug:
-            from matplotlib.pyplot import figure, axis, title
-            figure()
-            for et in predictedTrajectories1:
-                et.predictPosition(timeHorizon)
-                et.draw('rx')
-                
-            for et in predictedTrajectories2:
-                et.predictPosition(timeHorizon)
-                et.draw('bx')
-            obj1.draw('r')
-            obj2.draw('b')
-            title('instant {0}'.format(i))
-            axis('equal')
-
-    return collisionProbabilities
 
 ####
 # Other Methods