comparison trafficintelligence/prediction.py @ 1212:af329f3330ba

work in progress on 3D safety analysis
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Tue, 02 May 2023 17:11:24 -0400
parents a095d4fbb2ea
children 01c24c1cdb70
comparison
equal deleted inserted replaced
1211:a095d4fbb2ea 1212:af329f3330ba
70 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions: 70 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions:
71 self.predictPosition(nTimeSteps-1) 71 self.predictPosition(nTimeSteps-1)
72 self.predictedPositions[nTimeSteps] = self.predictedPositions[nTimeSteps-1]+self.initialVelocity 72 self.predictedPositions[nTimeSteps] = self.predictedPositions[nTimeSteps-1]+self.initialVelocity
73 return self.predictedPositions[nTimeSteps] 73 return self.predictedPositions[nTimeSteps]
74 74
75 class PredictedPolyTrajectoryConstant(PredictedTrajectory): 75 class PredictedPolyTrajectoryConstantVelocity(PredictedTrajectory):
76 '''Predicted trajectory of an object with an outline represented by a polygon 76 '''Predicted trajectory of an object with an outline represented by a polygon
77 Simple method that just translates the polygon corners''' 77 Simple method that just translates the polygon corners (set of moving.Point)'''
78 def __init__(self, polyCorners, initialVelocity, probability = 1.): 78 def __init__(self, polyCorners, initialVelocity, probability = 1.):
79 self.probability = probability 79 self.probability = probability
80 self.predictedPositions = {0: polyCorners} 80 self.predictedPositions = {0: moving.pointsToShapely(polyCorners)}
81 self.initialVelocity = initialVelocity 81 self.initialVelocity = initialVelocity
82 self.predictedTrajectories = [PredictedTrajectoryConstant(p, initialVelocity) for p in polyCorners] 82 self.predictedTrajectories = [PredictedTrajectoryConstantVelocity(p, initialVelocity) for p in polyCorners]
83 83
84 def predictPosition(self, nTimeSteps):
85 if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions:
86 self.predictedPositions[nTimeSteps] = moving.pointsToShapely([t.predictPosition(nTimeSteps) for t in self.predictedTrajectories])
87 return self.predictedPositions[nTimeSteps]
88
89 def hasCollisionWith(self, other, t, collisionDistanceThreshold):
90 poly1 = self.predictPosition(t)
91 poly2 = other.predictPosition(t)
92 return poly1.overlaps(poly2), poly1, poly2
93
84 class PredictedTrajectoryPrototype(PredictedTrajectory): 94 class PredictedTrajectoryPrototype(PredictedTrajectory):
85 '''Predicted trajectory that follows a prototype trajectory 95 '''Predicted trajectory that follows a prototype trajectory
86 The prototype is in the format of a moving.Trajectory: it could be 96 The prototype is in the format of a moving.Trajectory: it could be
87 1. an observed trajectory (extracted from video) 97 1. an observed trajectory (extracted from video)
88 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow 98 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow
194 while t < timeHorizon and not collision: 204 while t < timeHorizon and not collision:
195 t += 1 205 t += 1
196 collision, p1, p2 = predictedTrajectory1.hasCollisionWith(predictedTrajectory2, t, collisionDistanceThreshold) 206 collision, p1, p2 = predictedTrajectory1.hasCollisionWith(predictedTrajectory2, t, collisionDistanceThreshold)
197 return collision, t, p1, p2 207 return collision, t, p1, p2
198 208
199 def computeCollisionTimePolygon(predictedTrajectories1, predictedTrajectories2, timeHorizon):
200 '''Computes the first instant
201 at which two objects represented by a series of points (eg box)
202 Computes all the times including timeHorizon
203
204 User has to check the first variable collision to know about a collision'''
205 t = 1
206 poly1 = moving.pointsToShapely([t1.predictPosition(t) for t1 in predictedTrajectories1])
207 poly2 = moving.pointsToShapely([t2.predictPosition(t) for t2 in predictedTrajectories2])
208 collision = poly1.overlaps(poly2)
209 while t < timeHorizon and not collision:
210 t += 1
211 poly1 = moving.pointsToShapely([t1.predictPosition(t) for t1 in predictedTrajectories1])
212 poly2 = moving.pointsToShapely([t2.predictPosition(t) for t2 in predictedTrajectories2])
213 collision = poly1.overlaps(poly2)
214 return collision, t
215
216 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon, printFigure = True): 209 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon, printFigure = True):
217 from matplotlib.pyplot import figure, axis, title, clf, savefig 210 from matplotlib.pyplot import figure, axis, title, clf, savefig
218 if printFigure: 211 if printFigure:
219 clf() 212 clf()
220 else: 213 else:
312 prototypeTrajectories=findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched) 305 prototypeTrajectories=findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched)
313 return prototypeTrajectories 306 return prototypeTrajectories
314 307
315 308
316 class PredictionParameters(object): 309 class PredictionParameters(object):
317 def __init__(self, name, maxSpeed, useCurvilinear = False): 310 def __init__(self, name, maxSpeed = None, useCurvilinear = False):
318 self.name = name 311 self.name = name
319 self.maxSpeed = maxSpeed 312 self.maxSpeed = maxSpeed
320 self.useCurvilinear = useCurvilinear 313 self.useCurvilinear = useCurvilinear
321 314
322 def __str__(self): 315 def __str__(self):
323 return '{0} {1}'.format(self.name, self.maxSpeed) 316 return '{0} {1}'.format(self.name, self.maxSpeed)
324 317
325 def generatePredictedTrajectories(self, obj, instant): 318 def generatePredictedTrajectories(self, obj, instant):
326 return None 319 return None
320
321 def computeCollisionPoint(self, p1, p2, probability, t):
322 return SafetyPoint((p1+p2)*0.5, probability, t)
327 323
328 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): 324 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
329 '''returns the lists of collision points and crossing zones''' 325 '''returns the lists of collision points and crossing zones'''
330 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) 326 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
331 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) 327 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)
337 crossingZones = None 333 crossingZones = None
338 for et1 in predictedTrajectories1: 334 for et1 in predictedTrajectories1:
339 for et2 in predictedTrajectories2: 335 for et2 in predictedTrajectories2:
340 collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) 336 collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
341 if collision: 337 if collision:
342 collisionPoints.append(SafetyPoint((p1+p2)*0.5, et1.probability*et2.probability, t)) 338 collisionPoints.append(self.computeCollisionPoint(p1, p2, et1.probability*et2.probability, t))
343 elif computeCZ: # check if there is a crossing zone 339 elif computeCZ: # check if there is a crossing zone
344 # TODO same computation as PET with metric + concatenate past trajectory with future trajectory 340 # TODO same computation as PET with metric + concatenate past trajectory with future trajectory
345 cz = None 341 cz = None
346 t1 = 0 342 t1 = 0
347 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 343 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1
410 PredictionParameters.__init__(self, 'constant velocity') 406 PredictionParameters.__init__(self, 'constant velocity')
411 407
412 def generatePredictedTrajectories(self, obj, instant): 408 def generatePredictedTrajectories(self, obj, instant):
413 return [PredictedTrajectoryConstantVelocity(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant))] 409 return [PredictedTrajectoryConstantVelocity(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant))]
414 410
411 class ConstantPolyPredictionParameters(PredictionParameters):
412 def __init__(self):
413 PredictionParameters.__init__(self, 'constant velocity for polygon representation')
414
415 def generatePredictedTrajectories(self, obj, instant):
416 return [PredictedPolyTrajectoryConstantVelocity(obj.getBoundingPolygon(instant), obj.getVelocityAtInstant(instant))]
417
418 def computeCollisionPoint(self, p1, p2, probability, t):
419 polyRepr1 = p1.representative_point()
420 polyRepr2 = p2.representative_point()
421 p = moving.Point(polyRepr1.x+polyRepr2.x, polyRepr1.y+polyRepr2.y)
422 return SafetyPoint(p, probability, t)
423
415 class NormalAdaptationPredictionParameters(PredictionParameters): 424 class NormalAdaptationPredictionParameters(PredictionParameters):
416 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): 425 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False):
417 '''An example of acceleration and steering distributions is 426 '''An example of acceleration and steering distributions is
418 lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.) 427 lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.)
419 ''' 428 '''
467 predictedTrajectories.append(PredictedTrajectoryConstantVelocity(initialPosition, initialVelocity, probability = probability)) 476 predictedTrajectories.append(PredictedTrajectoryConstantVelocity(initialPosition, initialVelocity, probability = probability))
468 return predictedTrajectories 477 return predictedTrajectories
469 else: 478 else:
470 print('Object {} has no features'.format(obj.getNum())) 479 print('Object {} has no features'.format(obj.getNum()))
471 return None 480 return None
472
473 481
474 class EvasiveActionPredictionParameters(PredictionParameters): 482 class EvasiveActionPredictionParameters(PredictionParameters):
475 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): 483 def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False):
476 '''Suggested acceleration distribution may not be symmetric, eg 484 '''Suggested acceleration distribution may not be symmetric, eg
477 lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.)''' 485 lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.)'''
506 moving.NormAngle(self.accelerationDistribution(), 514 moving.NormAngle(self.accelerationDistribution(),
507 self.steeringDistribution()), 515 self.steeringDistribution()),
508 probability, 516 probability,
509 self.maxSpeed)) 517 self.maxSpeed))
510 return predictedTrajectories 518 return predictedTrajectories
511
512 519
513 class CVDirectPredictionParameters(PredictionParameters): 520 class CVDirectPredictionParameters(PredictionParameters):
514 '''Prediction parameters of prediction at constant velocity 521 '''Prediction parameters of prediction at constant velocity
515 using direct computation of the intersecting point 522 using direct computation of the intersecting point
516 Warning: the computed time to collision may be higher than timeHorizon (not used)''' 523 Warning: the computed time to collision may be higher than timeHorizon (not used)'''