changeset 359:619ae9a9a788

implemented prediction method at constant velocity with direct intersection computation
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Thu, 11 Jul 2013 02:17:12 -0400
parents c41ff9f3c263
children 450dc0648aaa
files python/moving.py python/prediction.py scripts/safety-analysis.py
diffstat 3 files changed, 105 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/python/moving.py	Thu Jul 11 00:17:25 2013 -0400
+++ b/python/moving.py	Thu Jul 11 02:17:12 2013 -0400
@@ -30,6 +30,9 @@
     def empty(self):
         return self.first > self.last
 
+    def center(self):
+        return (self.first+self.last)/2.
+
     def length(self):
         '''Returns the length of the interval'''
         return float(max(0,self.last-self.first))
@@ -306,33 +309,39 @@
     def similar(f1, f2, maxDistance2, maxDeltavelocity2):
         return (f1.position-f2.position).norm2Squared()<maxDistance2 and (f1.velocity-f2.velocity).norm2Squared()<maxDeltavelocity2
 
+def intersection(p1, p2, dp1, dp2):
+    '''Returns the intersection point between the two lines 
+    defined by the respective vectors (dp) and origin points (p)'''
+    from numpy import matrix
+    from numpy.linalg import linalg
+    A = matrix([[dp1.y, -dp1.x],
+                [dp2.y, -dp2.x]])
+    B = matrix([[dp1.y*p1.x-dp1.x*p1.y],
+                [dp2.y*p2.x-dp2.x*p2.y]])
+    
+    if linalg.det(A) == 0:
+        return None
+    else:
+        intersection = linalg.solve(A,B)
+        return Point(intersection[0,0], intersection[1,0])
+
 def segmentIntersection(p1, p2, p3, p4):
     '''Returns the intersecting point of the segments [p1, p2] and [p3, p4], None otherwise'''
-    from numpy import matrix
-    from numpy.linalg import linalg, det
 
     if (Interval.intersection(Interval(p1.x,p2.x,True), Interval(p3.x,p4.x,True)).empty()) or (Interval.intersection(Interval(p1.y,p2.y,True), Interval(p3.y,p4.y,True)).empty()):
         return None
     else:
-        dp1 = p2-p1#[s1[0][1]-s1[0][0], s1[1][1]-s1[1][0]]
-        dp2 = p4-p3#[s2[0][1]-s2[0][0], s2[1][1]-s2[1][0]]
-
-        A = matrix([[dp1.y, -dp1.x],
-                    [dp2.y, -dp2.x]])
-        B = matrix([[dp1.y*p1.x-dp1.x*p1.y],
-                    [dp2.y*p3.x-dp2.x*p3.y]])
-
-        if linalg.det(A) == 0:#crossProduct(ds1, ds2) == 0:
+        dp1 = p2-p1
+        dp3 = p4-p3
+        inter = intersection(p1, p3, dp1, dp3)
+        if (inter != None 
+            and utils.inBetween(p1.x, p2.x, inter.x)
+            and utils.inBetween(p3.x, p4.x, inter.x)
+            and utils.inBetween(p1.y, p2.y, inter.y)
+            and utils.inBetween(p3.y, p4.y, inter.y)):
+            return inter
+        else:
             return None
-        else:
-            intersection = linalg.solve(A,B)
-            if (utils.inBetween(p1.x, p2.x, intersection[0,0])
-                and utils.inBetween(p3.x, p4.x, intersection[0,0])
-                and utils.inBetween(p1.y, p2.y, intersection[1,0])
-                and utils.inBetween(p3.y, p4.y, intersection[1,0])):
-                return Point(intersection[0,0], intersection[1,0])
-            else:
-                return None
 
 # TODO: implement a better algorithm for intersections of sets of segments http://en.wikipedia.org/wiki/Line_segment_intersection
 
--- a/python/prediction.py	Thu Jul 11 00:17:25 2013 -0400
+++ b/python/prediction.py	Thu Jul 11 02:17:12 2013 -0400
@@ -63,6 +63,28 @@
     def getControl(self):
         return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution())
 
+class SafetyPoint(moving.Point):
+    '''Can represent a collision point or crossing zone 
+    with respective safety indicator, TTC or pPET'''
+    def __init__(self, p, probability = 1., indicator = -1):
+        self.x = p.x
+        self.y = p.y
+        self.probability = probability
+        self.indicator = indicator
+
+    def __str__(self):
+        return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator)
+
+    @staticmethod
+    def save(out, points, predictionInstant, objNum1, objNum2):
+        for p in points:
+            out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p))
+
+    @staticmethod
+    def computeExpectedIndicator(points):
+        from numpy import sum
+        return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])
+
 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon):
     '''Computes the first instant at which two predicted trajectories are within some distance threshold'''
     t = 1
@@ -86,9 +108,7 @@
         return []
 
     def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
-        '''returns the lists of collision points and crossing zones
-
-        Check: Predicting all the points together, as if they represent the whole vehicle'''
+        '''returns the lists of collision points and crossing zones'''
         predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
         predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)
 
@@ -288,27 +308,58 @@
                                                                          maxSpeed = self.maxSpeed))
         return predictedTrajectories
 
-class SafetyPoint(moving.Point):
-    '''Can represent a collision point or crossing zone 
-    with respective safety indicator, TTC or pPET'''
-    def __init__(self, p, probability = 1., indicator = -1):
-        self.x = p.x
-        self.y = p.y
-        self.probability = probability
-        self.indicator = indicator
+
+class CVDirectPredictionParameters(PredictionParameters):
+    '''Prediction parameters of prediction at constant velocity
+    using direct computation of the intersecting point'''
+    
+    def __init__(self, maxSpeed):
+        PredictionParameters.__init__(self, 'constant velocity (direct computation)', maxSpeed)
+
+    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
+        collisionPoints = []
+        crossingZones = []
+
+        p1 = obj1.getPositionAtInstant(currentInstant)
+        p2 = obj2.getPositionAtInstant(currentInstant)
+        if (p1-p2).norm2() <= collisionDistanceThreshold:
+            collisionPoints = [SafetyPoint((p1+p1).multiply(0.5), 1., 0.)]
+        else:
+            v1 = obj1.getVelocityAtInstant(currentInstant)
+            v2 = obj2.getVelocityAtInstant(currentInstant)
+            intersection = moving.intersection(p1, p2, v1, v2)
 
-    def __str__(self):
-        return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator)
+            if intersection != None:
+                dp1 = intersection-p1
+                dp2 = intersection-p2
+                if moving.Point.dot(dp1, v1) > 0 and moving.Point.dot(dp2, v2) > 0: # if the road users are moving towards the intersection
+                    dist1 = dp1.norm2()
+                    dist2 = dp2.norm2()
+                    s1 = v1.norm2()
+                    s2 = v2.norm2()
+                    halfCollisionDistanceThreshold = collisionDistanceThreshold/2.
+                    timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1)
+                    timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2)
+                    collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2)
+                    if not collisionTimeInterval.empty():
+                        collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())]
+                    else:
+                        crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))]
+    
+        if debug and intersection!= None:
+            from matplotlib.pyplot import plot, figure, axis, title
+            figure()
+            plot([p1.x, intersection.x], [p1.y, intersection.y], 'r')
+            plot([p2.x, intersection.x], [p2.y, intersection.y], 'b')
+            intersection.draw()            
+            obj1.draw('r')
+            obj2.draw('b')
+            title('instant {0}'.format(currentInstant))
+            axis('equal')
 
-    @staticmethod
-    def save(out, points, predictionInstant, objNum1, objNum2):
-        for p in points:
-            out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p))
+        return collisionPoints, crossingZones
 
-    @staticmethod
-    def computeExpectedIndicator(points):
-        from numpy import sum
-        return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])
+
 
 
 ####
--- a/scripts/safety-analysis.py	Thu Jul 11 00:17:25 2013 -0400
+++ b/scripts/safety-analysis.py	Thu Jul 11 02:17:12 2013 -0400
@@ -12,7 +12,7 @@
 
 parser = argparse.ArgumentParser(description='The program processes indicators for all pairs of road users in the scene')
 parser.add_argument('--cfg', dest = 'configFilename', help = 'name of the configuration file')
-parser.add_argument('--prediction-method', dest = 'predictionMethod', help = 'prediction method (constant velocity, normal adaptation, point set prediction)', choices = ['cv', 'na', 'ps'])
+parser.add_argument('--prediction-method', dest = 'predictionMethod', help = 'prediction method (constant velocity (vector computation), constant velocity, normal adaptation, point set prediction)', choices = ['cvd', 'cv', 'na', 'ps'])
 parser.add_argument('--display-cp', dest = 'displayCollisionPoints', help = 'display collision points')
 args = parser.parse_args()
 
@@ -25,7 +25,9 @@
 else:
     predictionMethod = params.predictionMethod
 
-if predictionMethod == 'cv':
+if predictionMethod == 'cvd':
+    predictionParameters = prediction.CVDirectPredictionParameters(params.maxPredictedSpeed)
+elif predictionMethod == 'cv':
     predictionParameters = prediction.ConstantPredictionParameters(params.maxPredictedSpeed)
 elif predictionMethod == 'na':
     predictionParameters = prediction.NormalAdaptationPredictionParameters(params.maxPredictedSpeed,