diff python/prediction.py @ 604:8ba4b8ad4c86

Motion Pattern Method
author MohamedGomaa
date Tue, 15 Jul 2014 13:25:15 -0400
parents 727e3c529519
children
line wrap: on
line diff
--- a/python/prediction.py	Tue Jul 08 15:22:30 2014 -0400
+++ b/python/prediction.py	Tue Jul 15 13:25:15 2014 -0400
@@ -4,6 +4,7 @@
 import moving
 import math
 import random
+import numpy as np
 
 class PredictedTrajectory:
     '''Class for predicted trajectories with lazy evaluation
@@ -47,6 +48,20 @@
     def getControl(self):
         return self.control
 
+def findNearestOrientation(initialPosition,prototypeTrajectory):
+	distances=[]
+	for p in prototypeTrajectory.positions:
+		distances.append(moving.Point.distanceNorm2(initialPosition, p))
+	t=np.argmin(distances)
+	return moving.NormAngle.fromPoint(prototypeTrajectory.velocities[t]).angle
+#todo: merge with previous function
+def findNearestInstant(initialPosition,prototypeTrajectory):
+	distances=[]
+	for p in prototypeTrajectory.positions:
+		distances.append(moving.Point.distanceNorm2(initialPosition, p))
+	t=np.argmin(distances)
+	return t
+	
 class PredictedTrajectoryPrototype(PredictedTrajectory):
     '''Predicted trajectory that follows a prototype trajectory
     The prototype is in the format of a moving.Trajectory: it could be
@@ -59,20 +74,39 @@
     (applying a constant ratio equal 
     to the ratio of the user instantaneous speed and the trajectory closest speed)'''
 
-    def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.):
+    def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, speedList=[], probability = 1.):
         self.prototypeTrajectory = prototypeTrajectory
         self.constantSpeed = constantSpeed
+        self.speedList = speedList
         self.probability = probability
         self.predictedPositions = {0: initialPosition}
-        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
+        self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestOrientation(initialPosition,prototypeTrajectory))}#moving.NormAngle.fromPoint(initialVelocity)}
 
     def predictPosition(self, nTimeSteps):
         if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys():
-            if constantSpeed:
+            if self.constantSpeed:
                 # calculate cumulative distance
-                pass
+                speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm
+                anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)
+                self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype)
+                self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None)
+                #pass
+            elif self.speedList!=[]:
+                pass			
             else: # see c++ code, calculate ratio
-                pass
+                speedNorm= self.predictedSpeedOrientations[0].norm
+                instant=findNearestInstant(self.predictedPositions[0],self.prototypeTrajectory)
+                prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:]
+                ratio=float(speedNorm)/prototypeSpeeds[0]
+                resampledSpeeds=[sp*ratio for sp in prototypeSpeeds]
+                anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)
+                if nTimeSteps<len(resampledSpeeds):
+                    self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[nTimeSteps], anglePrototype)
+                    self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None)				
+                else:
+                    self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[-1], anglePrototype)
+                    self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None)
+                # pass
         return self.predictedPositions[nTimeSteps]
 
 class PredictedTrajectoryRandomControl(PredictedTrajectory):
@@ -135,10 +169,14 @@
     def generatePredictedTrajectories(self, obj, instant):
         return []
 
-    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
+    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None):
         '''returns the lists of collision points and crossing zones'''
-        predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
-        predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)
+        if prototypeTrajectories1==None: 
+            predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
+            predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)		
+        else:
+            predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1)
+            predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2)
 
         collisionPoints = []
         crossingZones = []
@@ -161,7 +199,8 @@
                             #    cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5)
                             cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
                             if cz:
-                                crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)))
+                                deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2()
+                                crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV)))
                             t2 += 1
                         t1 += 1                        
 
@@ -182,7 +221,7 @@
 
         return collisionPoints, crossingZones
 
-    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
+    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None,prototypeTrajectories1=None,prototypeTrajectories2=None):
         '''Computes all crossing and collision points at each common instant for two road users. '''
         collisionPoints={}
         crossingZones={}
@@ -191,7 +230,7 @@
         else:
             commonTimeInterval = obj1.commonTimeInterval(obj2)
         for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
-            collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
+            collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2)
 
         return collisionPoints, crossingZones
 
@@ -412,10 +451,24 @@
 ####
 # Other Methods
 ####
-
-
-
-
+class prototypePredictionParameters(PredictionParameters):
+    def __init__(self, maxSpeed, nPredictedTrajectories,constantSpeed = True, speedList=[]):
+		#TODO speed profiles integration 
+        name = 'prototype'
+        PredictionParameters.__init__(self, name, maxSpeed)
+        self.nPredictedTrajectories = nPredictedTrajectories
+        #self.prototypeTrajectory = prototypeTrajectory
+        self.constantSpeed = constantSpeed
+        self.speedList = speedList
+        #self.probability=probability		
+        
+    def generatePredictedTrajectories(self, obj, instant,prototypeTrajectories):
+        predictedTrajectories = []
+        initialPosition = obj.getPositionAtInstant(instant)
+        initialVelocity = obj.getVelocityAtInstant(instant)
+        for prototypeTraj in prototypeTrajectories.keys():
+			predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, prototypeTraj, constantSpeed = self.constantSpeed, speedList=self.speedList, probability = prototypeTrajectories[prototypeTraj])) 
+        return predictedTrajectories
 
 if __name__ == "__main__":
     import doctest