Mercurial Hosting > traffic-intelligence
comparison python/prediction.py @ 607:84690dfe5560
add some functions for behaviour analysis
author | MohamedGomaa |
---|---|
date | Tue, 25 Nov 2014 22:49:47 -0500 |
parents | a9c1d61a89b4 |
children | 0dc36203973d |
comparison
equal
deleted
inserted
replaced
606:75ad9c0d6cc3 | 607:84690dfe5560 |
---|---|
2 '''Library for motion prediction methods''' | 2 '''Library for motion prediction methods''' |
3 | 3 |
4 import moving | 4 import moving |
5 import math | 5 import math |
6 import random | 6 import random |
7 import numpy as np | |
7 | 8 |
8 class PredictedTrajectory: | 9 class PredictedTrajectory: |
9 '''Class for predicted trajectories with lazy evaluation | 10 '''Class for predicted trajectories with lazy evaluation |
10 if the predicted position has not been already computed, compute it | 11 if the predicted position has not been already computed, compute it |
11 | 12 |
44 self.predictedPositions = {0: initialPosition} | 45 self.predictedPositions = {0: initialPosition} |
45 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 46 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} |
46 | 47 |
47 def getControl(self): | 48 def getControl(self): |
48 return self.control | 49 return self.control |
50 | |
51 def findNearestParams(initialPosition,prototypeTrajectory): | |
52 ''' nearest parameters are the index of minDistance and the orientation ''' | |
53 distances=[] | |
54 for position in prototypeTrajectory.positions: | |
55 distances.append(moving.Point.distanceNorm2(initialPosition, position)) | |
56 minDistanceIndex= np.argmin(distances) | |
57 return minDistanceIndex, moving.NormAngle.fromPoint(prototypeTrajectory.velocities[minDistanceIndex]).angle | |
49 | 58 |
50 class PredictedTrajectoryPrototype(PredictedTrajectory): | 59 class PredictedTrajectoryPrototype(PredictedTrajectory): |
51 '''Predicted trajectory that follows a prototype trajectory | 60 '''Predicted trajectory that follows a prototype trajectory |
52 The prototype is in the format of a moving.Trajectory: it could be | 61 The prototype is in the format of a moving.Trajectory: it could be |
53 1. an observed trajectory (extracted from video) | 62 1. an observed trajectory (extracted from video) |
62 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.): | 71 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.): |
63 self.prototypeTrajectory = prototypeTrajectory | 72 self.prototypeTrajectory = prototypeTrajectory |
64 self.constantSpeed = constantSpeed | 73 self.constantSpeed = constantSpeed |
65 self.probability = probability | 74 self.probability = probability |
66 self.predictedPositions = {0: initialPosition} | 75 self.predictedPositions = {0: initialPosition} |
67 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 76 self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestParams(initialPosition,prototypeTrajectory)[1])}#moving.NormAngle.fromPoint(initialVelocity)} |
68 | 77 |
69 def predictPosition(self, nTimeSteps): | 78 def predictPosition(self, nTimeSteps): |
70 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): | 79 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): |
71 if constantSpeed: | 80 if self.constantSpeed: |
72 # calculate cumulative distance | 81 # calculate cumulative distance |
73 pass | 82 speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm |
83 anglePrototype = findNearestParams(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)[1] | |
84 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype) | |
85 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) | |
86 | |
74 else: # see c++ code, calculate ratio | 87 else: # see c++ code, calculate ratio |
75 pass | 88 speedNorm= self.predictedSpeedOrientations[0].norm |
89 instant=findNearestParams(self.predictedPositions[0],self.prototypeTrajectory)[0] | |
90 prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:] | |
91 ratio=float(speedNorm)/prototypeSpeeds[0] | |
92 resampledSpeeds=[sp*ratio for sp in prototypeSpeeds] | |
93 anglePrototype = findNearestParams(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)[1] | |
94 if nTimeSteps<len(resampledSpeeds): | |
95 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[nTimeSteps], anglePrototype) | |
96 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) | |
97 else: | |
98 self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[-1], anglePrototype) | |
99 self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) | |
100 | |
76 return self.predictedPositions[nTimeSteps] | 101 return self.predictedPositions[nTimeSteps] |
77 | 102 |
78 class PredictedTrajectoryRandomControl(PredictedTrajectory): | 103 class PredictedTrajectoryRandomControl(PredictedTrajectory): |
79 '''Random vehicle control: suitable for normal adaptation''' | 104 '''Random vehicle control: suitable for normal adaptation''' |
80 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): | 105 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): |
139 title('instant {0}'.format(currentInstant)) | 164 title('instant {0}'.format(currentInstant)) |
140 axis('equal') | 165 axis('equal') |
141 savefig('predicted-trajectories-t-{0}.png'.format(currentInstant)) | 166 savefig('predicted-trajectories-t-{0}.png'.format(currentInstant)) |
142 close() | 167 close() |
143 | 168 |
144 def computeCrossingsCollisionsAtInstant(predictionParams, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | 169 def computeCrossingsCollisionsAtInstant(predictionParams,currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None): |
145 '''returns the lists of collision points and crossing zones''' | 170 '''returns the lists of collision points and crossing zones''' |
146 predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant) | 171 if prototypeTrajectories1==None: |
147 predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant) | 172 predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant) |
173 predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant) | |
174 else: | |
175 predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1) | |
176 predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2) | |
148 | 177 |
149 collisionPoints = [] | 178 collisionPoints = [] |
150 crossingZones = [] | 179 crossingZones = [] |
151 for et1 in predictedTrajectories1: | 180 for et1 in predictedTrajectories1: |
152 for et2 in predictedTrajectories2: | 181 for et2 in predictedTrajectories2: |
164 t2 = 0 | 193 t2 = 0 |
165 while not cz and t2 < timeHorizon: | 194 while not cz and t2 < timeHorizon: |
166 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: | 195 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: |
167 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) | 196 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) |
168 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) | 197 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) |
169 if cz != None: | 198 if cz: |
170 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) | 199 deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2() |
200 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV))) | |
171 t2 += 1 | 201 t2 += 1 |
172 t1 += 1 | 202 t1 += 1 |
173 | 203 |
174 if debug: | 204 if debug: |
175 savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) | 205 savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) |
176 | 206 return currentInstant,collisionPoints, crossingZones |
177 return currentInstant, collisionPoints, crossingZones | 207 |
178 | 208 def computeCrossingsCollisions(predictionParams, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None,nProcesses = 1,prototypeTrajectories1=None,prototypeTrajectories2=None): |
179 def computeCrossingsCollisions(predictionParams, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1): | |
180 '''Computes all crossing and collision points at each common instant for two road users. ''' | 209 '''Computes all crossing and collision points at each common instant for two road users. ''' |
181 collisionPoints={} | 210 collisionPoints={} |
182 crossingZones={} | 211 crossingZones={} |
183 if timeInterval: | 212 if timeInterval: |
184 commonTimeInterval = timeInterval | 213 commonTimeInterval = timeInterval |
185 else: | 214 else: |
186 commonTimeInterval = obj1.commonTimeInterval(obj2) | 215 commonTimeInterval = obj1.commonTimeInterval(obj2) |
187 if nProcesses == 1: | 216 if nProcesses == 1: |
188 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | 217 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors |
189 i, cp, cz = computeCrossingsCollisionsAtInstant(predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) | 218 i, cp, cz = computeCrossingsCollisionsAtInstant(predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) |
190 if len(cp) != 0: | 219 if len(cp) != 0: |
191 collisionPoints[i] = cp | 220 collisionPoints[i] = cp |
192 if len(cz) != 0: | 221 if len(cz) != 0: |
193 crossingZones[i] = cz | 222 crossingZones[i] = cz |
194 else: | 223 else: |
195 from multiprocessing import Pool | 224 from multiprocessing import Pool |
196 pool = Pool(processes = nProcesses) | 225 pool = Pool(processes = nProcesses) |
197 jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)) for i in list(commonTimeInterval)[:-1]] | 226 jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2)) for i in list(commonTimeInterval)[:-1]] |
198 #results = [j.get() for j in jobs] | 227 #results = [j.get() for j in jobs] |
199 #results.sort() | 228 #results.sort() |
200 for j in jobs: | 229 for j in jobs: |
201 i, cp, cz = j.get() | 230 i, cp, cz = j.get() |
202 #if len(cp) != 0 or len(cz) != 0: | 231 #if len(cp) != 0 or len(cz) != 0: |
203 if len(cp) != 0: | 232 if len(cp) != 0: |
204 collisionPoints[i] = cp | 233 collisionPoints[i] = cp |
205 if len(cz) != 0: | 234 if len(cz) != 0: |
206 crossingZones[i] = cz | 235 crossingZones[i] = cz |
207 pool.close() | 236 pool.close() |
208 return collisionPoints, crossingZones | 237 return collisionPoints, crossingZones |
209 | 238 |
210 class PredictionParameters: | 239 class PredictionParameters: |
211 def __init__(self, name, maxSpeed): | 240 def __init__(self, name, maxSpeed): |
212 self.name = name | 241 self.name = name |
213 self.maxSpeed = maxSpeed | 242 self.maxSpeed = maxSpeed |
216 return '{0} {1}'.format(self.name, self.maxSpeed) | 245 return '{0} {1}'.format(self.name, self.maxSpeed) |
217 | 246 |
218 def generatePredictedTrajectories(self, obj, instant): | 247 def generatePredictedTrajectories(self, obj, instant): |
219 return [] | 248 return [] |
220 | 249 |
221 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | 250 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None): |
222 return computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) | 251 return computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) |
223 | 252 |
224 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1): | 253 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1,prototypeTrajectories1=None,prototypeTrajectories2=None): |
225 return computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval, nProcesses) | 254 return computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval, nProcesses,prototypeTrajectories1,prototypeTrajectories2) |
226 | 255 |
227 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): | 256 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): |
228 '''Computes only collision probabilities | 257 '''Computes only collision probabilities |
229 Returns for each instant the collision probability and number of samples drawn''' | 258 Returns for each instant the collision probability and number of samples drawn''' |
230 collisionProbabilities = {} | 259 collisionProbabilities = {} |
426 return [],[] | 455 return [],[] |
427 | 456 |
428 #### | 457 #### |
429 # Other Methods | 458 # Other Methods |
430 #### | 459 #### |
431 | 460 class prototypePredictionParameters(PredictionParameters): |
432 | 461 def __init__(self, maxSpeed, nPredictedTrajectories,constantSpeed = True): |
433 | 462 name = 'prototype' |
434 | 463 PredictionParameters.__init__(self, name, maxSpeed) |
464 self.nPredictedTrajectories = nPredictedTrajectories | |
465 self.constantSpeed = constantSpeed | |
466 | |
467 def generatePredictedTrajectories(self, obj, instant,prototypeTrajectories): | |
468 predictedTrajectories = [] | |
469 initialPosition = obj.getPositionAtInstant(instant) | |
470 initialVelocity = obj.getVelocityAtInstant(instant) | |
471 for prototypeTraj in prototypeTrajectories.keys(): | |
472 predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, prototypeTraj, constantSpeed = self.constantSpeed, probability = prototypeTrajectories[prototypeTraj])) | |
473 return predictedTrajectories | |
435 | 474 |
436 if __name__ == "__main__": | 475 if __name__ == "__main__": |
437 import doctest | 476 import doctest |
438 import unittest | 477 import unittest |
439 suite = doctest.DocFileSuite('tests/prediction.txt') | 478 suite = doctest.DocFileSuite('tests/prediction.txt') |