Mercurial Hosting > traffic-intelligence
changeset 267:32e88b513f5c
added code to compute probability of collision
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 27 Jul 2012 20:32:14 -0400 |
parents | aba9711b3149 |
children | 0c0b92f621f6 |
files | python/extrapolation.py |
diffstat | 1 files changed, 76 insertions(+), 37 deletions(-) [+] |
line wrap: on
line diff
--- a/python/extrapolation.py Fri Jul 27 10:29:24 2012 -0400 +++ b/python/extrapolation.py Fri Jul 27 20:32:14 2012 -0400 @@ -70,32 +70,37 @@ def __str__(self): return '{0} {1}'.format(self.name, self.maxSpeed) -def createExtrapolatedTrajectories(extrapolationParameters, obj, instant): - '''extrapolationParameters specific to each method (in name field) ''' - if extrapolationParameters.name == 'constant velocity': - return [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), - maxSpeed = extrapolationParameters.maxSpeed)] - elif extrapolationParameters.name == 'normal adaptation': + # refactor with classes and subclasses + def createExtrapolatedTrajectories(self, obj, instant): + '''extrapolationParameters specific to each method (in name field) ''' extrapolatedTrajectories = [] - for i in xrange(extrapolationParameters.nExtrapolatedTrajectories): - extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), - obj.getVelocityAtInstant(instant), - extrapolationParameters.accelerationDistribution, - extrapolationParameters.steeringDistribution, - maxSpeed = extrapolationParameters.maxSpeed)) + if self.name == 'constant velocity': + extrapolatedTrajectories = [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), + maxSpeed = self.maxSpeed)] + elif self.name == 'normal adaptation': + for i in xrange(self.nExtrapolatedTrajectories): + extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), + obj.getVelocityAtInstant(instant), + self.accelerationDistribution, + self.steeringDistribution, + maxSpeed = self.maxSpeed)) + elif self.name == 'pointset': + features = [f for f in obj.features if f.existsAtInstant(instant)] + positions = [f.getPositionAtInstant(instant) for f in features] + velocities = [f.getVelocityAtInstant(instant) for f in features] + for initialPosition,initialVelocity in zip(positions, velocities): + extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, + maxSpeed = self.maxSpeed)) + elif self.name == 'evasive action': + for i in xrange(self.nExtrapolatedTrajectories): + + extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), + obj.getVelocityAtInstant(instant), + moving.NormAngle(self.accelerationDistribution(), self.steeringDistribution()), + maxSpeed = self.maxSpeed)) + else: + print('Unknown extrapolation hypothesis') return extrapolatedTrajectories - elif extrapolationParameters.name == 'pointset': - extrapolatedTrajectories = [] - features = [f for f in obj.features if f.existsAtInstant(instant)] - positions = [f.getPositionAtInstant(instant) for f in features] - velocities = [f.getVelocityAtInstant(instant) for f in features] - for initialPosition,initialVelocity in zip(positions, velocities): - extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, - maxSpeed = extrapolationParameters.maxSpeed)) - return extrapolatedTrajectories - else: - print('Unknown extrapolation hypothesis') - return [] class SafetyPoint(moving.Point): '''Can represent a collision point or crossing zone @@ -115,23 +120,27 @@ from numpy import sum return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) +def computeCollisionTime(extrapolatedTrajectory1, extrapolatedTrajectory2, collisionDistanceThreshold, timeHorizon): + t = 1 + p1 = extrapolatedTrajectory1.predictPosition(t) + p2 = extrapolatedTrajectory2.predictPosition(t) + while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: + p1 = extrapolatedTrajectory1.predictPosition(t) + p2 = extrapolatedTrajectory2.predictPosition(t) + t += 1 + return t, p1, p2 + def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): '''returns the lists of collision points and crossing zones ''' - extrapolatedTrajectories1 = createExtrapolatedTrajectories(extrapolationParameters, obj1, i) - extrapolatedTrajectories2 = createExtrapolatedTrajectories(extrapolationParameters, obj2, i) + extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) + extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) collisionPoints = [] crossingZones = [] for et1 in extrapolatedTrajectories1: for et2 in extrapolatedTrajectories2: - t = 1 - p1 = et1.predictPosition(t) - p2 = et2.predictPosition(t) - while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: - p1 = et1.predictPosition(t) - p2 = et2.predictPosition(t) - t += 1 - + t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + if t <= timeHorizon: collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) else: # check if there is a crossing zone @@ -185,11 +194,41 @@ et.draw('bx') title('instant {0}'.format(i)) axis('equal') - - - # probability of successful evasive action = 1-P(Collision) using larger control values return collisionPoints, crossingZones + +def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False): + collisionProbabilities = {} + for i in list(obj1.commonTimeInterval(obj2))[:-1]: + nCollisions = 0 + print(obj1.num, obj2.num, i) + extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) + extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) + for et1 in extrapolatedTrajectories1: + for et2 in extrapolatedTrajectories2: + t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + if t <= timeHorizon: + nCollisions += 1 + # take into account probabilities ?? + collisionProbabilities[i] = float(nCollisions)/extrapolationParameters.nExtrapolatedTrajectories**2 + out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, extrapolationParameters.nExtrapolatedTrajectories, i, collisionProbabilities[i])) + + if debug: + from matplotlib.pyplot import figure, axis, title + figure() + obj1.draw('r') + obj2.draw('b') + for et in extrapolatedTrajectories1: + et.predictPosition(timeHorizon) + et.draw('rx') + + for et in extrapolatedTrajectories2: + et.predictPosition(timeHorizon) + et.draw('bx') + title('instant {0}'.format(i)) + axis('equal') + + return collisionProbabilities ###############