comparison python/prediction.py @ 604:8ba4b8ad4c86

Motion Pattern Method
author MohamedGomaa
date Tue, 15 Jul 2014 13:25:15 -0400
parents 727e3c529519
children
comparison
equal deleted inserted replaced
548:e6ab4caf359c 604:8ba4b8ad4c86
2 '''Library for motion prediction methods''' 2 '''Library for motion prediction methods'''
3 3
4 import moving 4 import moving
5 import math 5 import math
6 import random 6 import random
7 import numpy as np
7 8
8 class PredictedTrajectory: 9 class PredictedTrajectory:
9 '''Class for predicted trajectories with lazy evaluation 10 '''Class for predicted trajectories with lazy evaluation
10 if the predicted position has not been already computed, compute it 11 if the predicted position has not been already computed, compute it
11 12
45 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} 46 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
46 47
47 def getControl(self): 48 def getControl(self):
48 return self.control 49 return self.control
49 50
51 def findNearestOrientation(initialPosition,prototypeTrajectory):
52 distances=[]
53 for p in prototypeTrajectory.positions:
54 distances.append(moving.Point.distanceNorm2(initialPosition, p))
55 t=np.argmin(distances)
56 return moving.NormAngle.fromPoint(prototypeTrajectory.velocities[t]).angle
57 #todo: merge with previous function
58 def findNearestInstant(initialPosition,prototypeTrajectory):
59 distances=[]
60 for p in prototypeTrajectory.positions:
61 distances.append(moving.Point.distanceNorm2(initialPosition, p))
62 t=np.argmin(distances)
63 return t
64
50 class PredictedTrajectoryPrototype(PredictedTrajectory): 65 class PredictedTrajectoryPrototype(PredictedTrajectory):
51 '''Predicted trajectory that follows a prototype trajectory 66 '''Predicted trajectory that follows a prototype trajectory
52 The prototype is in the format of a moving.Trajectory: it could be 67 The prototype is in the format of a moving.Trajectory: it could be
53 1. an observed trajectory (extracted from video) 68 1. an observed trajectory (extracted from video)
54 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow 69 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow
57 1. at constant speed (the instantaneous user speed) 72 1. at constant speed (the instantaneous user speed)
58 2. following the trajectory path, at the speed of the user 73 2. following the trajectory path, at the speed of the user
59 (applying a constant ratio equal 74 (applying a constant ratio equal
60 to the ratio of the user instantaneous speed and the trajectory closest speed)''' 75 to the ratio of the user instantaneous speed and the trajectory closest speed)'''
61 76
62 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.): 77 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, speedList=[], probability = 1.):
63 self.prototypeTrajectory = prototypeTrajectory 78 self.prototypeTrajectory = prototypeTrajectory
64 self.constantSpeed = constantSpeed 79 self.constantSpeed = constantSpeed
80 self.speedList = speedList
65 self.probability = probability 81 self.probability = probability
66 self.predictedPositions = {0: initialPosition} 82 self.predictedPositions = {0: initialPosition}
67 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} 83 self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestOrientation(initialPosition,prototypeTrajectory))}#moving.NormAngle.fromPoint(initialVelocity)}
68 84
69 def predictPosition(self, nTimeSteps): 85 def predictPosition(self, nTimeSteps):
70 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): 86 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys():
71 if constantSpeed: 87 if self.constantSpeed:
72 # calculate cumulative distance 88 # calculate cumulative distance
73 pass 89 speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm
90 anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)
91 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype)
92 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None)
93 #pass
94 elif self.speedList!=[]:
95 pass
74 else: # see c++ code, calculate ratio 96 else: # see c++ code, calculate ratio
75 pass 97 speedNorm= self.predictedSpeedOrientations[0].norm
98 instant=findNearestInstant(self.predictedPositions[0],self.prototypeTrajectory)
99 prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:]
100 ratio=float(speedNorm)/prototypeSpeeds[0]
101 resampledSpeeds=[sp*ratio for sp in prototypeSpeeds]
102 anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)
103 if nTimeSteps<len(resampledSpeeds):
104 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[nTimeSteps], anglePrototype)
105 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None)
106 else:
107 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[-1], anglePrototype)
108 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None)
109 # pass
76 return self.predictedPositions[nTimeSteps] 110 return self.predictedPositions[nTimeSteps]
77 111
78 class PredictedTrajectoryRandomControl(PredictedTrajectory): 112 class PredictedTrajectoryRandomControl(PredictedTrajectory):
79 '''Random vehicle control: suitable for normal adaptation''' 113 '''Random vehicle control: suitable for normal adaptation'''
80 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): 114 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None):
133 return '{0} {1}'.format(self.name, self.maxSpeed) 167 return '{0} {1}'.format(self.name, self.maxSpeed)
134 168
135 def generatePredictedTrajectories(self, obj, instant): 169 def generatePredictedTrajectories(self, obj, instant):
136 return [] 170 return []
137 171
138 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): 172 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None):
139 '''returns the lists of collision points and crossing zones''' 173 '''returns the lists of collision points and crossing zones'''
140 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) 174 if prototypeTrajectories1==None:
141 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) 175 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
176 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)
177 else:
178 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1)
179 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2)
142 180
143 collisionPoints = [] 181 collisionPoints = []
144 crossingZones = [] 182 crossingZones = []
145 for et1 in predictedTrajectories1: 183 for et1 in predictedTrajectories1:
146 for et2 in predictedTrajectories2: 184 for et2 in predictedTrajectories2:
159 while not cz and t2 < timeHorizon: 197 while not cz and t2 < timeHorizon:
160 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: 198 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold:
161 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) 199 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5)
162 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) 200 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
163 if cz: 201 if cz:
164 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) 202 deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2()
203 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV)))
165 t2 += 1 204 t2 += 1
166 t1 += 1 205 t1 += 1
167 206
168 if debug: 207 if debug:
169 from matplotlib.pyplot import figure, axis, title 208 from matplotlib.pyplot import figure, axis, title
180 title('instant {0}'.format(currentInstant)) 219 title('instant {0}'.format(currentInstant))
181 axis('equal') 220 axis('equal')
182 221
183 return collisionPoints, crossingZones 222 return collisionPoints, crossingZones
184 223
185 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): 224 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None,prototypeTrajectories1=None,prototypeTrajectories2=None):
186 '''Computes all crossing and collision points at each common instant for two road users. ''' 225 '''Computes all crossing and collision points at each common instant for two road users. '''
187 collisionPoints={} 226 collisionPoints={}
188 crossingZones={} 227 crossingZones={}
189 if timeInterval: 228 if timeInterval:
190 commonTimeInterval = timeInterval 229 commonTimeInterval = timeInterval
191 else: 230 else:
192 commonTimeInterval = obj1.commonTimeInterval(obj2) 231 commonTimeInterval = obj1.commonTimeInterval(obj2)
193 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors 232 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
194 collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) 233 collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2)
195 234
196 return collisionPoints, crossingZones 235 return collisionPoints, crossingZones
197 236
198 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): 237 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None):
199 '''Computes only collision probabilities 238 '''Computes only collision probabilities
410 return [],[] 449 return [],[]
411 450
412 #### 451 ####
413 # Other Methods 452 # Other Methods
414 #### 453 ####
415 454 class prototypePredictionParameters(PredictionParameters):
416 455 def __init__(self, maxSpeed, nPredictedTrajectories,constantSpeed = True, speedList=[]):
417 456 #TODO speed profiles integration
418 457 name = 'prototype'
458 PredictionParameters.__init__(self, name, maxSpeed)
459 self.nPredictedTrajectories = nPredictedTrajectories
460 #self.prototypeTrajectory = prototypeTrajectory
461 self.constantSpeed = constantSpeed
462 self.speedList = speedList
463 #self.probability=probability
464
465 def generatePredictedTrajectories(self, obj, instant,prototypeTrajectories):
466 predictedTrajectories = []
467 initialPosition = obj.getPositionAtInstant(instant)
468 initialVelocity = obj.getVelocityAtInstant(instant)
469 for prototypeTraj in prototypeTrajectories.keys():
470 predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, prototypeTraj, constantSpeed = self.constantSpeed, speedList=self.speedList, probability = prototypeTrajectories[prototypeTraj]))
471 return predictedTrajectories
419 472
420 if __name__ == "__main__": 473 if __name__ == "__main__":
421 import doctest 474 import doctest
422 import unittest 475 import unittest
423 suite = doctest.DocFileSuite('tests/prediction.txt') 476 suite = doctest.DocFileSuite('tests/prediction.txt')