Mercurial Hosting > traffic-intelligence
diff python/prediction.py @ 271:bbd9c09e6869
changed the names to prediction methods and predicted trajectories
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Thu, 09 Aug 2012 15:18:20 -0400 |
parents | python/extrapolation.py@05c9b0cb8202 |
children | e56c34c1ebac |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python/prediction.py Thu Aug 09 15:18:20 2012 -0400 @@ -0,0 +1,300 @@ +#! /usr/bin/env python +'''Library for motion prediction methods''' + +import moving +import math +import random + +class PredictedTrajectory: + '''Class for predicted trajectories with lazy evaluation + if the predicted position has not been already computed, compute it + + it should also have a probability''' + + def __init__(self): + self.probability = 0. + self.predictedPositions = {} + self.predictedSpeedOrientations = {} + self.collisionPoints = {} + self.crossingZones = {} + + def predictPosition(self, nTimeSteps): + if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): + self.predictPosition(nTimeSteps-1) + self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed) + return self.predictedPositions[nTimeSteps] + + def getPredictedTrajectory(self): + return moving.Trajectory.fromPointList(self.predictedPositions.values()) + + def getPredictedSpeeds(self): + return [so.norm for so in self.predictedSpeedOrientations.values()] + + def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): + self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs) + +class PredictedTrajectoryConstant(PredictedTrajectory): + '''Predicted trajectory at constant speed or acceleration + TODO generalize by passing a series of velocities/accelerations''' + + def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1, maxSpeed = None): + self.control = control + self.maxSpeed = maxSpeed + self.probability = probability + self.predictedPositions = {0: initialPosition} + self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} + + def getControl(self): + return self.control + +class PredictedTrajectoryNormalAdaptation(PredictedTrajectory): + '''Random small adaptation of vehicle control ''' + def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1, maxSpeed = None): + '''Constructor + accelerationDistribution and steeringDistribution are distributions + that return random numbers drawn from them''' + self.accelerationDistribution = accelerationDistribution + self.steeringDistribution = steeringDistribution + self.maxSpeed = maxSpeed + self.probability = probability + self.predictedPositions = {0: initialPosition} + self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} + + def getControl(self): + return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) + +class PredictionParameters: + def __init__(self, name, maxSpeed): + self.name = name + self.maxSpeed = maxSpeed + + def __str__(self): + return '{0} {1}'.format(self.name, self.maxSpeed) + +class ConstantPredictionParameters(PredictionParameters): + def __init__(self, maxSpeed): + PredictionParameters.__init__(self, 'constant velocity', maxSpeed) + + def generatePredictedTrajectories(self, obj, instant): + return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), + maxSpeed = self.maxSpeed)] + +class NormalAdaptationPredictionParameters(PredictionParameters): + def __init__(self, maxSpeed, nPredictedTrajectories, maxAcceleration, maxSteering): + PredictionParameters.__init__(self, 'normal adaptation', maxSpeed) + self.nPredictedTrajectories = nPredictedTrajectories + self.maxAcceleration = maxAcceleration + self.maxSteering = maxSteering + self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, + self.maxAcceleration, 0.) + self.steeringDistribution = lambda: random.triangular(-self.maxSteering, + self.maxSteering, 0.) + + def __str__(self): + return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, + self.maxAcceleration, + self.maxSteering) + + def generatePredictedTrajectories(self, obj, instant): + predictedTrajectories = [] + for i in xrange(self.nPredictedTrajectories): + predictedTrajectories.append(PredictedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), + obj.getVelocityAtInstant(instant), + self.accelerationDistribution, + self.steeringDistribution, + maxSpeed = self.maxSpeed)) + return predictedTrajectories + +class PointSetPredictionParameters(PredictionParameters): + # todo generate several trajectories with normal adaptatoins from each position (feature) + def __init__(self, maxSpeed): + PredictionParameters.__init__(self, 'point set', maxSpeed) + + def generatePredictedTrajectories(self, obj, instant): + predictedTrajectories = [] + features = [f for f in obj.features if f.existsAtInstant(instant)] + positions = [f.getPositionAtInstant(instant) for f in features] + velocities = [f.getVelocityAtInstant(instant) for f in features] + for initialPosition,initialVelocity in zip(positions, velocities): + predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, + maxSpeed = self.maxSpeed)) + return predictedTrajectories + +class EvasiveActionPredictionParameters(PredictionParameters): + def __init__(self, maxSpeed, nPredictedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): + if useFeatures: + name = 'point set evasive action' + else: + name = 'evasive action' + PredictionParameters.__init__(self, name, maxSpeed) + self.nPredictedTrajectories = nPredictedTrajectories + self.minAcceleration = minAcceleration + self.maxAcceleration = maxAcceleration + self.maxSteering = maxSteering + self.useFeatures = useFeatures + self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, + self.maxAcceleration, 0.) + self.steeringDistribution = lambda: random.triangular(-self.maxSteering, + self.maxSteering, 0.) + + def __str__(self): + return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) + + def generatePredictedTrajectories(self, obj, instant): + predictedTrajectories = [] + if self.useFeatures: + features = [f for f in obj.features if f.existsAtInstant(instant)] + positions = [f.getPositionAtInstant(instant) for f in features] + velocities = [f.getVelocityAtInstant(instant) for f in features] + else: + positions = [obj.getPositionAtInstant(instant)] + velocities = [obj.getVelocityAtInstant(instant)] + for i in xrange(self.nPredictedTrajectories): + for initialPosition,initialVelocity in zip(positions, velocities): + predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, + initialVelocity, + moving.NormAngle(self.accelerationDistribution(), + self.steeringDistribution()), + maxSpeed = self.maxSpeed)) + return predictedTrajectories + +class SafetyPoint(moving.Point): + '''Can represent a collision point or crossing zone + with respective safety indicator, TTC or pPET''' + def __init__(self, p, probability = 1., indicator = -1): + self.x = p.x + self.y = p.y + self.probability = probability + self.indicator = indicator + + @staticmethod + def save(out, points, objNum1, objNum2, instant): + for p in points: + out.write('{0} {1} {2} {3} {4} {5} {6}\n'.format(objNum1, objNum2, instant, p.x, p.y, p.probability, p.indicator)) + +def computeExpectedIndicator(points): + from numpy import sum + return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) + +def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): + t = 1 + p1 = predictedTrajectory1.predictPosition(t) + p2 = predictedTrajectory2.predictPosition(t) + while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: + p1 = predictedTrajectory1.predictPosition(t) + p2 = predictedTrajectory2.predictPosition(t) + t += 1 + return t, p1, p2 + +def computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug = False): + '''returns the lists of collision points and crossing zones + + Check: Predicting all the points together, as if they represent the whole vehicle''' + predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) + predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) + + collisionPoints = [] + crossingZones = [] + for et1 in predictedTrajectories1: + for et2 in predictedTrajectories2: + t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + + if t <= timeHorizon: + collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) + else: # check if there is a crossing zone + # TODO? zone should be around the points at which the traj are the closest + # look for CZ at different times, otherwise it would be a collision + # an approximation would be to look for close points at different times, ie the complementary of collision points + cz = None + t1 = 0 + while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 + t2 = 0 + while not cz and t2 < timeHorizon: + #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: + # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) + cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) + if cz: + crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) + t2 += 1 + t1 += 1 + + if debug: + from matplotlib.pyplot import figure, axis, title + figure() + for et in predictedTrajectories1: + et.predictPosition(timeHorizon) + et.draw('rx') + + for et in predictedTrajectories2: + et.predictPosition(timeHorizon) + et.draw('bx') + obj1.draw('r') + obj2.draw('b') + title('instant {0}'.format(i)) + axis('equal') + + return collisionPoints, crossingZones + +def computeCrossingsCollisions(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): + collisionPoints={} + crossingZones={} + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = obj1.commonTimeInterval(obj2) + for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors + print(obj1.num, obj2.num, i) + collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug) + SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) + SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) + + return collisionPoints, crossingZones + +def computeCollisionProbability(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, out, debug = False, timeInterval = None): + collisionProbabilities = {} + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = obj1.commonTimeInterval(obj2) + for i in list(commonTimeInterval)[:-1]: + nCollisions = 0 + print(obj1.num, obj2.num, i) + predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) + predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) + for et1 in predictedTrajectories1: + for et2 in predictedTrajectories2: + t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + if t <= timeHorizon: + nCollisions += 1 + # take into account probabilities ?? + nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) + collisionProbabilities[i] = float(nCollisions)/nSamples + out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, nSamples, i, collisionProbabilities[i])) + + if debug: + from matplotlib.pyplot import figure, axis, title + figure() + for et in predictedTrajectories1: + et.predictPosition(timeHorizon) + et.draw('rx') + + for et in predictedTrajectories2: + et.predictPosition(timeHorizon) + et.draw('bx') + obj1.draw('r') + obj2.draw('b') + title('instant {0}'.format(i)) + axis('equal') + + return collisionProbabilities + + +if __name__ == "__main__": + import doctest + import unittest + suite = doctest.DocFileSuite('tests/prediction.txt') + #suite = doctest.DocTestSuite() + unittest.TextTestRunner().run(suite) + #doctest.testmod() + #doctest.testfile("example.txt") +