Mercurial Hosting > traffic-intelligence
changeset 358:c41ff9f3c263
moved current method for collision points and crossing zones computation into prediction parameters (put expectedindicator in SafetyPoint)
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Thu, 11 Jul 2013 00:17:25 -0400 |
parents | e5fe0e6d48a1 |
children | 619ae9a9a788 |
files | python/events.py python/prediction.py |
diffstat | 2 files changed, 122 insertions(+), 117 deletions(-) [+] |
line wrap: on
line diff
--- a/python/events.py Thu Jul 11 00:07:47 2013 -0400 +++ b/python/events.py Thu Jul 11 00:17:25 2013 -0400 @@ -117,9 +117,9 @@ else: commonTimeInterval = self.timeInterval for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors - self.collisionPoints[i], self.crossingZones[i] = prediction.computeCrossingsCollisionsAtInstant(i, self.roadUser1, self.roadUser2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug) + self.collisionPoints[i], self.crossingZones[i] = predictionParameters.computeCrossingsCollisionsAtInstant(i, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) if len(self.collisionPoints[i]) > 0: - TTCs[i] = prediction.computeExpectedIndicator(self.collisionPoints[i]) + TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.collisionPoints[i]) # add probability of collision, and probability of successful evasive action self.addIndicator(indicators.SeverityIndicator('TTC', TTCs)) @@ -127,7 +127,7 @@ pPETs = {} for i in list(commonTimeInterval)[:-1]: if len(self.crossingZones[i]) > 0: - pPETs[i] = prediction.computeExpectedIndicator(self.crossingZones[i]) + pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) self.addIndicator(indicators.SeverityIndicator('pPET', pPETs)) def addVideoFilename(self,videoFilename):
--- a/python/prediction.py Thu Jul 11 00:07:47 2013 -0400 +++ b/python/prediction.py Thu Jul 11 00:17:25 2013 -0400 @@ -63,6 +63,17 @@ def getControl(self): return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) +def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): + '''Computes the first instant at which two predicted trajectories are within some distance threshold''' + t = 1 + p1 = predictedTrajectory1.predictPosition(t) + p2 = predictedTrajectory2.predictPosition(t) + while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: + p1 = predictedTrajectory1.predictPosition(t) + p2 = predictedTrajectory2.predictPosition(t) + t += 1 + return t, p1, p2 + class PredictionParameters: def __init__(self, name, maxSpeed): self.name = name @@ -71,6 +82,110 @@ def __str__(self): return '{0} {1}'.format(self.name, self.maxSpeed) + def generatePredictedTrajectories(self, obj, instant): + return [] + + def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): + '''returns the lists of collision points and crossing zones + + Check: Predicting all the points together, as if they represent the whole vehicle''' + predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) + predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) + + collisionPoints = [] + crossingZones = [] + for et1 in predictedTrajectories1: + for et2 in predictedTrajectories2: + t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + + if t <= timeHorizon: + collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) + elif computeCZ: # check if there is a crossing zone + # TODO? zone should be around the points at which the traj are the closest + # look for CZ at different times, otherwise it would be a collision + # an approximation would be to look for close points at different times, ie the complementary of collision points + cz = None + t1 = 0 + while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 + t2 = 0 + while not cz and t2 < timeHorizon: + #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: + # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) + cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) + if cz: + crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) + t2 += 1 + t1 += 1 + + if debug: + from matplotlib.pyplot import figure, axis, title + figure() + for et in predictedTrajectories1: + et.predictPosition(timeHorizon) + et.draw('rx') + + for et in predictedTrajectories2: + et.predictPosition(timeHorizon) + et.draw('bx') + obj1.draw('r') + obj2.draw('b') + title('instant {0}'.format(currentInstant)) + axis('equal') + + return collisionPoints, crossingZones + + def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): + '''Computes all crossing and collision points at each common instant for two road users. ''' + collisionPoints={} + crossingZones={} + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = obj1.commonTimeInterval(obj2) + for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors + collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) + + return collisionPoints, crossingZones + + def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): + '''Computes only collision probabilities + Returns for each instant the collision probability and number of samples drawn''' + collisionProbabilities = {} + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = obj1.commonTimeInterval(obj2) + for i in list(commonTimeInterval)[:-1]: + nCollisions = 0 + print(obj1.num, obj2.num, i) + predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i) + predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i) + for et1 in predictedTrajectories1: + for et2 in predictedTrajectories2: + t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + if t <= timeHorizon: + nCollisions += 1 + # take into account probabilities ?? + nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) + collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] + + if debug: + from matplotlib.pyplot import figure, axis, title + figure() + for et in predictedTrajectories1: + et.predictPosition(timeHorizon) + et.draw('rx') + + for et in predictedTrajectories2: + et.predictPosition(timeHorizon) + et.draw('bx') + obj1.draw('r') + obj2.draw('b') + title('instant {0}'.format(i)) + axis('equal') + + return collisionProbabilities + class ConstantPredictionParameters(PredictionParameters): def __init__(self, maxSpeed): PredictionParameters.__init__(self, 'constant velocity', maxSpeed) @@ -190,121 +305,11 @@ for p in points: out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) -def computeExpectedIndicator(points): - from numpy import sum - return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) - -def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): - '''Computes the first instant at which two predicted trajectories are within some distance threshold''' - t = 1 - p1 = predictedTrajectory1.predictPosition(t) - p2 = predictedTrajectory2.predictPosition(t) - while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: - p1 = predictedTrajectory1.predictPosition(t) - p2 = predictedTrajectory2.predictPosition(t) - t += 1 - return t, p1, p2 - -def computeCrossingsCollisionsAtInstant(currentInstant, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): - '''returns the lists of collision points and crossing zones - - Check: Predicting all the points together, as if they represent the whole vehicle''' - predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, currentInstant) - predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, currentInstant) - - collisionPoints = [] - crossingZones = [] - for et1 in predictedTrajectories1: - for et2 in predictedTrajectories2: - t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) - - if t <= timeHorizon: - collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) - elif computeCZ: # check if there is a crossing zone - # TODO? zone should be around the points at which the traj are the closest - # look for CZ at different times, otherwise it would be a collision - # an approximation would be to look for close points at different times, ie the complementary of collision points - cz = None - t1 = 0 - while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 - t2 = 0 - while not cz and t2 < timeHorizon: - #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: - # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) - cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) - if cz: - crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) - t2 += 1 - t1 += 1 - - if debug: - from matplotlib.pyplot import figure, axis, title - figure() - for et in predictedTrajectories1: - et.predictPosition(timeHorizon) - et.draw('rx') + @staticmethod + def computeExpectedIndicator(points): + from numpy import sum + return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) - for et in predictedTrajectories2: - et.predictPosition(timeHorizon) - et.draw('bx') - obj1.draw('r') - obj2.draw('b') - title('instant {0}'.format(currentInstant)) - axis('equal') - - return collisionPoints, crossingZones - -def computeCrossingsCollisions(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): - '''Computes all crossing and collision points at each common instant for two road users. ''' - collisionPoints={} - crossingZones={} - if timeInterval: - commonTimeInterval = timeInterval - else: - commonTimeInterval = obj1.commonTimeInterval(obj2) - for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors - collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug) - - return collisionPoints, crossingZones - -def computeCollisionProbability(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): - '''Computes only collision probabilities - Returns for each instant the collision probability and number of samples drawn''' - collisionProbabilities = {} - if timeInterval: - commonTimeInterval = timeInterval - else: - commonTimeInterval = obj1.commonTimeInterval(obj2) - for i in list(commonTimeInterval)[:-1]: - nCollisions = 0 - print(obj1.num, obj2.num, i) - predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) - predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) - for et1 in predictedTrajectories1: - for et2 in predictedTrajectories2: - t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) - if t <= timeHorizon: - nCollisions += 1 - # take into account probabilities ?? - nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) - collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] - - if debug: - from matplotlib.pyplot import figure, axis, title - figure() - for et in predictedTrajectories1: - et.predictPosition(timeHorizon) - et.draw('rx') - - for et in predictedTrajectories2: - et.predictPosition(timeHorizon) - et.draw('bx') - obj1.draw('r') - obj2.draw('b') - title('instant {0}'.format(i)) - axis('equal') - - return collisionProbabilities #### # Other Methods