Mercurial Hosting > traffic-intelligence
comparison trafficintelligence/prediction.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 15 Jun 2018 11:19:10 -0400 |
parents | python/prediction.py@933670761a57 |
children | c6cf75a2ed08 |
comparison
equal
deleted
inserted
replaced
1027:6129296848d3 | 1028:cc5cb04b04b0 |
---|---|
1 #! /usr/bin/env python | |
2 '''Library for motion prediction methods''' | |
3 | |
4 import moving | |
5 from utils import LCSS | |
6 | |
7 import math, random | |
8 from copy import copy | |
9 import numpy as np | |
10 #from multiprocessing import Pool | |
11 | |
12 | |
13 class PredictedTrajectory(object): | |
14 '''Class for predicted trajectories with lazy evaluation | |
15 if the predicted position has not been already computed, compute it | |
16 | |
17 it should also have a probability''' | |
18 | |
19 def __init__(self): | |
20 self.probability = 0. | |
21 self.predictedPositions = {} | |
22 self.predictedSpeedOrientations = {} | |
23 #self.collisionPoints = {} | |
24 #self.crossingZones = {} | |
25 | |
26 def predictPosition(self, nTimeSteps): | |
27 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions: | |
28 self.predictPosition(nTimeSteps-1) | |
29 self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed) | |
30 return self.predictedPositions[nTimeSteps] | |
31 | |
32 def getPredictedTrajectory(self): | |
33 return moving.Trajectory.fromPointList(list(self.predictedPositions.values())) | |
34 | |
35 def getPredictedSpeeds(self): | |
36 return [so.norm for so in self.predictedSpeedOrientations.values()] | |
37 | |
38 def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs): | |
39 self.getPredictedTrajectory().plot(options, withOrigin, timeStep, **kwargs) | |
40 | |
41 class PredictedTrajectoryConstant(PredictedTrajectory): | |
42 '''Predicted trajectory at constant speed or acceleration | |
43 TODO generalize by passing a series of velocities/accelerations''' | |
44 | |
45 def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None): | |
46 self.control = control | |
47 self.maxSpeed = maxSpeed | |
48 self.probability = probability | |
49 self.predictedPositions = {0: initialPosition} | |
50 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | |
51 | |
52 def getControl(self): | |
53 return self.control | |
54 | |
55 class PredictedTrajectoryPrototype(PredictedTrajectory): | |
56 '''Predicted trajectory that follows a prototype trajectory | |
57 The prototype is in the format of a moving.Trajectory: it could be | |
58 1. an observed trajectory (extracted from video) | |
59 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow | |
60 | |
61 Prediction can be done | |
62 1. at constant speed (the instantaneous user speed) | |
63 2. following the trajectory path, at the speed of the user | |
64 (applying a constant ratio equal | |
65 to the ratio of the user instantaneous speed and the trajectory closest speed)''' | |
66 | |
67 def __init__(self, initialPosition, initialVelocity, prototype, constantSpeed = False, nFramesIgnore = 3, probability = 1.): | |
68 ''' prototype is a MovingObject | |
69 | |
70 Prediction at constant speed will not work for unrealistic trajectories | |
71 that do not follow a slowly changing velocity (eg moving object trajectories, | |
72 but is good for realistic motion (eg features)''' | |
73 self.prototype = prototype | |
74 self.constantSpeed = constantSpeed | |
75 self.nFramesIgnore = nFramesIgnore | |
76 self.probability = probability | |
77 self.predictedPositions = {0: initialPosition} | |
78 self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition) | |
79 self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) #should be computed in relative coordinates to position | |
80 self.theta = prototype.getVelocityAt(self.closestPointIdx).angle() | |
81 self.initialSpeed = initialVelocity.norm2() | |
82 if not constantSpeed: | |
83 self.ratio = self.initialSpeed/prototype.getVelocityAt(self.closestPointIdx).norm2() | |
84 | |
85 def predictPosition(self, nTimeSteps): | |
86 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions: | |
87 deltaPosition = copy(self.deltaPosition) | |
88 if self.constantSpeed: | |
89 traj = self.prototype.getPositions() | |
90 trajLength = traj.length() | |
91 traveledDistance = nTimeSteps*self.initialSpeed + traj.getCumulativeDistance(self.closestPointIdx) | |
92 i = self.closestPointIdx | |
93 while i < trajLength and traj.getCumulativeDistance(i) < traveledDistance: | |
94 i += 1 | |
95 if i == trajLength: | |
96 v = self.prototype.getVelocityAt(-1-self.nFramesIgnore) | |
97 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) | |
98 else: | |
99 v = self.prototype.getVelocityAt(min(i-1, int(self.prototype.length())-1-self.nFramesIgnore)) | |
100 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) | |
101 else: | |
102 traj = self.prototype.getPositions() | |
103 trajLength = traj.length() | |
104 nSteps = self.ratio*nTimeSteps+self.closestPointIdx | |
105 i = int(np.floor(nSteps)) | |
106 if nSteps < trajLength-1: | |
107 v = self.prototype.getVelocityAt(min(i, int(self.prototype.length())-1-self.nFramesIgnore)) | |
108 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) | |
109 else: | |
110 v = self.prototype.getVelocityAt(-1-self.nFramesIgnore) | |
111 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1) | |
112 return self.predictedPositions[nTimeSteps] | |
113 | |
114 class PredictedTrajectoryRandomControl(PredictedTrajectory): | |
115 '''Random vehicle control: suitable for normal adaptation''' | |
116 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): | |
117 '''Constructor | |
118 accelerationDistribution and steeringDistribution are distributions | |
119 that return random numbers drawn from them''' | |
120 self.accelerationDistribution = accelerationDistribution | |
121 self.steeringDistribution = steeringDistribution | |
122 self.maxSpeed = maxSpeed | |
123 self.probability = probability | |
124 self.predictedPositions = {0: initialPosition} | |
125 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | |
126 | |
127 def getControl(self): | |
128 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) | |
129 | |
130 class SafetyPoint(moving.Point): | |
131 '''Can represent a collision point or crossing zone | |
132 with respective safety indicator, TTC or pPET''' | |
133 def __init__(self, p, probability = 1., indicator = -1): | |
134 self.x = p.x | |
135 self.y = p.y | |
136 self.probability = probability | |
137 self.indicator = indicator | |
138 | |
139 def __str__(self): | |
140 return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator) | |
141 | |
142 @staticmethod | |
143 def save(out, points, predictionInstant, objNum1, objNum2): | |
144 for p in points: | |
145 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) | |
146 | |
147 @staticmethod | |
148 def computeExpectedIndicator(points): | |
149 return np.sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | |
150 | |
151 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): | |
152 '''Computes the first instant | |
153 at which two predicted trajectories are within some distance threshold | |
154 Computes all the times including timeHorizon | |
155 | |
156 User has to check the first variable collision to know about a collision''' | |
157 t = 1 | |
158 p1 = predictedTrajectory1.predictPosition(t) | |
159 p2 = predictedTrajectory2.predictPosition(t) | |
160 collision = (p1-p2).norm2() <= collisionDistanceThreshold | |
161 while t < timeHorizon and not collision: | |
162 t += 1 | |
163 p1 = predictedTrajectory1.predictPosition(t) | |
164 p2 = predictedTrajectory2.predictPosition(t) | |
165 collision = (p1-p2).norm2() <= collisionDistanceThreshold | |
166 return collision, t, p1, p2 | |
167 | |
168 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon, printFigure = True): | |
169 from matplotlib.pyplot import figure, axis, title, clf, savefig | |
170 if printFigure: | |
171 clf() | |
172 else: | |
173 figure() | |
174 for et in predictedTrajectories1: | |
175 for t in range(int(np.round(timeHorizon))): | |
176 et.predictPosition(t) | |
177 et.plot('rx') | |
178 for et in predictedTrajectories2: | |
179 for t in range(int(np.round(timeHorizon))): | |
180 et.predictPosition(t) | |
181 et.plot('bx') | |
182 obj1.plot('r', withOrigin = True) | |
183 obj2.plot('b', withOrigin = True) | |
184 title('instant {0}'.format(currentInstant)) | |
185 axis('equal') | |
186 if printFigure: | |
187 savefig('predicted-trajectories-t-{0}.png'.format(currentInstant)) | |
188 | |
189 def calculateProbability(nMatching,similarity,objects): | |
190 sumFrequencies=sum([nMatching[p] for p in similarity]) | |
191 prototypeProbability={} | |
192 for i in similarity: | |
193 prototypeProbability[i]= similarity[i] * float(nMatching[i])/sumFrequencies | |
194 sumProbabilities= sum([prototypeProbability[p] for p in prototypeProbability]) | |
195 probabilities={} | |
196 for i in prototypeProbability: | |
197 probabilities[objects[i]]= float(prototypeProbability[i])/sumProbabilities | |
198 return probabilities | |
199 | |
200 def findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,spatialThreshold=1.0, delta=180): | |
201 ''' behaviour prediction first step''' | |
202 if route[0] not in noiseEntryNums: | |
203 prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]] | |
204 elif route[1] not in noiseExitNums: | |
205 prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]] | |
206 else: | |
207 prototypesRoutes=[x for x in sorted(prototypes.keys())] | |
208 lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta) | |
209 similarity={} | |
210 for y in prototypesRoutes: | |
211 if y in prototypes: | |
212 prototypesIDs=prototypes[y] | |
213 for x in prototypesIDs: | |
214 s=lcss.computeNormalized(partialObjPositions, objects[x].positions) | |
215 if s >= minSimilarity: | |
216 similarity[x]=s | |
217 | |
218 if mostMatched==None: | |
219 probabilities= calculateProbability(nMatching,similarity,objects) | |
220 return probabilities | |
221 else: | |
222 mostMatchedValues=sorted(similarity.values(),reverse=True)[:mostMatched] | |
223 keys=[k for k in similarity if similarity[k] in mostMatchedValues] | |
224 newSimilarity={} | |
225 for i in keys: | |
226 newSimilarity[i]=similarity[i] | |
227 probabilities= calculateProbability(nMatching,newSimilarity,objects) | |
228 return probabilities | |
229 | |
230 def findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,spatialThreshold=1.0, delta=180): | |
231 if useDestination: | |
232 prototypesRoutes=[route] | |
233 else: | |
234 if route[0] not in noiseEntryNums: | |
235 prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]] | |
236 elif route[1] not in noiseExitNums: | |
237 prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]] | |
238 else: | |
239 prototypesRoutes=[x for x in sorted(prototypes.keys())] | |
240 lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta) | |
241 similarity={} | |
242 for y in prototypesRoutes: | |
243 if y in prototypes: | |
244 prototypesIDs=prototypes[y] | |
245 for x in prototypesIDs: | |
246 s=lcss.computeNormalized(partialObjPositions, objects[x].positions) | |
247 if s >= minSimilarity: | |
248 similarity[x]=s | |
249 | |
250 newSimilarity={} | |
251 for i in similarity: | |
252 if i in secondStepPrototypes: | |
253 for j in secondStepPrototypes[i]: | |
254 newSimilarity[j]=similarity[i] | |
255 probabilities= calculateProbability(nMatching,newSimilarity,objects) | |
256 return probabilities | |
257 | |
258 def getPrototypeTrajectory(obj,route,currentInstant,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,useSpeedPrototype=True): | |
259 partialInterval=moving.Interval(obj.getFirstInstant(),currentInstant) | |
260 partialObjPositions= obj.getObjectInTimeInterval(partialInterval).positions | |
261 if useSpeedPrototype: | |
262 prototypeTrajectories=findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination) | |
263 else: | |
264 prototypeTrajectories=findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched) | |
265 return prototypeTrajectories | |
266 | |
267 | |
268 class PredictionParameters(object): | |
269 def __init__(self, name, maxSpeed): | |
270 self.name = name | |
271 self.maxSpeed = maxSpeed | |
272 | |
273 def __str__(self): | |
274 return '{0} {1}'.format(self.name, self.maxSpeed) | |
275 | |
276 def generatePredictedTrajectories(self, obj, instant): | |
277 return None | |
278 | |
279 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | |
280 '''returns the lists of collision points and crossing zones''' | |
281 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) | |
282 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) | |
283 | |
284 collisionPoints = [] | |
285 if computeCZ: | |
286 crossingZones = [] | |
287 else: | |
288 crossingZones = None | |
289 for et1 in predictedTrajectories1: | |
290 for et2 in predictedTrajectories2: | |
291 collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
292 if collision: | |
293 collisionPoints.append(SafetyPoint((p1+p2)*0.5, et1.probability*et2.probability, t)) | |
294 elif computeCZ: # check if there is a crossing zone | |
295 # TODO same computation as PET with metric + concatenate past trajectory with future trajectory | |
296 cz = None | |
297 t1 = 0 | |
298 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 | |
299 t2 = 0 | |
300 while not cz and t2 < timeHorizon: | |
301 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) | |
302 if cz is not None: | |
303 deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2() | |
304 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV))) | |
305 t2 += 1 | |
306 t1 += 1 | |
307 | |
308 if debug: | |
309 savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) | |
310 | |
311 return collisionPoints, crossingZones | |
312 | |
313 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):#, nProcesses = 1): | |
314 '''Computes all crossing and collision points at each common instant for two road users. ''' | |
315 collisionPoints = {} | |
316 if computeCZ: | |
317 crossingZones = {} | |
318 else: | |
319 crossingZones = None | |
320 if timeInterval is not None: | |
321 commonTimeInterval = timeInterval | |
322 else: | |
323 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
324 #if nProcesses == 1: | |
325 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | |
326 cp, cz = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) | |
327 if len(cp) != 0: | |
328 collisionPoints[i] = cp | |
329 if computeCZ and len(cz) != 0: | |
330 crossingZones[i] = cz | |
331 return collisionPoints, crossingZones | |
332 | |
333 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): | |
334 '''Computes only collision probabilities | |
335 Returns for each instant the collision probability and number of samples drawn''' | |
336 collisionProbabilities = {} | |
337 if timeInterval is not None: | |
338 commonTimeInterval = timeInterval | |
339 else: | |
340 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
341 for i in list(commonTimeInterval)[:-1]: | |
342 nCollisions = 0 | |
343 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i) | |
344 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i) | |
345 for et1 in predictedTrajectories1: | |
346 for et2 in predictedTrajectories2: | |
347 collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
348 if collision: | |
349 nCollisions += 1 | |
350 # take into account probabilities ?? | |
351 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) | |
352 collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] | |
353 | |
354 if debug: | |
355 savePredictedTrajectoriesFigure(i, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) | |
356 | |
357 return collisionProbabilities | |
358 | |
359 class ConstantPredictionParameters(PredictionParameters): | |
360 def __init__(self, maxSpeed): | |
361 PredictionParameters.__init__(self, 'constant velocity', maxSpeed) | |
362 | |
363 def generatePredictedTrajectories(self, obj, instant): | |
364 return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), maxSpeed = self.maxSpeed)] | |
365 | |
366 class NormalAdaptationPredictionParameters(PredictionParameters): | |
367 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): | |
368 '''An example of acceleration and steering distributions is | |
369 lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.) | |
370 ''' | |
371 if useFeatures: | |
372 name = 'point set normal adaptation' | |
373 else: | |
374 name = 'normal adaptation' | |
375 PredictionParameters.__init__(self, name, maxSpeed) | |
376 self.nPredictedTrajectories = nPredictedTrajectories | |
377 self.useFeatures = useFeatures | |
378 self.accelerationDistribution = accelerationDistribution | |
379 self.steeringDistribution = steeringDistribution | |
380 | |
381 def __str__(self): | |
382 return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, | |
383 self.maxAcceleration, | |
384 self.maxSteering) | |
385 | |
386 def generatePredictedTrajectories(self, obj, instant): | |
387 predictedTrajectories = [] | |
388 if self.useFeatures and obj.hasFeatures(): | |
389 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] | |
390 positions = [f.getPositionAtInstant(instant) for f in features] | |
391 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
392 else: | |
393 positions = [obj.getPositionAtInstant(instant)] | |
394 velocities = [obj.getVelocityAtInstant(instant)] | |
395 probability = 1./float(len(positions)*self.nPredictedTrajectories) | |
396 for i in range(self.nPredictedTrajectories): | |
397 for initialPosition,initialVelocity in zip(positions, velocities): | |
398 predictedTrajectories.append(PredictedTrajectoryRandomControl(initialPosition, | |
399 initialVelocity, | |
400 self.accelerationDistribution, | |
401 self.steeringDistribution, | |
402 probability, | |
403 maxSpeed = self.maxSpeed)) | |
404 return predictedTrajectories | |
405 | |
406 class PointSetPredictionParameters(PredictionParameters): | |
407 def __init__(self, maxSpeed): | |
408 PredictionParameters.__init__(self, 'point set', maxSpeed) | |
409 | |
410 def generatePredictedTrajectories(self, obj, instant): | |
411 predictedTrajectories = [] | |
412 if obj.hasFeatures(): | |
413 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] | |
414 positions = [f.getPositionAtInstant(instant) for f in features] | |
415 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
416 probability = 1./float(len(positions)) | |
417 for initialPosition,initialVelocity in zip(positions, velocities): | |
418 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, probability = probability, maxSpeed = self.maxSpeed)) | |
419 return predictedTrajectories | |
420 else: | |
421 print('Object {} has no features'.format(obj.getNum())) | |
422 return None | |
423 | |
424 | |
425 class EvasiveActionPredictionParameters(PredictionParameters): | |
426 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): | |
427 '''Suggested acceleration distribution may not be symmetric, eg | |
428 lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.)''' | |
429 | |
430 if useFeatures: | |
431 name = 'point set evasive action' | |
432 else: | |
433 name = 'evasive action' | |
434 PredictionParameters.__init__(self, name, maxSpeed) | |
435 self.nPredictedTrajectories = nPredictedTrajectories | |
436 self.useFeatures = useFeatures | |
437 self.accelerationDistribution = accelerationDistribution | |
438 self.steeringDistribution = steeringDistribution | |
439 | |
440 def __str__(self): | |
441 return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) | |
442 | |
443 def generatePredictedTrajectories(self, obj, instant): | |
444 predictedTrajectories = [] | |
445 if self.useFeatures and obj.hasFeatures(): | |
446 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] | |
447 positions = [f.getPositionAtInstant(instant) for f in features] | |
448 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
449 else: | |
450 positions = [obj.getPositionAtInstant(instant)] | |
451 velocities = [obj.getVelocityAtInstant(instant)] | |
452 probability = 1./float(self.nPredictedTrajectories) | |
453 for i in range(self.nPredictedTrajectories): | |
454 for initialPosition,initialVelocity in zip(positions, velocities): | |
455 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, | |
456 initialVelocity, | |
457 moving.NormAngle(self.accelerationDistribution(), | |
458 self.steeringDistribution()), | |
459 probability, | |
460 self.maxSpeed)) | |
461 return predictedTrajectories | |
462 | |
463 | |
464 class CVDirectPredictionParameters(PredictionParameters): | |
465 '''Prediction parameters of prediction at constant velocity | |
466 using direct computation of the intersecting point | |
467 Warning: the computed time to collision may be higher than timeHorizon (not used)''' | |
468 | |
469 def __init__(self): | |
470 PredictionParameters.__init__(self, 'constant velocity (direct computation)', None) | |
471 | |
472 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs): | |
473 collisionPoints = [] | |
474 if computeCZ: | |
475 crossingZones = [] | |
476 else: | |
477 crossingZones = None | |
478 | |
479 p1 = obj1.getPositionAtInstant(currentInstant) | |
480 p2 = obj2.getPositionAtInstant(currentInstant) | |
481 if (p1-p2).norm2() <= collisionDistanceThreshold: | |
482 collisionPoints = [SafetyPoint((p1+p2)*0.5, 1., 0.)] | |
483 else: | |
484 v1 = obj1.getVelocityAtInstant(currentInstant) | |
485 v2 = obj2.getVelocityAtInstant(currentInstant) | |
486 intersection = moving.intersection(p1, p1+v1, p2, p2+v2) | |
487 | |
488 if intersection is not None: | |
489 dp1 = intersection-p1 | |
490 dp2 = intersection-p2 | |
491 dot1 = moving.Point.dot(dp1, v1) | |
492 dot2 = moving.Point.dot(dp2, v2) | |
493 if (computeCZ and (dot1 > 0 or dot2 > 0)) or (dot1 > 0 and dot2 > 0): # if the road users are moving towards the intersection or if computing pPET | |
494 dist1 = dp1.norm2() | |
495 dist2 = dp2.norm2() | |
496 s1 = math.copysign(v1.norm2(), dot1) | |
497 s2 = math.copysign(v2.norm2(), dot2) | |
498 halfCollisionDistanceThreshold = collisionDistanceThreshold/2. | |
499 timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1) | |
500 timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2) | |
501 collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2) | |
502 | |
503 if collisionTimeInterval.empty(): | |
504 if computeCZ: | |
505 crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))] | |
506 else: | |
507 collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())] | |
508 | |
509 if debug and intersection is not None: | |
510 from matplotlib.pyplot import plot, figure, axis, title | |
511 figure() | |
512 plot([p1.x, intersection.x], [p1.y, intersection.y], 'r') | |
513 plot([p2.x, intersection.x], [p2.y, intersection.y], 'b') | |
514 intersection.plot() | |
515 obj1.plot('r') | |
516 obj2.plot('b') | |
517 title('instant {0}'.format(currentInstant)) | |
518 axis('equal') | |
519 | |
520 return collisionPoints, crossingZones | |
521 | |
522 class CVExactPredictionParameters(PredictionParameters): | |
523 '''Prediction parameters of prediction at constant velocity | |
524 using direct computation of the intersecting point (solving the equation) | |
525 Warning: the computed time to collision may be higher than timeHorizon (not used)''' | |
526 | |
527 def __init__(self): | |
528 PredictionParameters.__init__(self, 'constant velocity (direct exact computation)', None) | |
529 | |
530 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs): | |
531 'TODO compute pPET' | |
532 collisionPoints = [] | |
533 crossingZones = [] | |
534 | |
535 p1 = obj1.getPositionAtInstant(currentInstant) | |
536 p2 = obj2.getPositionAtInstant(currentInstant) | |
537 v1 = obj1.getVelocityAtInstant(currentInstant) | |
538 v2 = obj2.getVelocityAtInstant(currentInstant) | |
539 #intersection = moving.intersection(p1, p1+v1, p2, p2+v2) | |
540 | |
541 if not moving.Point.parallel(v1, v2): | |
542 ttc = moving.Point.timeToCollision(p1, p2, v1, v2, collisionDistanceThreshold) | |
543 if ttc is not None: | |
544 collisionPoints = [SafetyPoint((p1+(v1*ttc)+p2+(v2*ttc))*0.5, 1., ttc)] | |
545 else: | |
546 pass # compute pPET | |
547 | |
548 return collisionPoints, crossingZones | |
549 | |
550 class PrototypePredictionParameters(PredictionParameters): | |
551 def __init__(self, prototypes, nPredictedTrajectories, pointSimilarityDistance, minSimilarity, lcssMetric = 'cityblock', minFeatureTime = 10, constantSpeed = False, useFeatures = True): | |
552 PredictionParameters.__init__(self, 'prototypes', None) | |
553 self.prototypes = prototypes | |
554 self.nPredictedTrajectories = nPredictedTrajectories | |
555 self.lcss = LCSS(metric = lcssMetric, epsilon = pointSimilarityDistance) | |
556 self.minSimilarity = minSimilarity | |
557 self.minFeatureTime = minFeatureTime | |
558 self.constantSpeed = constantSpeed | |
559 self.useFeatures = useFeatures | |
560 | |
561 def getLcss(self): | |
562 return self.lcss | |
563 | |
564 def addPredictedTrajectories(self, predictedTrajectories, obj, instant): | |
565 obj.computeTrajectorySimilarities(self.prototypes, self.lcss) | |
566 for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): | |
567 if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: | |
568 initialPosition = obj.getPositionAtInstant(instant) | |
569 initialVelocity = obj.getVelocityAtInstant(instant) | |
570 predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) | |
571 | |
572 def generatePredictedTrajectories(self, obj, instant): | |
573 predictedTrajectories = [] | |
574 if instant-obj.getFirstInstant()+1 >= self.minFeatureTime: | |
575 if self.useFeatures and obj.hasFeatures(): | |
576 if not hasattr(obj, 'currentPredictionFeatures'): | |
577 obj.currentPredictionFeatures = [] | |
578 else: | |
579 obj.currentPredictionFeatures[:] = [f for f in obj.currentPredictionFeatures if f.existsAtInstant(instant)] | |
580 firstInstants = [(f,f.getFirstInstant()) for f in obj.getFeatures() if f.existsAtInstant(instant) and f not in obj.currentPredictionFeatures] | |
581 firstInstants.sort(key = lambda t: t[1]) | |
582 for f,t1 in firstInstants[:min(self.nPredictedTrajectories, len(firstInstants), self.nPredictedTrajectories-len(obj.currentPredictionFeatures))]: | |
583 obj.currentPredictionFeatures.append(f) | |
584 for f in obj.currentPredictionFeatures: | |
585 self.addPredictedTrajectories(predictedTrajectories, f, instant) | |
586 else: | |
587 self.addPredictedTrajectories(predictedTrajectories, obj, instant) | |
588 return predictedTrajectories | |
589 | |
590 if __name__ == "__main__": | |
591 import doctest | |
592 import unittest | |
593 suite = doctest.DocFileSuite('tests/prediction.txt') | |
594 #suite = doctest.DocTestSuite() | |
595 unittest.TextTestRunner().run(suite) | |
596 #doctest.testmod() | |
597 #doctest.testfile("example.txt") | |
598 |