Mercurial Hosting > traffic-intelligence
comparison python/prediction.py @ 604:8ba4b8ad4c86
Motion Pattern Method
author | MohamedGomaa |
---|---|
date | Tue, 15 Jul 2014 13:25:15 -0400 |
parents | 727e3c529519 |
children |
comparison
equal
deleted
inserted
replaced
548:e6ab4caf359c | 604:8ba4b8ad4c86 |
---|---|
2 '''Library for motion prediction methods''' | 2 '''Library for motion prediction methods''' |
3 | 3 |
4 import moving | 4 import moving |
5 import math | 5 import math |
6 import random | 6 import random |
7 import numpy as np | |
7 | 8 |
8 class PredictedTrajectory: | 9 class PredictedTrajectory: |
9 '''Class for predicted trajectories with lazy evaluation | 10 '''Class for predicted trajectories with lazy evaluation |
10 if the predicted position has not been already computed, compute it | 11 if the predicted position has not been already computed, compute it |
11 | 12 |
45 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 46 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} |
46 | 47 |
47 def getControl(self): | 48 def getControl(self): |
48 return self.control | 49 return self.control |
49 | 50 |
51 def findNearestOrientation(initialPosition,prototypeTrajectory): | |
52 distances=[] | |
53 for p in prototypeTrajectory.positions: | |
54 distances.append(moving.Point.distanceNorm2(initialPosition, p)) | |
55 t=np.argmin(distances) | |
56 return moving.NormAngle.fromPoint(prototypeTrajectory.velocities[t]).angle | |
57 #todo: merge with previous function | |
58 def findNearestInstant(initialPosition,prototypeTrajectory): | |
59 distances=[] | |
60 for p in prototypeTrajectory.positions: | |
61 distances.append(moving.Point.distanceNorm2(initialPosition, p)) | |
62 t=np.argmin(distances) | |
63 return t | |
64 | |
50 class PredictedTrajectoryPrototype(PredictedTrajectory): | 65 class PredictedTrajectoryPrototype(PredictedTrajectory): |
51 '''Predicted trajectory that follows a prototype trajectory | 66 '''Predicted trajectory that follows a prototype trajectory |
52 The prototype is in the format of a moving.Trajectory: it could be | 67 The prototype is in the format of a moving.Trajectory: it could be |
53 1. an observed trajectory (extracted from video) | 68 1. an observed trajectory (extracted from video) |
54 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow | 69 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow |
57 1. at constant speed (the instantaneous user speed) | 72 1. at constant speed (the instantaneous user speed) |
58 2. following the trajectory path, at the speed of the user | 73 2. following the trajectory path, at the speed of the user |
59 (applying a constant ratio equal | 74 (applying a constant ratio equal |
60 to the ratio of the user instantaneous speed and the trajectory closest speed)''' | 75 to the ratio of the user instantaneous speed and the trajectory closest speed)''' |
61 | 76 |
62 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.): | 77 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, speedList=[], probability = 1.): |
63 self.prototypeTrajectory = prototypeTrajectory | 78 self.prototypeTrajectory = prototypeTrajectory |
64 self.constantSpeed = constantSpeed | 79 self.constantSpeed = constantSpeed |
80 self.speedList = speedList | |
65 self.probability = probability | 81 self.probability = probability |
66 self.predictedPositions = {0: initialPosition} | 82 self.predictedPositions = {0: initialPosition} |
67 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 83 self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestOrientation(initialPosition,prototypeTrajectory))}#moving.NormAngle.fromPoint(initialVelocity)} |
68 | 84 |
69 def predictPosition(self, nTimeSteps): | 85 def predictPosition(self, nTimeSteps): |
70 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): | 86 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): |
71 if constantSpeed: | 87 if self.constantSpeed: |
72 # calculate cumulative distance | 88 # calculate cumulative distance |
73 pass | 89 speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm |
90 anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) | |
91 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype) | |
92 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) | |
93 #pass | |
94 elif self.speedList!=[]: | |
95 pass | |
74 else: # see c++ code, calculate ratio | 96 else: # see c++ code, calculate ratio |
75 pass | 97 speedNorm= self.predictedSpeedOrientations[0].norm |
98 instant=findNearestInstant(self.predictedPositions[0],self.prototypeTrajectory) | |
99 prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:] | |
100 ratio=float(speedNorm)/prototypeSpeeds[0] | |
101 resampledSpeeds=[sp*ratio for sp in prototypeSpeeds] | |
102 anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) | |
103 if nTimeSteps<len(resampledSpeeds): | |
104 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[nTimeSteps], anglePrototype) | |
105 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) | |
106 else: | |
107 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[-1], anglePrototype) | |
108 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) | |
109 # pass | |
76 return self.predictedPositions[nTimeSteps] | 110 return self.predictedPositions[nTimeSteps] |
77 | 111 |
78 class PredictedTrajectoryRandomControl(PredictedTrajectory): | 112 class PredictedTrajectoryRandomControl(PredictedTrajectory): |
79 '''Random vehicle control: suitable for normal adaptation''' | 113 '''Random vehicle control: suitable for normal adaptation''' |
80 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): | 114 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): |
133 return '{0} {1}'.format(self.name, self.maxSpeed) | 167 return '{0} {1}'.format(self.name, self.maxSpeed) |
134 | 168 |
135 def generatePredictedTrajectories(self, obj, instant): | 169 def generatePredictedTrajectories(self, obj, instant): |
136 return [] | 170 return [] |
137 | 171 |
138 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | 172 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None): |
139 '''returns the lists of collision points and crossing zones''' | 173 '''returns the lists of collision points and crossing zones''' |
140 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) | 174 if prototypeTrajectories1==None: |
141 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) | 175 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) |
176 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) | |
177 else: | |
178 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1) | |
179 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2) | |
142 | 180 |
143 collisionPoints = [] | 181 collisionPoints = [] |
144 crossingZones = [] | 182 crossingZones = [] |
145 for et1 in predictedTrajectories1: | 183 for et1 in predictedTrajectories1: |
146 for et2 in predictedTrajectories2: | 184 for et2 in predictedTrajectories2: |
159 while not cz and t2 < timeHorizon: | 197 while not cz and t2 < timeHorizon: |
160 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: | 198 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: |
161 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) | 199 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) |
162 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) | 200 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) |
163 if cz: | 201 if cz: |
164 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) | 202 deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2() |
203 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV))) | |
165 t2 += 1 | 204 t2 += 1 |
166 t1 += 1 | 205 t1 += 1 |
167 | 206 |
168 if debug: | 207 if debug: |
169 from matplotlib.pyplot import figure, axis, title | 208 from matplotlib.pyplot import figure, axis, title |
180 title('instant {0}'.format(currentInstant)) | 219 title('instant {0}'.format(currentInstant)) |
181 axis('equal') | 220 axis('equal') |
182 | 221 |
183 return collisionPoints, crossingZones | 222 return collisionPoints, crossingZones |
184 | 223 |
185 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | 224 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None,prototypeTrajectories1=None,prototypeTrajectories2=None): |
186 '''Computes all crossing and collision points at each common instant for two road users. ''' | 225 '''Computes all crossing and collision points at each common instant for two road users. ''' |
187 collisionPoints={} | 226 collisionPoints={} |
188 crossingZones={} | 227 crossingZones={} |
189 if timeInterval: | 228 if timeInterval: |
190 commonTimeInterval = timeInterval | 229 commonTimeInterval = timeInterval |
191 else: | 230 else: |
192 commonTimeInterval = obj1.commonTimeInterval(obj2) | 231 commonTimeInterval = obj1.commonTimeInterval(obj2) |
193 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | 232 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors |
194 collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) | 233 collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) |
195 | 234 |
196 return collisionPoints, crossingZones | 235 return collisionPoints, crossingZones |
197 | 236 |
198 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): | 237 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): |
199 '''Computes only collision probabilities | 238 '''Computes only collision probabilities |
410 return [],[] | 449 return [],[] |
411 | 450 |
412 #### | 451 #### |
413 # Other Methods | 452 # Other Methods |
414 #### | 453 #### |
415 | 454 class prototypePredictionParameters(PredictionParameters): |
416 | 455 def __init__(self, maxSpeed, nPredictedTrajectories,constantSpeed = True, speedList=[]): |
417 | 456 #TODO speed profiles integration |
418 | 457 name = 'prototype' |
458 PredictionParameters.__init__(self, name, maxSpeed) | |
459 self.nPredictedTrajectories = nPredictedTrajectories | |
460 #self.prototypeTrajectory = prototypeTrajectory | |
461 self.constantSpeed = constantSpeed | |
462 self.speedList = speedList | |
463 #self.probability=probability | |
464 | |
465 def generatePredictedTrajectories(self, obj, instant,prototypeTrajectories): | |
466 predictedTrajectories = [] | |
467 initialPosition = obj.getPositionAtInstant(instant) | |
468 initialVelocity = obj.getVelocityAtInstant(instant) | |
469 for prototypeTraj in prototypeTrajectories.keys(): | |
470 predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, prototypeTraj, constantSpeed = self.constantSpeed, speedList=self.speedList, probability = prototypeTrajectories[prototypeTraj])) | |
471 return predictedTrajectories | |
419 | 472 |
420 if __name__ == "__main__": | 473 if __name__ == "__main__": |
421 import doctest | 474 import doctest |
422 import unittest | 475 import unittest |
423 suite = doctest.DocFileSuite('tests/prediction.txt') | 476 suite = doctest.DocFileSuite('tests/prediction.txt') |