Mercurial Hosting > traffic-intelligence
changeset 662:72174e66aba5
corrected bug that increased TTC by 1 frame and structure to store collision points and crossing zones
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Mon, 18 May 2015 17:17:06 +0200 |
parents | dc70d9e711f5 |
children | 65a867942eee |
files | python/events.py python/moving.py python/prediction.py python/tests/events.txt python/tests/prediction.txt |
diffstat | 5 files changed, 79 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/python/events.py Mon May 18 13:53:25 2015 +0200 +++ b/python/events.py Mon May 18 17:17:06 2015 +0200 @@ -98,13 +98,15 @@ if roaduserNum1 is not None and roaduserNum2 is not None: self.roadUserNumbers = set([roaduserNum1, roaduserNum2]) elif roadUser1 is not None and roadUser2 is not None: - self.roadUserNumbers = set(roadUser1.getNum(), roadUser2.getNum()) + self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()]) else: self.roadUserNumbers = None self.categoryNum = categoryNum self.indicators = {} self.interactionInterval = None - self.collisionPoints = None # distionary for collison points with different prediction methods + # list for collison points and crossing zones + self.collisionPoints = None + self.crossingZones = None def getRoadUserNumbers(self): return self.roadUserNumbers @@ -180,7 +182,8 @@ speedDifferentials[instant] = deltav.norm2() if collisionCourseDotProducts[instant] > 0: interactionInstants.append(instant) - collisionCourseAngles[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])) + if distances[instant] != 0 and speedDifferentials[instant] != 0: + collisionCourseAngles[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])) if len(interactionInstants) >= 2: self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1]) @@ -201,8 +204,6 @@ def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1, usePrototypes=False, route1= (-1,-1), route2=(-1,-1), prototypes={}, secondStepPrototypes={}, nMatching={}, objects=[], noiseEntryNums=[], noiseExitNums=[], minSimilarity=0.1, mostMatched=None, useDestination=True, useSpeedPrototype=True, acceptPartialLength=30, step=1): '''Computes all crossing and collision points at each common instant for two road users. ''' - self.collisionPoints={} - self.crossingZones={} TTCs = {} if usePrototypes: route1= getRoute(self.roadUser1,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination) @@ -212,19 +213,19 @@ commonTimeInterval = timeInterval else: commonTimeInterval = self.timeInterval - self.collisionPoints[predictionParameters.name], crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval, nProcesses,usePrototypes,route1,route2,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype,acceptPartialLength, step) - if computeCZ: - self.crossingZones[predictionParameters.name] = crossingZones + self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval, nProcesses,usePrototypes,route1,route2,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype,acceptPartialLength, step) for i, cp in self.collisionPoints.iteritems(): TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cp) - # add probability of collision, and probability of successful evasive action self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs, mostSevereIsMax=False)) + # crossing zones and pPET if computeCZ: + self.crossingZones[predictionParameters.name] = crossingZones pPETs = {} for i, cz in self.crossingZones.iteritems(): pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(cz) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs, mostSevereIsMax=False)) + # TODO add probability of collision, and probability of successful evasive action def computePET(self, collisionDistanceThreshold): # TODO add crossing zone @@ -238,7 +239,7 @@ self.interactionType = interactionType def getCrossingZones(self, predictionMethodName): - if self.hasattr(self, 'crossingZones'): + if self.crossingZones is not None: return self.crossingZones[predictionMethodName] else: return None @@ -277,16 +278,19 @@ else: return None -def aggregateSafetyPoints(interactions, pointType = 'collision'): +def aggregateSafetyPoints(interactions, predictionMethodName = None, pointType = 'collision'): '''Put all collision points or crossing zones in a list for display''' + if predictionMethodName is None and len(interactions)>0: + predictionMethodName = interactions[0].collisionPoints.keys()[0] + allPoints = [] if pointType == 'collision': for i in interactions: - for points in i.collisionPoints.values(): + for points in i.collisionPoints[predictionMethodName].values(): allPoints += points elif pointType == 'crossing': for i in interactions: - for points in i.crossingZones.values(): + for points in i.crossingZones[predictionMethodName].values(): allPoints += points else: print('unknown type of point '+pointType)
--- a/python/moving.py Mon May 18 13:53:25 2015 +0200 +++ b/python/moving.py Mon May 18 17:17:06 2015 +0200 @@ -988,7 +988,7 @@ self.userType = userType self.features = None # compute bounding polygon from trajectory - + @staticmethod def generate(p, v, timeInterval): positions, velocities = Trajectory.generate(p, v, int(timeInterval.length()))
--- a/python/prediction.py Mon May 18 13:53:25 2015 +0200 +++ b/python/prediction.py Mon May 18 17:17:06 2015 +0200 @@ -140,15 +140,21 @@ return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): - '''Computes the first instant at which two predicted trajectories are within some distance threshold''' + '''Computes the first instant + at which two predicted trajectories are within some distance threshold + Computes all the times including timeHorizon + + User has to check the first variable collision to know about a collision''' t = 1 p1 = predictedTrajectory1.predictPosition(t) p2 = predictedTrajectory2.predictPosition(t) - while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: + collision = (p1-p2).norm2() <= collisionDistanceThreshold + while t < timeHorizon and not collision: + t += 1 p1 = predictedTrajectory1.predictPosition(t) p2 = predictedTrajectory2.predictPosition(t) - t += 1 - return t, p1, p2 + collision = (p1-p2).norm2() <= collisionDistanceThreshold + return collision, t, p1, p2 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon): from matplotlib.pyplot import figure, axis, title, close, savefig @@ -248,8 +254,8 @@ def computeCrossingsCollisionsAtInstant(predictionParams,currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, usePrototypes = False,route1= (-1,-1),route2=(-1,-1),prototypes={},secondStepPrototypes={},nMatching={},objects=[],noiseEntryNums=[],noiseExitNums=[],minSimilarity=0.1,mostMatched=None,useDestination=True,useSpeedPrototype=True): '''returns the lists of collision points and crossing zones''' if usePrototypes: - prototypeTrajectories1=getPrototypeTrajectory(obj1,route1,currentInstant,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype) - prototypeTrajectories2= getPrototypeTrajectory(obj2,route2,currentInstant,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype) + prototypeTrajectories1 = getPrototypeTrajectory(obj1,route1,currentInstant,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype) + prototypeTrajectories2 = getPrototypeTrajectory(obj2,route2,currentInstant,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype) predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1) predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2) else: @@ -260,9 +266,9 @@ crossingZones = [] for et1 in predictedTrajectories1: for et2 in predictedTrajectories2: - t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) - if t <= timeHorizon: + if collision: collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) elif computeCZ: # check if there is a crossing zone # TODO? zone should be around the points at which the traj are the closest @@ -368,8 +374,8 @@ predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i) for et1 in predictedTrajectories1: for et2 in predictedTrajectories2: - t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) - if t <= timeHorizon: + collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) + if collision: nCollisions += 1 # take into account probabilities ?? nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2))
--- a/python/tests/events.txt Mon May 18 13:53:25 2015 +0200 +++ b/python/tests/events.txt Mon May 18 17:17:06 2015 +0200 @@ -1,5 +1,6 @@ >>> from events import * ->>> from moving import MovingObject, TimeInterval +>>> from moving import MovingObject, TimeInterval, Point +>>> from prediction import ConstantPredictionParameters >>> objects = [MovingObject(num = i, timeInterval = TimeInterval(0,10)) for i in range(10)] >>> interactions = createInteractions(objects) @@ -9,3 +10,25 @@ >>> interactions = createInteractions(objects, objects2) >>> len([i for i in interactions if len(i.roadUserNumbers) == 1]) 0 + +>>> o1 = MovingObject.generate(Point(-5.,0.), Point(1.,0.), TimeInterval(0,10)) +>>> o2 = MovingObject.generate(Point(0.,-5.), Point(0.,1.), TimeInterval(0,10)) +>>> inter = Interaction(roadUser1 = o1, roadUser2 = o2) +>>> inter.computeIndicators() +>>> predictionParams = ConstantPredictionParameters(10.) +>>> inter.computeCrossingsCollisions(predictionParams, 0.1, 10) +>>> ttc = inter.getIndicator("Time to Collision") +>>> ttc[0] +5.0 +>>> ttc[1] +4.0 +>>> (inter.collisionPoints[0][0] - Point(0.,0.)).norm2() < 0.0001 +True +>>> (inter.collisionPoints[4][0] - Point(0.,0.)).norm2() < 0.0001 +True +>>> inter.getIndicator(Interaction.indicatorNames[1])[4] < 0.000001 # collision angle +True +>>> inter.getIndicator(Interaction.indicatorNames[1])[5] is None +True +>>> inter.getIndicator(Interaction.indicatorNames[1])[6] # doctest:+ELLIPSIS +3.1415...
--- a/python/tests/prediction.txt Mon May 18 13:53:25 2015 +0200 +++ b/python/tests/prediction.txt Mon May 18 17:17:06 2015 +0200 @@ -1,13 +1,13 @@ ->>> import prediction +>>> from prediction import * >>> import moving ->>> et = prediction.PredictedTrajectoryConstant(moving.Point(0,0), moving.Point(1,0)) +>>> et = PredictedTrajectoryConstant(moving.Point(0,0), moving.Point(1,0)) >>> et.predictPosition(4) # doctest:+ELLIPSIS (4.0...,0.0...) >>> et.predictPosition(1) # doctest:+ELLIPSIS (1.0...,0.0...) ->>> et = prediction.PredictedTrajectoryConstant(moving.Point(0,0), moving.Point(1,0), moving.NormAngle(0.1,0), maxSpeed = 2) +>>> et = PredictedTrajectoryConstant(moving.Point(0,0), moving.Point(1,0), moving.NormAngle(0.1,0), maxSpeed = 2) >>> et.predictPosition(10) # doctest:+ELLIPSIS (15.5...,0.0...) >>> et.predictPosition(11) # doctest:+ELLIPSIS @@ -18,13 +18,29 @@ >>> import random >>> acceleration = lambda: random.uniform(-0.5,0.5) >>> steering = lambda: random.uniform(-0.1,0.1) ->>> et = prediction.PredictedTrajectoryRandomControl(moving.Point(0,0),moving.Point(1,1), acceleration, steering, maxSpeed = 2) +>>> et = PredictedTrajectoryRandomControl(moving.Point(0,0),moving.Point(1,1), acceleration, steering, maxSpeed = 2) >>> p = et.predictPosition(500) >>> from numpy import max >>> max(et.getPredictedSpeeds()) <= 2. True >>> p = moving.Point(3,4) ->>> sp = prediction.SafetyPoint(p, 0.1, 0) +>>> sp = SafetyPoint(p, 0.1, 0) >>> print(sp) 3 4 0.1 0 + +>>> et1 = PredictedTrajectoryConstant(moving.Point(-5.,0.), moving.Point(1.,0.)) +>>> et2 = PredictedTrajectoryConstant(moving.Point(0.,-5.), moving.Point(0.,1.)) +>>> collision, t, cp1, cp2 = computeCollisionTime(et1, et2, 0.1, 10) +>>> collision +True +>>> t +5 +>>> collision, t, cp1, cp2 = computeCollisionTime(et1, et2, 0.1, 5) +>>> collision +True +>>> t +5 +>>> collision, t, cp1, cp2 = computeCollisionTime(et1, et2, 0.1, 4) +>>> collision +False