Mercurial Hosting > traffic-intelligence
changeset 271:bbd9c09e6869
changed the names to prediction methods and predicted trajectories
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Thu, 09 Aug 2012 15:18:20 -0400 |
parents | 05c9b0cb8202 |
children | e34698d93b23 |
files | python/extrapolation.py python/prediction.py python/tests/extrapolation.txt python/tests/prediction.txt |
diffstat | 4 files changed, 325 insertions(+), 325 deletions(-) [+] |
line wrap: on
line diff
--- a/python/extrapolation.py Thu Aug 02 05:35:57 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,300 +0,0 @@ -#! /usr/bin/env python -'''Library for moving object extrapolation hypotheses''' - -import moving -import math -import random - -class ExtrapolatedTrajectory: - '''Class for extrapolated trajectories with lazy evaluation - if the predicted position has not been already computed, compute it - - it should also have a probability''' - - def __init__(self): - self.probability = 0. - self.predictedPositions = {} - self.predictedSpeedOrientations = {} - self.collisionPoints = {} - self.crossingZones = {} - - def predictPosition(self, nTimeSteps): - if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): - self.predictPosition(nTimeSteps-1) - self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed) - return self.predictedPositions[nTimeSteps] - - def getPredictedTrajectory(self): - return moving.Trajectory.fromPointList(self.predictedPositions.values()) - - def getPredictedSpeeds(self): - return [so.norm for so in self.predictedSpeedOrientations.values()] - - def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): - self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs) - -class ExtrapolatedTrajectoryConstant(ExtrapolatedTrajectory): - '''Extrapolated trajectory at constant speed or acceleration - TODO generalize by passing a series of velocities/accelerations''' - - def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1, maxSpeed = None): - self.control = control - self.maxSpeed = maxSpeed - self.probability = probability - self.predictedPositions = {0: initialPosition} - self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} - - def getControl(self): - return self.control - -class ExtrapolatedTrajectoryNormalAdaptation(ExtrapolatedTrajectory): - '''Random small adaptation of vehicle control ''' - def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1, maxSpeed = None): - '''Constructor - accelerationDistribution and steeringDistribution are distributions - that return random numbers drawn from them''' - self.accelerationDistribution = accelerationDistribution - self.steeringDistribution = steeringDistribution - self.maxSpeed = maxSpeed - self.probability = probability - self.predictedPositions = {0: initialPosition} - self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} - - def getControl(self): - return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) - -class ExtrapolationParameters: - def __init__(self, name, maxSpeed): - self.name = name - self.maxSpeed = maxSpeed - - def __str__(self): - return '{0} {1}'.format(self.name, self.maxSpeed) - -class ConstantExtrapolationParameters(ExtrapolationParameters): - def __init__(self, maxSpeed): - ExtrapolationParameters.__init__(self, 'constant velocity', maxSpeed) - - def generateExtrapolatedTrajectories(self, obj, instant): - return [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), - maxSpeed = self.maxSpeed)] - -class NormalAdaptationExtrapolationParameters(ExtrapolationParameters): - def __init__(self, maxSpeed, nExtrapolatedTrajectories, maxAcceleration, maxSteering): - ExtrapolationParameters.__init__(self, 'normal adaptation', maxSpeed) - self.nExtrapolatedTrajectories = nExtrapolatedTrajectories - self.maxAcceleration = maxAcceleration - self.maxSteering = maxSteering - self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, - self.maxAcceleration, 0.) - self.steeringDistribution = lambda: random.triangular(-self.maxSteering, - self.maxSteering, 0.) - - def __str__(self): - return ExtrapolationParameters.__str__(self)+' {0} {1} {2}'.format(self.nExtrapolatedTrajectories, - self.maxAcceleration, - self.maxSteering) - - def generateExtrapolatedTrajectories(self, obj, instant): - extrapolatedTrajectories = [] - for i in xrange(self.nExtrapolatedTrajectories): - extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), - obj.getVelocityAtInstant(instant), - self.accelerationDistribution, - self.steeringDistribution, - maxSpeed = self.maxSpeed)) - return extrapolatedTrajectories - -class PointSetExtrapolationParameters(ExtrapolationParameters): - # todo generate several trajectories with normal adaptatoins from each position (feature) - def __init__(self, maxSpeed): - ExtrapolationParameters.__init__(self, 'point set', maxSpeed) - - def generateExtrapolatedTrajectories(self, obj, instant): - extrapolatedTrajectories = [] - features = [f for f in obj.features if f.existsAtInstant(instant)] - positions = [f.getPositionAtInstant(instant) for f in features] - velocities = [f.getVelocityAtInstant(instant) for f in features] - for initialPosition,initialVelocity in zip(positions, velocities): - extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, - maxSpeed = self.maxSpeed)) - return extrapolatedTrajectories - -class EvasiveActionExtrapolationParameters(ExtrapolationParameters): - def __init__(self, maxSpeed, nExtrapolatedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): - if useFeatures: - name = 'point set evasive action' - else: - name = 'evasive action' - ExtrapolationParameters.__init__(self, name, maxSpeed) - self.nExtrapolatedTrajectories = nExtrapolatedTrajectories - self.minAcceleration = minAcceleration - self.maxAcceleration = maxAcceleration - self.maxSteering = maxSteering - self.useFeatures = useFeatures - self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, - self.maxAcceleration, 0.) - self.steeringDistribution = lambda: random.triangular(-self.maxSteering, - self.maxSteering, 0.) - - def __str__(self): - return ExtrapolationParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nExtrapolatedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) - - def generateExtrapolatedTrajectories(self, obj, instant): - extrapolatedTrajectories = [] - if self.useFeatures: - features = [f for f in obj.features if f.existsAtInstant(instant)] - positions = [f.getPositionAtInstant(instant) for f in features] - velocities = [f.getVelocityAtInstant(instant) for f in features] - else: - positions = [obj.getPositionAtInstant(instant)] - velocities = [obj.getVelocityAtInstant(instant)] - for i in xrange(self.nExtrapolatedTrajectories): - for initialPosition,initialVelocity in zip(positions, velocities): - extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, - initialVelocity, - moving.NormAngle(self.accelerationDistribution(), - self.steeringDistribution()), - maxSpeed = self.maxSpeed)) - return extrapolatedTrajectories - -class SafetyPoint(moving.Point): - '''Can represent a collision point or crossing zone - with respective safety indicator, TTC or pPET''' - def __init__(self, p, probability = 1., indicator = -1): - self.x = p.x - self.y = p.y - self.probability = probability - self.indicator = indicator - - @staticmethod - def save(out, points, objNum1, objNum2, instant): - for p in points: - out.write('{0} {1} {2} {3} {4} {5} {6}\n'.format(objNum1, objNum2, instant, p.x, p.y, p.probability, p.indicator)) - -def computeExpectedIndicator(points): - from numpy import sum - return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) - -def computeCollisionTime(extrapolatedTrajectory1, extrapolatedTrajectory2, collisionDistanceThreshold, timeHorizon): - t = 1 - p1 = extrapolatedTrajectory1.predictPosition(t) - p2 = extrapolatedTrajectory2.predictPosition(t) - while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: - p1 = extrapolatedTrajectory1.predictPosition(t) - p2 = extrapolatedTrajectory2.predictPosition(t) - t += 1 - return t, p1, p2 - -def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, debug = False): - '''returns the lists of collision points and crossing zones - - Check: Extrapolating all the points together, as if they represent the whole vehicle''' - extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i) - extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i) - - collisionPoints = [] - crossingZones = [] - for et1 in extrapolatedTrajectories1: - for et2 in extrapolatedTrajectories2: - t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) - - if t <= timeHorizon: - collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) - else: # check if there is a crossing zone - # TODO? zone should be around the points at which the traj are the closest - # look for CZ at different times, otherwise it would be a collision - # an approximation would be to look for close points at different times, ie the complementary of collision points - cz = None - t1 = 0 - while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 - t2 = 0 - while not cz and t2 < timeHorizon: - #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: - # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) - cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) - if cz: - crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) - t2 += 1 - t1 += 1 - - if debug: - from matplotlib.pyplot import figure, axis, title - figure() - for et in extrapolatedTrajectories1: - et.predictPosition(timeHorizon) - et.draw('rx') - - for et in extrapolatedTrajectories2: - et.predictPosition(timeHorizon) - et.draw('bx') - obj1.draw('r') - obj2.draw('b') - title('instant {0}'.format(i)) - axis('equal') - - return collisionPoints, crossingZones - -def computeCrossingsCollisions(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): - collisionPoints={} - crossingZones={} - if timeInterval: - commonTimeInterval = timeInterval - else: - commonTimeInterval = obj1.commonTimeInterval(obj2) - for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors - print(obj1.num, obj2.num, i) - collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, debug) - SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) - SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) - - return collisionPoints, crossingZones - -def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False, timeInterval = None): - collisionProbabilities = {} - if timeInterval: - commonTimeInterval = timeInterval - else: - commonTimeInterval = obj1.commonTimeInterval(obj2) - for i in list(commonTimeInterval)[:-1]: - nCollisions = 0 - print(obj1.num, obj2.num, i) - extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i) - extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i) - for et1 in extrapolatedTrajectories1: - for et2 in extrapolatedTrajectories2: - t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) - if t <= timeHorizon: - nCollisions += 1 - # take into account probabilities ?? - nSamples = float(len(extrapolatedTrajectories1)*len(extrapolatedTrajectories2)) - collisionProbabilities[i] = float(nCollisions)/nSamples - out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, nSamples, i, collisionProbabilities[i])) - - if debug: - from matplotlib.pyplot import figure, axis, title - figure() - for et in extrapolatedTrajectories1: - et.predictPosition(timeHorizon) - et.draw('rx') - - for et in extrapolatedTrajectories2: - et.predictPosition(timeHorizon) - et.draw('bx') - obj1.draw('r') - obj2.draw('b') - title('instant {0}'.format(i)) - axis('equal') - - return collisionProbabilities - - -if __name__ == "__main__": - import doctest - import unittest - suite = doctest.DocFileSuite('tests/extrapolation.txt') - #suite = doctest.DocTestSuite() - unittest.TextTestRunner().run(suite) - #doctest.testmod() - #doctest.testfile("example.txt") -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python/prediction.py Thu Aug 09 15:18:20 2012 -0400 @@ -0,0 +1,300 @@ +#! /usr/bin/env python +'''Library for motion prediction methods''' + +import moving +import math +import random + +class PredictedTrajectory: + '''Class for predicted trajectories with lazy evaluation + if the predicted position has not been already computed, compute it + + it should also have a probability''' + + def __init__(self): + self.probability = 0. + self.predictedPositions = {} + self.predictedSpeedOrientations = {} + self.collisionPoints = {} + self.crossingZones = {} + + def predictPosition(self, nTimeSteps): + if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): + self.predictPosition(nTimeSteps-1) + self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed) + return self.predictedPositions[nTimeSteps] + + def getPredictedTrajectory(self): + return moving.Trajectory.fromPointList(self.predictedPositions.values()) + + def getPredictedSpeeds(self): + return [so.norm for so in self.predictedSpeedOrientations.values()] + + def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): + self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs) + +class PredictedTrajectoryConstant(PredictedTrajectory): + '''Predicted trajectory at constant speed or acceleration + TODO generalize by passing a series of velocities/accelerations''' + + def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1, maxSpeed = None): + self.control = control + self.maxSpeed = maxSpeed + self.probability = probability + self.predictedPositions = {0: initialPosition} + self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} + + def getControl(self): + return self.control + +class PredictedTrajectoryNormalAdaptation(PredictedTrajectory): + '''Random small adaptation of vehicle control ''' + def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1, maxSpeed = None): + '''Constructor + accelerationDistribution and steeringDistribution are distributions + that return random numbers drawn from them''' + self.accelerationDistribution = accelerationDistribution + self.steeringDistribution = steeringDistribution + self.maxSpeed = maxSpeed + self.probability = probability + self.predictedPositions = {0: initialPosition} + self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} + + def getControl(self): + return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) + +class PredictionParameters: + def __init__(self, name, maxSpeed): + self.name = name + self.maxSpeed = maxSpeed + + def __str__(self): + return '{0} {1}'.format(self.name, self.maxSpeed) + +class ConstantPredictionParameters(PredictionParameters): + def __init__(self, maxSpeed): + PredictionParameters.__init__(self, 'constant velocity', maxSpeed) + + def generatePredictedTrajectories(self, obj, instant): + return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), + maxSpeed = self.maxSpeed)] + +class NormalAdaptationPredictionParameters(PredictionParameters): + def __init__(self, maxSpeed, nPredictedTrajectories, maxAcceleration, maxSteering): + PredictionParameters.__init__(self, 'normal adaptation', maxSpeed) + self.nPredictedTrajectories = nPredictedTrajectories + self.maxAcceleration = maxAcceleration + self.maxSteering = maxSteering + self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, + self.maxAcceleration, 0.) + self.steeringDistribution = lambda: random.triangular(-self.maxSteering, + self.maxSteering, 0.) + + def __str__(self): + return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, + self.maxAcceleration, + self.maxSteering) + + def generatePredictedTrajectories(self, obj, instant): + predictedTrajectories = [] + for i in xrange(self.nPredictedTrajectories): + predictedTrajectories.append(PredictedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), + obj.getVelocityAtInstant(instant), + self.accelerationDistribution, + self.steeringDistribution, + maxSpeed = self.maxSpeed)) + return predictedTrajectories + +class PointSetPredictionParameters(PredictionParameters): + # todo generate several trajectories with normal adaptatoins from each position (feature) + def __init__(self, maxSpeed): + PredictionParameters.__init__(self, 'point set', maxSpeed) + + def generatePredictedTrajectories(self, obj, instant): + predictedTrajectories = [] + features = [f for f in obj.features if f.existsAtInstant(instant)] + positions = [f.getPositionAtInstant(instant) for f in features] + velocities = [f.getVelocityAtInstant(instant) for f in features] + for initialPosition,initialVelocity in zip(positions, velocities): + predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, + maxSpeed = self.maxSpeed)) + return predictedTrajectories + +class EvasiveActionPredictionParameters(PredictionParameters): + def __init__(self, maxSpeed, nPredictedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): + if useFeatures: + name = 'point set evasive action' + else: + name = 'evasive action' + PredictionParameters.__init__(self, name, maxSpeed) + self.nPredictedTrajectories = nPredictedTrajectories + self.minAcceleration = minAcceleration + self.maxAcceleration = maxAcceleration + self.maxSteering = maxSteering + self.useFeatures = useFeatures + self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, + self.maxAcceleration, 0.) + self.steeringDistribution = lambda: random.triangular(-self.maxSteering, + self.maxSteering, 0.) + + def __str__(self): + return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) + + def generatePredictedTrajectories(self, obj, instant): + predictedTrajectories = [] + if self.useFeatures: + features = [f for f in obj.features if f.existsAtInstant(instant)] + positions = [f.getPositionAtInstant(instant) for f in features] + velocities = [f.getVelocityAtInstant(instant) for f in features] + else: + positions = [obj.getPositionAtInstant(instant)] + velocities = [obj.getVelocityAtInstant(instant)] + for i in xrange(self.nPredictedTrajectories): + for initialPosition,initialVelocity in zip(positions, velocities): + predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, + initialVelocity, + moving.NormAngle(self.accelerationDistribution(), + self.steeringDistribution()), + maxSpeed = self.maxSpeed)) + return predictedTrajectories + +class SafetyPoint(moving.Point): + '''Can represent a collision point or crossing zone + with respective safety indicator, TTC or pPET''' + def __init__(self, p, probability = 1., indicator = -1): + self.x = p.x + self.y = p.y + self.probability = probability + self.indicator = indicator + + @staticmethod + def save(out, points, objNum1, objNum2, instant): + for p in points: + out.write('{0} {1} {2} {3} {4} {5} {6}\n'.format(objNum1, objNum2, instant, p.x, p.y, p.probability, p.indicator)) + +def computeExpectedIndicator(points): + from numpy import sum + return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) + +def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): + t = 1 + p1 = predictedTrajectory1.predictPosition(t) + p2 = predictedTrajectory2.predictPosition(t) + while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: + p1 = predictedTrajectory1.predictPosition(t) + p2 = predictedTrajectory2.predictPosition(t) + t += 1 + return t, p1, p2 + +def computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug = False): + '''returns the lists of collision points and crossing zones + + Check: Predicting all the points together, as if they represent the whole vehicle''' + predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) + predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) + + collisionPoints = [] + crossingZones = [] + for et1 in predictedTrajectories1: + for et2 in predictedTrajectories2: + t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + + if t <= timeHorizon: + collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) + else: # check if there is a crossing zone + # TODO? zone should be around the points at which the traj are the closest + # look for CZ at different times, otherwise it would be a collision + # an approximation would be to look for close points at different times, ie the complementary of collision points + cz = None + t1 = 0 + while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 + t2 = 0 + while not cz and t2 < timeHorizon: + #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: + # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) + cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) + if cz: + crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) + t2 += 1 + t1 += 1 + + if debug: + from matplotlib.pyplot import figure, axis, title + figure() + for et in predictedTrajectories1: + et.predictPosition(timeHorizon) + et.draw('rx') + + for et in predictedTrajectories2: + et.predictPosition(timeHorizon) + et.draw('bx') + obj1.draw('r') + obj2.draw('b') + title('instant {0}'.format(i)) + axis('equal') + + return collisionPoints, crossingZones + +def computeCrossingsCollisions(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): + collisionPoints={} + crossingZones={} + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = obj1.commonTimeInterval(obj2) + for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors + print(obj1.num, obj2.num, i) + collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug) + SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) + SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) + + return collisionPoints, crossingZones + +def computeCollisionProbability(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, out, debug = False, timeInterval = None): + collisionProbabilities = {} + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = obj1.commonTimeInterval(obj2) + for i in list(commonTimeInterval)[:-1]: + nCollisions = 0 + print(obj1.num, obj2.num, i) + predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) + predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) + for et1 in predictedTrajectories1: + for et2 in predictedTrajectories2: + t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + if t <= timeHorizon: + nCollisions += 1 + # take into account probabilities ?? + nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) + collisionProbabilities[i] = float(nCollisions)/nSamples + out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, nSamples, i, collisionProbabilities[i])) + + if debug: + from matplotlib.pyplot import figure, axis, title + figure() + for et in predictedTrajectories1: + et.predictPosition(timeHorizon) + et.draw('rx') + + for et in predictedTrajectories2: + et.predictPosition(timeHorizon) + et.draw('bx') + obj1.draw('r') + obj2.draw('b') + title('instant {0}'.format(i)) + axis('equal') + + return collisionProbabilities + + +if __name__ == "__main__": + import doctest + import unittest + suite = doctest.DocFileSuite('tests/prediction.txt') + #suite = doctest.DocTestSuite() + unittest.TextTestRunner().run(suite) + #doctest.testmod() + #doctest.testfile("example.txt") +
--- a/python/tests/extrapolation.txt Thu Aug 02 05:35:57 2012 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,25 +0,0 @@ ->>> import extrapolation ->>> import moving - ->>> et = extrapolation.ExtrapolatedTrajectoryConstant(moving.Point(0,0), moving.Point(1,0)) ->>> et.predictPosition(4) # doctest:+ELLIPSIS -(0.0...,4.0...) ->>> et.predictPosition(1) # doctest:+ELLIPSIS -(0.0...,1.0...) - ->>> et = extrapolation.ExtrapolatedTrajectoryConstant(moving.Point(0,0), moving.Point(1,0), moving.NormAngle(0.1,0), maxSpeed = 2) ->>> et.predictPosition(10) # doctest:+ELLIPSIS -(0.0...,15.5...) ->>> et.predictPosition(11) # doctest:+ELLIPSIS -(0.0...,17.5...) ->>> et.predictPosition(12) # doctest:+ELLIPSIS -(0.0...,19.5...) - ->>> import random ->>> acceleration = lambda: random.uniform(-0.5,0.5) ->>> steering = lambda: random.uniform(-0.1,0.1) ->>> et = extrapolation.ExtrapolatedTrajectoryNormalAdaptation(moving.Point(0,0),moving.Point(1,1), acceleration, steering, maxSpeed = 2) ->>> p = et.predictPosition(500) ->>> from numpy import max ->>> max(et.getPredictedSpeeds()) <= 2. -True
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python/tests/prediction.txt Thu Aug 09 15:18:20 2012 -0400 @@ -0,0 +1,25 @@ +>>> import prediction +>>> import moving + +>>> et = prediction.PredictedTrajectoryConstant(moving.Point(0,0), moving.Point(1,0)) +>>> et.predictPosition(4) # doctest:+ELLIPSIS +(4.0...,0.0...) +>>> et.predictPosition(1) # doctest:+ELLIPSIS +(1.0...,0.0...) + +>>> et = prediction.PredictedTrajectoryConstant(moving.Point(0,0), moving.Point(1,0), moving.NormAngle(0.1,0), maxSpeed = 2) +>>> et.predictPosition(10) # doctest:+ELLIPSIS +(15.5...,0.0...) +>>> et.predictPosition(11) # doctest:+ELLIPSIS +(17.5...,0.0...) +>>> et.predictPosition(12) # doctest:+ELLIPSIS +(19.5...,0.0...) + +>>> import random +>>> acceleration = lambda: random.uniform(-0.5,0.5) +>>> steering = lambda: random.uniform(-0.1,0.1) +>>> et = prediction.PredictedTrajectoryNormalAdaptation(moving.Point(0,0),moving.Point(1,1), acceleration, steering, maxSpeed = 2) +>>> p = et.predictPosition(500) +>>> from numpy import max +>>> max(et.getPredictedSpeeds()) <= 2. +True