comparison python/prediction.py @ 614:5e09583275a4

Merged Nicolas/trafficintelligence into default
author Mohamed Gomaa <eng.m.gom3a@gmail.com>
date Fri, 05 Dec 2014 12:13:53 -0500
parents a9c1d61a89b4
children 84690dfe5560
comparison
equal deleted inserted replaced
598:11f96bd08552 614:5e09583275a4
28 return moving.Trajectory.fromPointList(self.predictedPositions.values()) 28 return moving.Trajectory.fromPointList(self.predictedPositions.values())
29 29
30 def getPredictedSpeeds(self): 30 def getPredictedSpeeds(self):
31 return [so.norm for so in self.predictedSpeedOrientations.values()] 31 return [so.norm for so in self.predictedSpeedOrientations.values()]
32 32
33 def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): 33 def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs):
34 self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs) 34 self.getPredictedTrajectory().plot(options, withOrigin, timeStep, **kwargs)
35 35
36 class PredictedTrajectoryConstant(PredictedTrajectory): 36 class PredictedTrajectoryConstant(PredictedTrajectory):
37 '''Predicted trajectory at constant speed or acceleration 37 '''Predicted trajectory at constant speed or acceleration
38 TODO generalize by passing a series of velocities/accelerations''' 38 TODO generalize by passing a series of velocities/accelerations'''
39 39
40 def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1, maxSpeed = None): 40 def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None):
41 self.control = control 41 self.control = control
42 self.maxSpeed = maxSpeed 42 self.maxSpeed = maxSpeed
43 self.probability = probability 43 self.probability = probability
44 self.predictedPositions = {0: initialPosition} 44 self.predictedPositions = {0: initialPosition}
45 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} 45 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
46 46
47 def getControl(self): 47 def getControl(self):
48 return self.control 48 return self.control
49 49
50 class PredictedTrajectoryNormalAdaptation(PredictedTrajectory): 50 class PredictedTrajectoryPrototype(PredictedTrajectory):
51 '''Random small adaptation of vehicle control ''' 51 '''Predicted trajectory that follows a prototype trajectory
52 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1, maxSpeed = None): 52 The prototype is in the format of a moving.Trajectory: it could be
53 1. an observed trajectory (extracted from video)
54 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow
55
56 Prediction can be done
57 1. at constant speed (the instantaneous user speed)
58 2. following the trajectory path, at the speed of the user
59 (applying a constant ratio equal
60 to the ratio of the user instantaneous speed and the trajectory closest speed)'''
61
62 def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.):
63 self.prototypeTrajectory = prototypeTrajectory
64 self.constantSpeed = constantSpeed
65 self.probability = probability
66 self.predictedPositions = {0: initialPosition}
67 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
68
69 def predictPosition(self, nTimeSteps):
70 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys():
71 if constantSpeed:
72 # calculate cumulative distance
73 pass
74 else: # see c++ code, calculate ratio
75 pass
76 return self.predictedPositions[nTimeSteps]
77
78 class PredictedTrajectoryRandomControl(PredictedTrajectory):
79 '''Random vehicle control: suitable for normal adaptation'''
80 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None):
53 '''Constructor 81 '''Constructor
54 accelerationDistribution and steeringDistribution are distributions 82 accelerationDistribution and steeringDistribution are distributions
55 that return random numbers drawn from them''' 83 that return random numbers drawn from them'''
56 self.accelerationDistribution = accelerationDistribution 84 self.accelerationDistribution = accelerationDistribution
57 self.steeringDistribution = steeringDistribution 85 self.steeringDistribution = steeringDistribution
61 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} 89 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
62 90
63 def getControl(self): 91 def getControl(self):
64 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) 92 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution())
65 93
66 class PredictionParameters:
67 def __init__(self, name, maxSpeed):
68 self.name = name
69 self.maxSpeed = maxSpeed
70
71 def __str__(self):
72 return '{0} {1}'.format(self.name, self.maxSpeed)
73
74 class ConstantPredictionParameters(PredictionParameters):
75 def __init__(self, maxSpeed):
76 PredictionParameters.__init__(self, 'constant velocity', maxSpeed)
77
78 def generatePredictedTrajectories(self, obj, instant):
79 return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant),
80 maxSpeed = self.maxSpeed)]
81
82 class NormalAdaptationPredictionParameters(PredictionParameters):
83 def __init__(self, maxSpeed, nPredictedTrajectories, maxAcceleration, maxSteering):
84 PredictionParameters.__init__(self, 'normal adaptation', maxSpeed)
85 self.nPredictedTrajectories = nPredictedTrajectories
86 self.maxAcceleration = maxAcceleration
87 self.maxSteering = maxSteering
88 self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration,
89 self.maxAcceleration, 0.)
90 self.steeringDistribution = lambda: random.triangular(-self.maxSteering,
91 self.maxSteering, 0.)
92
93 def __str__(self):
94 return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories,
95 self.maxAcceleration,
96 self.maxSteering)
97
98 def generatePredictedTrajectories(self, obj, instant):
99 predictedTrajectories = []
100 for i in xrange(self.nPredictedTrajectories):
101 predictedTrajectories.append(PredictedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant),
102 obj.getVelocityAtInstant(instant),
103 self.accelerationDistribution,
104 self.steeringDistribution,
105 maxSpeed = self.maxSpeed))
106 return predictedTrajectories
107
108 class PointSetPredictionParameters(PredictionParameters):
109 # todo generate several trajectories with normal adaptatoins from each position (feature)
110 def __init__(self, maxSpeed):
111 PredictionParameters.__init__(self, 'point set', maxSpeed)
112
113 def generatePredictedTrajectories(self, obj, instant):
114 predictedTrajectories = []
115 features = [f for f in obj.features if f.existsAtInstant(instant)]
116 positions = [f.getPositionAtInstant(instant) for f in features]
117 velocities = [f.getVelocityAtInstant(instant) for f in features]
118 for initialPosition,initialVelocity in zip(positions, velocities):
119 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity,
120 maxSpeed = self.maxSpeed))
121 return predictedTrajectories
122
123 class EvasiveActionPredictionParameters(PredictionParameters):
124 def __init__(self, maxSpeed, nPredictedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False):
125 if useFeatures:
126 name = 'point set evasive action'
127 else:
128 name = 'evasive action'
129 PredictionParameters.__init__(self, name, maxSpeed)
130 self.nPredictedTrajectories = nPredictedTrajectories
131 self.minAcceleration = minAcceleration
132 self.maxAcceleration = maxAcceleration
133 self.maxSteering = maxSteering
134 self.useFeatures = useFeatures
135 self.accelerationDistribution = lambda: random.triangular(self.minAcceleration,
136 self.maxAcceleration, 0.)
137 self.steeringDistribution = lambda: random.triangular(-self.maxSteering,
138 self.maxSteering, 0.)
139
140 def __str__(self):
141 return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering)
142
143 def generatePredictedTrajectories(self, obj, instant):
144 predictedTrajectories = []
145 if self.useFeatures:
146 features = [f for f in obj.features if f.existsAtInstant(instant)]
147 positions = [f.getPositionAtInstant(instant) for f in features]
148 velocities = [f.getVelocityAtInstant(instant) for f in features]
149 else:
150 positions = [obj.getPositionAtInstant(instant)]
151 velocities = [obj.getVelocityAtInstant(instant)]
152 for i in xrange(self.nPredictedTrajectories):
153 for initialPosition,initialVelocity in zip(positions, velocities):
154 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition,
155 initialVelocity,
156 moving.NormAngle(self.accelerationDistribution(),
157 self.steeringDistribution()),
158 maxSpeed = self.maxSpeed))
159 return predictedTrajectories
160
161 class SafetyPoint(moving.Point): 94 class SafetyPoint(moving.Point):
162 '''Can represent a collision point or crossing zone 95 '''Can represent a collision point or crossing zone
163 with respective safety indicator, TTC or pPET''' 96 with respective safety indicator, TTC or pPET'''
164 def __init__(self, p, probability = 1., indicator = -1): 97 def __init__(self, p, probability = 1., indicator = -1):
165 self.x = p.x 98 self.x = p.x
173 @staticmethod 106 @staticmethod
174 def save(out, points, predictionInstant, objNum1, objNum2): 107 def save(out, points, predictionInstant, objNum1, objNum2):
175 for p in points: 108 for p in points:
176 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) 109 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p))
177 110
178 def computeExpectedIndicator(points): 111 @staticmethod
179 from numpy import sum 112 def computeExpectedIndicator(points):
180 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) 113 from numpy import sum
114 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])
181 115
182 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): 116 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon):
183 '''Computes the first instant at which two predicted trajectories are within some distance threshold''' 117 '''Computes the first instant at which two predicted trajectories are within some distance threshold'''
184 t = 1 118 t = 1
185 p1 = predictedTrajectory1.predictPosition(t) 119 p1 = predictedTrajectory1.predictPosition(t)
188 p1 = predictedTrajectory1.predictPosition(t) 122 p1 = predictedTrajectory1.predictPosition(t)
189 p2 = predictedTrajectory2.predictPosition(t) 123 p2 = predictedTrajectory2.predictPosition(t)
190 t += 1 124 t += 1
191 return t, p1, p2 125 return t, p1, p2
192 126
193 def computeCrossingsCollisionsAtInstant(currentInstant, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): 127 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon):
194 '''returns the lists of collision points and crossing zones 128 from matplotlib.pyplot import figure, axis, title, close, savefig
195 129 figure()
196 Check: Predicting all the points together, as if they represent the whole vehicle''' 130 for et in predictedTrajectories1:
197 predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, currentInstant) 131 et.predictPosition(timeHorizon)
198 predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, currentInstant) 132 et.plot('rx')
133
134 for et in predictedTrajectories2:
135 et.predictPosition(timeHorizon)
136 et.plot('bx')
137 obj1.plot('r')
138 obj2.plot('b')
139 title('instant {0}'.format(currentInstant))
140 axis('equal')
141 savefig('predicted-trajectories-t-{0}.png'.format(currentInstant))
142 close()
143
144 def computeCrossingsCollisionsAtInstant(predictionParams, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
145 '''returns the lists of collision points and crossing zones'''
146 predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant)
147 predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant)
199 148
200 collisionPoints = [] 149 collisionPoints = []
201 crossingZones = [] 150 crossingZones = []
202 for et1 in predictedTrajectories1: 151 for et1 in predictedTrajectories1:
203 for et2 in predictedTrajectories2: 152 for et2 in predictedTrajectories2:
204 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) 153 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
205 154
206 if t <= timeHorizon: 155 if t <= timeHorizon:
207 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) 156 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t))
208 elif computeCZ: # check if there is a crossing zone 157 elif computeCZ: # check if there is a crossing zone
209 # TODO? zone should be around the points at which the traj are the closest 158 # TODO? zone should be around the points at which the traj are the closest
210 # look for CZ at different times, otherwise it would be a collision 159 # look for CZ at different times, otherwise it would be a collision
215 t2 = 0 164 t2 = 0
216 while not cz and t2 < timeHorizon: 165 while not cz and t2 < timeHorizon:
217 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: 166 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold:
218 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) 167 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5)
219 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) 168 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
220 if cz: 169 if cz != None:
221 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) 170 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)))
222 t2 += 1 171 t2 += 1
223 t1 += 1 172 t1 += 1
224 173
225 if debug: 174 if debug:
226 from matplotlib.pyplot import figure, axis, title 175 savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon)
227 figure() 176
228 for et in predictedTrajectories1: 177 return currentInstant, collisionPoints, crossingZones
229 et.predictPosition(timeHorizon) 178
230 et.draw('rx') 179 def computeCrossingsCollisions(predictionParams, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1):
231
232 for et in predictedTrajectories2:
233 et.predictPosition(timeHorizon)
234 et.draw('bx')
235 obj1.draw('r')
236 obj2.draw('b')
237 title('instant {0}'.format(i))
238 axis('equal')
239
240 return collisionPoints, crossingZones
241
242 def computeCrossingsCollisions(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
243 '''Computes all crossing and collision points at each common instant for two road users. ''' 180 '''Computes all crossing and collision points at each common instant for two road users. '''
244 collisionPoints={} 181 collisionPoints={}
245 crossingZones={} 182 crossingZones={}
246 if timeInterval: 183 if timeInterval:
247 commonTimeInterval = timeInterval 184 commonTimeInterval = timeInterval
248 else: 185 else:
249 commonTimeInterval = obj1.commonTimeInterval(obj2) 186 commonTimeInterval = obj1.commonTimeInterval(obj2)
250 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors 187 if nProcesses == 1:
251 collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug) 188 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
252 189 i, cp, cz = computeCrossingsCollisionsAtInstant(predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
190 if len(cp) != 0:
191 collisionPoints[i] = cp
192 if len(cz) != 0:
193 crossingZones[i] = cz
194 else:
195 from multiprocessing import Pool
196 pool = Pool(processes = nProcesses)
197 jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)) for i in list(commonTimeInterval)[:-1]]
198 #results = [j.get() for j in jobs]
199 #results.sort()
200 for j in jobs:
201 i, cp, cz = j.get()
202 #if len(cp) != 0 or len(cz) != 0:
203 if len(cp) != 0:
204 collisionPoints[i] = cp
205 if len(cz) != 0:
206 crossingZones[i] = cz
207 pool.close()
253 return collisionPoints, crossingZones 208 return collisionPoints, crossingZones
254 209
255 def computeCollisionProbability(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): 210 class PredictionParameters:
256 '''Computes only collision probabilities 211 def __init__(self, name, maxSpeed):
257 Returns for each instant the collision probability and number of samples drawn''' 212 self.name = name
258 collisionProbabilities = {} 213 self.maxSpeed = maxSpeed
259 if timeInterval: 214
260 commonTimeInterval = timeInterval 215 def __str__(self):
261 else: 216 return '{0} {1}'.format(self.name, self.maxSpeed)
262 commonTimeInterval = obj1.commonTimeInterval(obj2) 217
263 for i in list(commonTimeInterval)[:-1]: 218 def generatePredictedTrajectories(self, obj, instant):
264 nCollisions = 0 219 return []
265 print(obj1.num, obj2.num, i) 220
266 predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) 221 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
267 predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) 222 return computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
268 for et1 in predictedTrajectories1: 223
269 for et2 in predictedTrajectories2: 224 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1):
270 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) 225 return computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval, nProcesses)
271 if t <= timeHorizon: 226
272 nCollisions += 1 227 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None):
273 # take into account probabilities ?? 228 '''Computes only collision probabilities
274 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) 229 Returns for each instant the collision probability and number of samples drawn'''
275 collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] 230 collisionProbabilities = {}
276 231 if timeInterval:
277 if debug: 232 commonTimeInterval = timeInterval
278 from matplotlib.pyplot import figure, axis, title 233 else:
234 commonTimeInterval = obj1.commonTimeInterval(obj2)
235 for i in list(commonTimeInterval)[:-1]:
236 nCollisions = 0
237 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i)
238 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i)
239 for et1 in predictedTrajectories1:
240 for et2 in predictedTrajectories2:
241 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
242 if t <= timeHorizon:
243 nCollisions += 1
244 # take into account probabilities ??
245 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2))
246 collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples]
247
248 if debug:
249 savePredictedTrajectoriesFigure(i, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon)
250
251 return collisionProbabilities
252
253 class ConstantPredictionParameters(PredictionParameters):
254 def __init__(self, maxSpeed):
255 PredictionParameters.__init__(self, 'constant velocity', maxSpeed)
256
257 def generatePredictedTrajectories(self, obj, instant):
258 return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant),
259 maxSpeed = self.maxSpeed)]
260
261 class NormalAdaptationPredictionParameters(PredictionParameters):
262 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False):
263 '''An example of acceleration and steering distributions is
264 lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.)
265 '''
266 if useFeatures:
267 name = 'point set normal adaptation'
268 else:
269 name = 'normal adaptation'
270 PredictionParameters.__init__(self, name, maxSpeed)
271 self.nPredictedTrajectories = nPredictedTrajectories
272 self.useFeatures = useFeatures
273 self.accelerationDistribution = accelerationDistribution
274 self.steeringDistribution = steeringDistribution
275
276 def __str__(self):
277 return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories,
278 self.maxAcceleration,
279 self.maxSteering)
280
281 def generatePredictedTrajectories(self, obj, instant):
282 predictedTrajectories = []
283 if self.useFeatures:
284 features = [f for f in obj.features if f.existsAtInstant(instant)]
285 positions = [f.getPositionAtInstant(instant) for f in features]
286 velocities = [f.getVelocityAtInstant(instant) for f in features]
287 else:
288 positions = [obj.getPositionAtInstant(instant)]
289 velocities = [obj.getVelocityAtInstant(instant)]
290 for i in xrange(self.nPredictedTrajectories):
291 for initialPosition,initialVelocity in zip(positions, velocities):
292 predictedTrajectories.append(PredictedTrajectoryRandomControl(initialPosition,
293 initialVelocity,
294 self.accelerationDistribution,
295 self.steeringDistribution,
296 maxSpeed = self.maxSpeed))
297 return predictedTrajectories
298
299 class PointSetPredictionParameters(PredictionParameters):
300 # todo generate several trajectories with normal adaptatoins from each position (feature)
301 def __init__(self, maxSpeed):
302 PredictionParameters.__init__(self, 'point set', maxSpeed)
303 #self.nPredictedTrajectories = nPredictedTrajectories
304
305 def generatePredictedTrajectories(self, obj, instant):
306 predictedTrajectories = []
307 features = [f for f in obj.features if f.existsAtInstant(instant)]
308 positions = [f.getPositionAtInstant(instant) for f in features]
309 velocities = [f.getVelocityAtInstant(instant) for f in features]
310 #for i in xrange(self.nPredictedTrajectories):
311 for initialPosition,initialVelocity in zip(positions, velocities):
312 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity,
313 maxSpeed = self.maxSpeed))
314 return predictedTrajectories
315
316 class EvasiveActionPredictionParameters(PredictionParameters):
317 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False):
318 '''Suggested acceleration distribution may not be symmetric, eg
319 lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.)'''
320
321 if useFeatures:
322 name = 'point set evasive action'
323 else:
324 name = 'evasive action'
325 PredictionParameters.__init__(self, name, maxSpeed)
326 self.nPredictedTrajectories = nPredictedTrajectories
327 self.useFeatures = useFeatures
328 self.accelerationDistribution = accelerationDistribution
329 self.steeringDistribution = steeringDistribution
330
331 def __str__(self):
332 return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering)
333
334 def generatePredictedTrajectories(self, obj, instant):
335 predictedTrajectories = []
336 if self.useFeatures:
337 features = [f for f in obj.features if f.existsAtInstant(instant)]
338 positions = [f.getPositionAtInstant(instant) for f in features]
339 velocities = [f.getVelocityAtInstant(instant) for f in features]
340 else:
341 positions = [obj.getPositionAtInstant(instant)]
342 velocities = [obj.getVelocityAtInstant(instant)]
343 for i in xrange(self.nPredictedTrajectories):
344 for initialPosition,initialVelocity in zip(positions, velocities):
345 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition,
346 initialVelocity,
347 moving.NormAngle(self.accelerationDistribution(),
348 self.steeringDistribution()),
349 maxSpeed = self.maxSpeed))
350 return predictedTrajectories
351
352
353 class CVDirectPredictionParameters(PredictionParameters):
354 '''Prediction parameters of prediction at constant velocity
355 using direct computation of the intersecting point'''
356
357 def __init__(self):
358 PredictionParameters.__init__(self, 'constant velocity (direct computation)', None)
359
360 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
361 collisionPoints = []
362 crossingZones = []
363
364 p1 = obj1.getPositionAtInstant(currentInstant)
365 p2 = obj2.getPositionAtInstant(currentInstant)
366 if (p1-p2).norm2() <= collisionDistanceThreshold:
367 collisionPoints = [SafetyPoint((p1+p1).multiply(0.5), 1., 0.)]
368 else:
369 v1 = obj1.getVelocityAtInstant(currentInstant)
370 v2 = obj2.getVelocityAtInstant(currentInstant)
371 intersection = moving.intersection(p1, p2, v1, v2)
372
373 if intersection != None:
374 dp1 = intersection-p1
375 dp2 = intersection-p2
376 if moving.Point.dot(dp1, v1) > 0 and moving.Point.dot(dp2, v2) > 0: # if the road users are moving towards the intersection
377 dist1 = dp1.norm2()
378 dist2 = dp2.norm2()
379 s1 = v1.norm2()
380 s2 = v2.norm2()
381 halfCollisionDistanceThreshold = collisionDistanceThreshold/2.
382 timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1)
383 timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2)
384 collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2)
385 if computeCZ and collisionTimeInterval.empty():
386 crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))]
387 else:
388 collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())]
389
390 if debug and intersection!= None:
391 from matplotlib.pyplot import plot, figure, axis, title
279 figure() 392 figure()
280 for et in predictedTrajectories1: 393 plot([p1.x, intersection.x], [p1.y, intersection.y], 'r')
281 et.predictPosition(timeHorizon) 394 plot([p2.x, intersection.x], [p2.y, intersection.y], 'b')
282 et.draw('rx') 395 intersection.plot()
283 396 obj1.plot('r')
284 for et in predictedTrajectories2: 397 obj2.plot('b')
285 et.predictPosition(timeHorizon) 398 title('instant {0}'.format(currentInstant))
286 et.draw('bx')
287 obj1.draw('r')
288 obj2.draw('b')
289 title('instant {0}'.format(i))
290 axis('equal') 399 axis('equal')
291 400
292 return collisionProbabilities 401 return collisionPoints, crossingZones
402
403 class CVExactPredictionParameters(PredictionParameters):
404 '''Prediction parameters of prediction at constant velocity
405 using direct computation of the intersecting point (solving for the equation'''
406
407 def __init__(self):
408 PredictionParameters.__init__(self, 'constant velocity (direct exact computation)', None)
409
410 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
411 'TODO add collision point coordinates, compute pPET'
412 #collisionPoints = []
413 #crossingZones = []
414
415 p1 = obj1.getPositionAtInstant(currentInstant)
416 p2 = obj2.getPositionAtInstant(currentInstant)
417 v1 = obj1.getVelocityAtInstant(currentInstant)
418 v2 = obj2.getVelocityAtInstant(currentInstant)
419 intersection = moving.intersection(p1, p2, v1, v2)
420
421 if intersection != None:
422 ttc = moving.Point.timeToCollision(p1, p2, v1, v2, collisionDistanceThreshold)
423 if ttc:
424 return [SafetyPoint(intersection, 1., ttc)], [] # (p1+v1.multiply(ttc)+p2+v2.multiply(ttc)).multiply(0.5)
425 else:
426 return [],[]
427
428 ####
429 # Other Methods
430 ####
431
432
433
293 434
294 435
295 if __name__ == "__main__": 436 if __name__ == "__main__":
296 import doctest 437 import doctest
297 import unittest 438 import unittest