comparison trafficintelligence/prediction.py @ 1028:cc5cb04b04b0

major update using the trafficintelligence package name and install through pip
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Fri, 15 Jun 2018 11:19:10 -0400
parents python/prediction.py@933670761a57
children c6cf75a2ed08
comparison
equal deleted inserted replaced
1027:6129296848d3 1028:cc5cb04b04b0
1 #! /usr/bin/env python
2 '''Library for motion prediction methods'''
3
4 import moving
5 from utils import LCSS
6
7 import math, random
8 from copy import copy
9 import numpy as np
10 #from multiprocessing import Pool
11
12
13 class PredictedTrajectory(object):
14 '''Class for predicted trajectories with lazy evaluation
15 if the predicted position has not been already computed, compute it
16
17 it should also have a probability'''
18
19 def __init__(self):
20 self.probability = 0.
21 self.predictedPositions = {}
22 self.predictedSpeedOrientations = {}
23 #self.collisionPoints = {}
24 #self.crossingZones = {}
25
26 def predictPosition(self, nTimeSteps):
27 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions:
28 self.predictPosition(nTimeSteps-1)
29 self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed)
30 return self.predictedPositions[nTimeSteps]
31
32 def getPredictedTrajectory(self):
33 return moving.Trajectory.fromPointList(list(self.predictedPositions.values()))
34
35 def getPredictedSpeeds(self):
36 return [so.norm for so in self.predictedSpeedOrientations.values()]
37
38 def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs):
39 self.getPredictedTrajectory().plot(options, withOrigin, timeStep, **kwargs)
40
41 class PredictedTrajectoryConstant(PredictedTrajectory):
42 '''Predicted trajectory at constant speed or acceleration
43 TODO generalize by passing a series of velocities/accelerations'''
44
45 def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None):
46 self.control = control
47 self.maxSpeed = maxSpeed
48 self.probability = probability
49 self.predictedPositions = {0: initialPosition}
50 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
51
52 def getControl(self):
53 return self.control
54
55 class PredictedTrajectoryPrototype(PredictedTrajectory):
56 '''Predicted trajectory that follows a prototype trajectory
57 The prototype is in the format of a moving.Trajectory: it could be
58 1. an observed trajectory (extracted from video)
59 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow
60
61 Prediction can be done
62 1. at constant speed (the instantaneous user speed)
63 2. following the trajectory path, at the speed of the user
64 (applying a constant ratio equal
65 to the ratio of the user instantaneous speed and the trajectory closest speed)'''
66
67 def __init__(self, initialPosition, initialVelocity, prototype, constantSpeed = False, nFramesIgnore = 3, probability = 1.):
68 ''' prototype is a MovingObject
69
70 Prediction at constant speed will not work for unrealistic trajectories
71 that do not follow a slowly changing velocity (eg moving object trajectories,
72 but is good for realistic motion (eg features)'''
73 self.prototype = prototype
74 self.constantSpeed = constantSpeed
75 self.nFramesIgnore = nFramesIgnore
76 self.probability = probability
77 self.predictedPositions = {0: initialPosition}
78 self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition)
79 self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) #should be computed in relative coordinates to position
80 self.theta = prototype.getVelocityAt(self.closestPointIdx).angle()
81 self.initialSpeed = initialVelocity.norm2()
82 if not constantSpeed:
83 self.ratio = self.initialSpeed/prototype.getVelocityAt(self.closestPointIdx).norm2()
84
85 def predictPosition(self, nTimeSteps):
86 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions:
87 deltaPosition = copy(self.deltaPosition)
88 if self.constantSpeed:
89 traj = self.prototype.getPositions()
90 trajLength = traj.length()
91 traveledDistance = nTimeSteps*self.initialSpeed + traj.getCumulativeDistance(self.closestPointIdx)
92 i = self.closestPointIdx
93 while i < trajLength and traj.getCumulativeDistance(i) < traveledDistance:
94 i += 1
95 if i == trajLength:
96 v = self.prototype.getVelocityAt(-1-self.nFramesIgnore)
97 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2())
98 else:
99 v = self.prototype.getVelocityAt(min(i-1, int(self.prototype.length())-1-self.nFramesIgnore))
100 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1))
101 else:
102 traj = self.prototype.getPositions()
103 trajLength = traj.length()
104 nSteps = self.ratio*nTimeSteps+self.closestPointIdx
105 i = int(np.floor(nSteps))
106 if nSteps < trajLength-1:
107 v = self.prototype.getVelocityAt(min(i, int(self.prototype.length())-1-self.nFramesIgnore))
108 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i)
109 else:
110 v = self.prototype.getVelocityAt(-1-self.nFramesIgnore)
111 self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1)
112 return self.predictedPositions[nTimeSteps]
113
114 class PredictedTrajectoryRandomControl(PredictedTrajectory):
115 '''Random vehicle control: suitable for normal adaptation'''
116 def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None):
117 '''Constructor
118 accelerationDistribution and steeringDistribution are distributions
119 that return random numbers drawn from them'''
120 self.accelerationDistribution = accelerationDistribution
121 self.steeringDistribution = steeringDistribution
122 self.maxSpeed = maxSpeed
123 self.probability = probability
124 self.predictedPositions = {0: initialPosition}
125 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
126
127 def getControl(self):
128 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution())
129
130 class SafetyPoint(moving.Point):
131 '''Can represent a collision point or crossing zone
132 with respective safety indicator, TTC or pPET'''
133 def __init__(self, p, probability = 1., indicator = -1):
134 self.x = p.x
135 self.y = p.y
136 self.probability = probability
137 self.indicator = indicator
138
139 def __str__(self):
140 return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator)
141
142 @staticmethod
143 def save(out, points, predictionInstant, objNum1, objNum2):
144 for p in points:
145 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p))
146
147 @staticmethod
148 def computeExpectedIndicator(points):
149 return np.sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])
150
151 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon):
152 '''Computes the first instant
153 at which two predicted trajectories are within some distance threshold
154 Computes all the times including timeHorizon
155
156 User has to check the first variable collision to know about a collision'''
157 t = 1
158 p1 = predictedTrajectory1.predictPosition(t)
159 p2 = predictedTrajectory2.predictPosition(t)
160 collision = (p1-p2).norm2() <= collisionDistanceThreshold
161 while t < timeHorizon and not collision:
162 t += 1
163 p1 = predictedTrajectory1.predictPosition(t)
164 p2 = predictedTrajectory2.predictPosition(t)
165 collision = (p1-p2).norm2() <= collisionDistanceThreshold
166 return collision, t, p1, p2
167
168 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon, printFigure = True):
169 from matplotlib.pyplot import figure, axis, title, clf, savefig
170 if printFigure:
171 clf()
172 else:
173 figure()
174 for et in predictedTrajectories1:
175 for t in range(int(np.round(timeHorizon))):
176 et.predictPosition(t)
177 et.plot('rx')
178 for et in predictedTrajectories2:
179 for t in range(int(np.round(timeHorizon))):
180 et.predictPosition(t)
181 et.plot('bx')
182 obj1.plot('r', withOrigin = True)
183 obj2.plot('b', withOrigin = True)
184 title('instant {0}'.format(currentInstant))
185 axis('equal')
186 if printFigure:
187 savefig('predicted-trajectories-t-{0}.png'.format(currentInstant))
188
189 def calculateProbability(nMatching,similarity,objects):
190 sumFrequencies=sum([nMatching[p] for p in similarity])
191 prototypeProbability={}
192 for i in similarity:
193 prototypeProbability[i]= similarity[i] * float(nMatching[i])/sumFrequencies
194 sumProbabilities= sum([prototypeProbability[p] for p in prototypeProbability])
195 probabilities={}
196 for i in prototypeProbability:
197 probabilities[objects[i]]= float(prototypeProbability[i])/sumProbabilities
198 return probabilities
199
200 def findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,spatialThreshold=1.0, delta=180):
201 ''' behaviour prediction first step'''
202 if route[0] not in noiseEntryNums:
203 prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]]
204 elif route[1] not in noiseExitNums:
205 prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]]
206 else:
207 prototypesRoutes=[x for x in sorted(prototypes.keys())]
208 lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta)
209 similarity={}
210 for y in prototypesRoutes:
211 if y in prototypes:
212 prototypesIDs=prototypes[y]
213 for x in prototypesIDs:
214 s=lcss.computeNormalized(partialObjPositions, objects[x].positions)
215 if s >= minSimilarity:
216 similarity[x]=s
217
218 if mostMatched==None:
219 probabilities= calculateProbability(nMatching,similarity,objects)
220 return probabilities
221 else:
222 mostMatchedValues=sorted(similarity.values(),reverse=True)[:mostMatched]
223 keys=[k for k in similarity if similarity[k] in mostMatchedValues]
224 newSimilarity={}
225 for i in keys:
226 newSimilarity[i]=similarity[i]
227 probabilities= calculateProbability(nMatching,newSimilarity,objects)
228 return probabilities
229
230 def findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,spatialThreshold=1.0, delta=180):
231 if useDestination:
232 prototypesRoutes=[route]
233 else:
234 if route[0] not in noiseEntryNums:
235 prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]]
236 elif route[1] not in noiseExitNums:
237 prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]]
238 else:
239 prototypesRoutes=[x for x in sorted(prototypes.keys())]
240 lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta)
241 similarity={}
242 for y in prototypesRoutes:
243 if y in prototypes:
244 prototypesIDs=prototypes[y]
245 for x in prototypesIDs:
246 s=lcss.computeNormalized(partialObjPositions, objects[x].positions)
247 if s >= minSimilarity:
248 similarity[x]=s
249
250 newSimilarity={}
251 for i in similarity:
252 if i in secondStepPrototypes:
253 for j in secondStepPrototypes[i]:
254 newSimilarity[j]=similarity[i]
255 probabilities= calculateProbability(nMatching,newSimilarity,objects)
256 return probabilities
257
258 def getPrototypeTrajectory(obj,route,currentInstant,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,useSpeedPrototype=True):
259 partialInterval=moving.Interval(obj.getFirstInstant(),currentInstant)
260 partialObjPositions= obj.getObjectInTimeInterval(partialInterval).positions
261 if useSpeedPrototype:
262 prototypeTrajectories=findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination)
263 else:
264 prototypeTrajectories=findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched)
265 return prototypeTrajectories
266
267
268 class PredictionParameters(object):
269 def __init__(self, name, maxSpeed):
270 self.name = name
271 self.maxSpeed = maxSpeed
272
273 def __str__(self):
274 return '{0} {1}'.format(self.name, self.maxSpeed)
275
276 def generatePredictedTrajectories(self, obj, instant):
277 return None
278
279 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
280 '''returns the lists of collision points and crossing zones'''
281 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
282 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)
283
284 collisionPoints = []
285 if computeCZ:
286 crossingZones = []
287 else:
288 crossingZones = None
289 for et1 in predictedTrajectories1:
290 for et2 in predictedTrajectories2:
291 collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
292 if collision:
293 collisionPoints.append(SafetyPoint((p1+p2)*0.5, et1.probability*et2.probability, t))
294 elif computeCZ: # check if there is a crossing zone
295 # TODO same computation as PET with metric + concatenate past trajectory with future trajectory
296 cz = None
297 t1 = 0
298 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1
299 t2 = 0
300 while not cz and t2 < timeHorizon:
301 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
302 if cz is not None:
303 deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2()
304 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV)))
305 t2 += 1
306 t1 += 1
307
308 if debug:
309 savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon)
310
311 return collisionPoints, crossingZones
312
313 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):#, nProcesses = 1):
314 '''Computes all crossing and collision points at each common instant for two road users. '''
315 collisionPoints = {}
316 if computeCZ:
317 crossingZones = {}
318 else:
319 crossingZones = None
320 if timeInterval is not None:
321 commonTimeInterval = timeInterval
322 else:
323 commonTimeInterval = obj1.commonTimeInterval(obj2)
324 #if nProcesses == 1:
325 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
326 cp, cz = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
327 if len(cp) != 0:
328 collisionPoints[i] = cp
329 if computeCZ and len(cz) != 0:
330 crossingZones[i] = cz
331 return collisionPoints, crossingZones
332
333 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None):
334 '''Computes only collision probabilities
335 Returns for each instant the collision probability and number of samples drawn'''
336 collisionProbabilities = {}
337 if timeInterval is not None:
338 commonTimeInterval = timeInterval
339 else:
340 commonTimeInterval = obj1.commonTimeInterval(obj2)
341 for i in list(commonTimeInterval)[:-1]:
342 nCollisions = 0
343 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i)
344 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i)
345 for et1 in predictedTrajectories1:
346 for et2 in predictedTrajectories2:
347 collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
348 if collision:
349 nCollisions += 1
350 # take into account probabilities ??
351 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2))
352 collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples]
353
354 if debug:
355 savePredictedTrajectoriesFigure(i, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon)
356
357 return collisionProbabilities
358
359 class ConstantPredictionParameters(PredictionParameters):
360 def __init__(self, maxSpeed):
361 PredictionParameters.__init__(self, 'constant velocity', maxSpeed)
362
363 def generatePredictedTrajectories(self, obj, instant):
364 return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), maxSpeed = self.maxSpeed)]
365
366 class NormalAdaptationPredictionParameters(PredictionParameters):
367 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False):
368 '''An example of acceleration and steering distributions is
369 lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.)
370 '''
371 if useFeatures:
372 name = 'point set normal adaptation'
373 else:
374 name = 'normal adaptation'
375 PredictionParameters.__init__(self, name, maxSpeed)
376 self.nPredictedTrajectories = nPredictedTrajectories
377 self.useFeatures = useFeatures
378 self.accelerationDistribution = accelerationDistribution
379 self.steeringDistribution = steeringDistribution
380
381 def __str__(self):
382 return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories,
383 self.maxAcceleration,
384 self.maxSteering)
385
386 def generatePredictedTrajectories(self, obj, instant):
387 predictedTrajectories = []
388 if self.useFeatures and obj.hasFeatures():
389 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)]
390 positions = [f.getPositionAtInstant(instant) for f in features]
391 velocities = [f.getVelocityAtInstant(instant) for f in features]
392 else:
393 positions = [obj.getPositionAtInstant(instant)]
394 velocities = [obj.getVelocityAtInstant(instant)]
395 probability = 1./float(len(positions)*self.nPredictedTrajectories)
396 for i in range(self.nPredictedTrajectories):
397 for initialPosition,initialVelocity in zip(positions, velocities):
398 predictedTrajectories.append(PredictedTrajectoryRandomControl(initialPosition,
399 initialVelocity,
400 self.accelerationDistribution,
401 self.steeringDistribution,
402 probability,
403 maxSpeed = self.maxSpeed))
404 return predictedTrajectories
405
406 class PointSetPredictionParameters(PredictionParameters):
407 def __init__(self, maxSpeed):
408 PredictionParameters.__init__(self, 'point set', maxSpeed)
409
410 def generatePredictedTrajectories(self, obj, instant):
411 predictedTrajectories = []
412 if obj.hasFeatures():
413 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)]
414 positions = [f.getPositionAtInstant(instant) for f in features]
415 velocities = [f.getVelocityAtInstant(instant) for f in features]
416 probability = 1./float(len(positions))
417 for initialPosition,initialVelocity in zip(positions, velocities):
418 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, probability = probability, maxSpeed = self.maxSpeed))
419 return predictedTrajectories
420 else:
421 print('Object {} has no features'.format(obj.getNum()))
422 return None
423
424
425 class EvasiveActionPredictionParameters(PredictionParameters):
426 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False):
427 '''Suggested acceleration distribution may not be symmetric, eg
428 lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.)'''
429
430 if useFeatures:
431 name = 'point set evasive action'
432 else:
433 name = 'evasive action'
434 PredictionParameters.__init__(self, name, maxSpeed)
435 self.nPredictedTrajectories = nPredictedTrajectories
436 self.useFeatures = useFeatures
437 self.accelerationDistribution = accelerationDistribution
438 self.steeringDistribution = steeringDistribution
439
440 def __str__(self):
441 return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering)
442
443 def generatePredictedTrajectories(self, obj, instant):
444 predictedTrajectories = []
445 if self.useFeatures and obj.hasFeatures():
446 features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)]
447 positions = [f.getPositionAtInstant(instant) for f in features]
448 velocities = [f.getVelocityAtInstant(instant) for f in features]
449 else:
450 positions = [obj.getPositionAtInstant(instant)]
451 velocities = [obj.getVelocityAtInstant(instant)]
452 probability = 1./float(self.nPredictedTrajectories)
453 for i in range(self.nPredictedTrajectories):
454 for initialPosition,initialVelocity in zip(positions, velocities):
455 predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition,
456 initialVelocity,
457 moving.NormAngle(self.accelerationDistribution(),
458 self.steeringDistribution()),
459 probability,
460 self.maxSpeed))
461 return predictedTrajectories
462
463
464 class CVDirectPredictionParameters(PredictionParameters):
465 '''Prediction parameters of prediction at constant velocity
466 using direct computation of the intersecting point
467 Warning: the computed time to collision may be higher than timeHorizon (not used)'''
468
469 def __init__(self):
470 PredictionParameters.__init__(self, 'constant velocity (direct computation)', None)
471
472 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs):
473 collisionPoints = []
474 if computeCZ:
475 crossingZones = []
476 else:
477 crossingZones = None
478
479 p1 = obj1.getPositionAtInstant(currentInstant)
480 p2 = obj2.getPositionAtInstant(currentInstant)
481 if (p1-p2).norm2() <= collisionDistanceThreshold:
482 collisionPoints = [SafetyPoint((p1+p2)*0.5, 1., 0.)]
483 else:
484 v1 = obj1.getVelocityAtInstant(currentInstant)
485 v2 = obj2.getVelocityAtInstant(currentInstant)
486 intersection = moving.intersection(p1, p1+v1, p2, p2+v2)
487
488 if intersection is not None:
489 dp1 = intersection-p1
490 dp2 = intersection-p2
491 dot1 = moving.Point.dot(dp1, v1)
492 dot2 = moving.Point.dot(dp2, v2)
493 if (computeCZ and (dot1 > 0 or dot2 > 0)) or (dot1 > 0 and dot2 > 0): # if the road users are moving towards the intersection or if computing pPET
494 dist1 = dp1.norm2()
495 dist2 = dp2.norm2()
496 s1 = math.copysign(v1.norm2(), dot1)
497 s2 = math.copysign(v2.norm2(), dot2)
498 halfCollisionDistanceThreshold = collisionDistanceThreshold/2.
499 timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1)
500 timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2)
501 collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2)
502
503 if collisionTimeInterval.empty():
504 if computeCZ:
505 crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))]
506 else:
507 collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())]
508
509 if debug and intersection is not None:
510 from matplotlib.pyplot import plot, figure, axis, title
511 figure()
512 plot([p1.x, intersection.x], [p1.y, intersection.y], 'r')
513 plot([p2.x, intersection.x], [p2.y, intersection.y], 'b')
514 intersection.plot()
515 obj1.plot('r')
516 obj2.plot('b')
517 title('instant {0}'.format(currentInstant))
518 axis('equal')
519
520 return collisionPoints, crossingZones
521
522 class CVExactPredictionParameters(PredictionParameters):
523 '''Prediction parameters of prediction at constant velocity
524 using direct computation of the intersecting point (solving the equation)
525 Warning: the computed time to collision may be higher than timeHorizon (not used)'''
526
527 def __init__(self):
528 PredictionParameters.__init__(self, 'constant velocity (direct exact computation)', None)
529
530 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs):
531 'TODO compute pPET'
532 collisionPoints = []
533 crossingZones = []
534
535 p1 = obj1.getPositionAtInstant(currentInstant)
536 p2 = obj2.getPositionAtInstant(currentInstant)
537 v1 = obj1.getVelocityAtInstant(currentInstant)
538 v2 = obj2.getVelocityAtInstant(currentInstant)
539 #intersection = moving.intersection(p1, p1+v1, p2, p2+v2)
540
541 if not moving.Point.parallel(v1, v2):
542 ttc = moving.Point.timeToCollision(p1, p2, v1, v2, collisionDistanceThreshold)
543 if ttc is not None:
544 collisionPoints = [SafetyPoint((p1+(v1*ttc)+p2+(v2*ttc))*0.5, 1., ttc)]
545 else:
546 pass # compute pPET
547
548 return collisionPoints, crossingZones
549
550 class PrototypePredictionParameters(PredictionParameters):
551 def __init__(self, prototypes, nPredictedTrajectories, pointSimilarityDistance, minSimilarity, lcssMetric = 'cityblock', minFeatureTime = 10, constantSpeed = False, useFeatures = True):
552 PredictionParameters.__init__(self, 'prototypes', None)
553 self.prototypes = prototypes
554 self.nPredictedTrajectories = nPredictedTrajectories
555 self.lcss = LCSS(metric = lcssMetric, epsilon = pointSimilarityDistance)
556 self.minSimilarity = minSimilarity
557 self.minFeatureTime = minFeatureTime
558 self.constantSpeed = constantSpeed
559 self.useFeatures = useFeatures
560
561 def getLcss(self):
562 return self.lcss
563
564 def addPredictedTrajectories(self, predictedTrajectories, obj, instant):
565 obj.computeTrajectorySimilarities(self.prototypes, self.lcss)
566 for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities):
567 if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity:
568 initialPosition = obj.getPositionAtInstant(instant)
569 initialVelocity = obj.getVelocityAtInstant(instant)
570 predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings()))
571
572 def generatePredictedTrajectories(self, obj, instant):
573 predictedTrajectories = []
574 if instant-obj.getFirstInstant()+1 >= self.minFeatureTime:
575 if self.useFeatures and obj.hasFeatures():
576 if not hasattr(obj, 'currentPredictionFeatures'):
577 obj.currentPredictionFeatures = []
578 else:
579 obj.currentPredictionFeatures[:] = [f for f in obj.currentPredictionFeatures if f.existsAtInstant(instant)]
580 firstInstants = [(f,f.getFirstInstant()) for f in obj.getFeatures() if f.existsAtInstant(instant) and f not in obj.currentPredictionFeatures]
581 firstInstants.sort(key = lambda t: t[1])
582 for f,t1 in firstInstants[:min(self.nPredictedTrajectories, len(firstInstants), self.nPredictedTrajectories-len(obj.currentPredictionFeatures))]:
583 obj.currentPredictionFeatures.append(f)
584 for f in obj.currentPredictionFeatures:
585 self.addPredictedTrajectories(predictedTrajectories, f, instant)
586 else:
587 self.addPredictedTrajectories(predictedTrajectories, obj, instant)
588 return predictedTrajectories
589
590 if __name__ == "__main__":
591 import doctest
592 import unittest
593 suite = doctest.DocFileSuite('tests/prediction.txt')
594 #suite = doctest.DocTestSuite()
595 unittest.TextTestRunner().run(suite)
596 #doctest.testmod()
597 #doctest.testfile("example.txt")
598