Mercurial Hosting > traffic-intelligence
changeset 268:0c0b92f621f6
reorganized to compute evasive action for multiple positions
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Sat, 28 Jul 2012 02:58:47 -0400 |
parents | 32e88b513f5c |
children | a9988971aac8 |
files | python/extrapolation.py python/moving.py |
diffstat | 2 files changed, 95 insertions(+), 38 deletions(-) [+] |
line wrap: on
line diff
--- a/python/extrapolation.py Fri Jul 27 20:32:14 2012 -0400 +++ b/python/extrapolation.py Sat Jul 28 02:58:47 2012 -0400 @@ -3,6 +3,7 @@ import moving import math +import random class ExtrapolatedTrajectory: '''Class for extrapolated trajectories with lazy evaluation @@ -70,36 +71,90 @@ def __str__(self): return '{0} {1}'.format(self.name, self.maxSpeed) - # refactor with classes and subclasses - def createExtrapolatedTrajectories(self, obj, instant): - '''extrapolationParameters specific to each method (in name field) ''' +class ConstantExtrapolationParameters(ExtrapolationParameters): + def __init__(self, maxSpeed): + ExtrapolationParameters.__init__(self, 'constant velocity', maxSpeed) + + def generateExtrapolatedTrajectories(self, obj, instant): + return [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), + maxSpeed = self.maxSpeed)] + +class NormalAdaptationExtrapolationParameters(ExtrapolationParameters): + def __init__(self, maxSpeed, nExtrapolatedTrajectories, maxAcceleration, maxSteering): + ExtrapolationParameters.__init__(self, 'normal adaptation', maxSpeed) + self.nExtrapolatedTrajectories = nExtrapolatedTrajectories + self.maxAcceleration = maxAcceleration + self.maxSteering = maxSteering + self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, + self.maxAcceleration, 0.) + self.steeringDistribution = lambda: random.triangular(-self.maxSteering, + self.maxSteering, 0.) + + def __str__(self): + return ExtrapolationParameters.__str__(self)+' {0} {1} {2}'.format(self.nExtrapolatedTrajectories, + self.maxAcceleration, + self.maxSteering) + + def generateExtrapolatedTrajectories(self, obj, instant): extrapolatedTrajectories = [] - if self.name == 'constant velocity': - extrapolatedTrajectories = [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), - maxSpeed = self.maxSpeed)] - elif self.name == 'normal adaptation': - for i in xrange(self.nExtrapolatedTrajectories): - extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), - obj.getVelocityAtInstant(instant), - self.accelerationDistribution, - self.steeringDistribution, - maxSpeed = self.maxSpeed)) - elif self.name == 'pointset': + for i in xrange(self.nExtrapolatedTrajectories): + extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), + obj.getVelocityAtInstant(instant), + self.accelerationDistribution, + self.steeringDistribution, + maxSpeed = self.maxSpeed)) + return extrapolatedTrajectories + +class PointSetExtrapolationParameters(ExtrapolationParameters): + def __init__(self, maxSpeed): + ExtrapolationParameters.__init__(self, 'point set', maxSpeed) + + def generateExtrapolatedTrajectories(self, obj, instant): + extrapolatedTrajectories = [] + features = [f for f in obj.features if f.existsAtInstant(instant)] + positions = [f.getPositionAtInstant(instant) for f in features] + velocities = [f.getVelocityAtInstant(instant) for f in features] + for initialPosition,initialVelocity in zip(positions, velocities): + extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, + maxSpeed = self.maxSpeed)) + return extrapolatedTrajectories + +class EvasiveActionExtrapolationParameters(ExtrapolationParameters): + def __init__(self, maxSpeed, nExtrapolatedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): + if useFeatures: + name = 'point set evasive action' + else: + name = 'evasive action' + ExtrapolationParameters.__init__(self, name, maxSpeed) + self.nExtrapolatedTrajectories = nExtrapolatedTrajectories + self.minAcceleration = minAcceleration + self.maxAcceleration = maxAcceleration + self.maxSteering = maxSteering + self.useFeatures = useFeatures + self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, + self.maxAcceleration, 0.) + self.steeringDistribution = lambda: random.triangular(-self.maxSteering, + self.maxSteering, 0.) + + def __str__(self): + return ExtrapolationParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nExtrapolatedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) + + def generateExtrapolatedTrajectories(self, obj, instant): + extrapolatedTrajectories = [] + if self.useFeatures: features = [f for f in obj.features if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] + else: + positions = [obj.getPositionAtInstant(instant)] + velocities = [obj.getVelocityAtInstant(instant)] + for i in xrange(self.nExtrapolatedTrajectories): for initialPosition,initialVelocity in zip(positions, velocities): - extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, + extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, + initialVelocity, + moving.NormAngle(self.accelerationDistribution(), + self.steeringDistribution()), maxSpeed = self.maxSpeed)) - elif self.name == 'evasive action': - for i in xrange(self.nExtrapolatedTrajectories): - - extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), - obj.getVelocityAtInstant(instant), - moving.NormAngle(self.accelerationDistribution(), self.steeringDistribution()), - maxSpeed = self.maxSpeed)) - else: - print('Unknown extrapolation hypothesis') return extrapolatedTrajectories class SafetyPoint(moving.Point): @@ -132,8 +187,8 @@ def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): '''returns the lists of collision points and crossing zones ''' - extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) - extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) + extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i) + extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i) collisionPoints = [] crossingZones = [] @@ -164,8 +219,6 @@ def computeCrossingsCollisions(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): collisionPoints={} crossingZones={} - #TTCs = {} - #pPETs = {} if timeInterval: commonTimeInterval = timeInterval else: @@ -173,10 +226,6 @@ for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors print(obj1.num, obj2.num, i) collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon) - # if len(collisionPoints[i]) > 0: - # TTCs[i] = extrapolation.computeExpectedIndicator(collisionPoints[i]) - # if len(crossingZones[i]) > 0: - # pPETs[i] = extrapolation.computeExpectedIndicator(crossingZones[i]) SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) @@ -197,21 +246,26 @@ return collisionPoints, crossingZones -def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False): +def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False, timeInterval = None): collisionProbabilities = {} - for i in list(obj1.commonTimeInterval(obj2))[:-1]: + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = obj1.commonTimeInterval(obj2) + for i in list(commonTimeInterval)[:-1]: nCollisions = 0 print(obj1.num, obj2.num, i) - extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) - extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) + extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i) + extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i) for et1 in extrapolatedTrajectories1: for et2 in extrapolatedTrajectories2: t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) if t <= timeHorizon: nCollisions += 1 # take into account probabilities ?? - collisionProbabilities[i] = float(nCollisions)/extrapolationParameters.nExtrapolatedTrajectories**2 - out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, extrapolationParameters.nExtrapolatedTrajectories, i, collisionProbabilities[i])) + nSamples = float(len(extrapolatedTrajectories1)*len(extrapolatedTrajectories2)) + collisionProbabilities[i] = float(nCollisions)/nSamples + out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, nSamples, i, collisionProbabilities[i])) if debug: from matplotlib.pyplot import figure, axis, title
--- a/python/moving.py Fri Jul 27 20:32:14 2012 -0400 +++ b/python/moving.py Sat Jul 28 02:58:47 2012 -0400 @@ -544,6 +544,9 @@ def getVelocities(self): return self.velocities + def setFeatures(self, features): + self.features = [features[i] for i in self.featureNumbers] + def getSpeeds(self): return self.getVelocities().norm()