Mercurial Hosting > traffic-intelligence
diff python/prediction.py @ 607:84690dfe5560
add some functions for behaviour analysis
author | MohamedGomaa |
---|---|
date | Tue, 25 Nov 2014 22:49:47 -0500 |
parents | a9c1d61a89b4 |
children | 0dc36203973d |
line wrap: on
line diff
--- a/python/prediction.py Mon Nov 24 13:02:10 2014 -0500 +++ b/python/prediction.py Tue Nov 25 22:49:47 2014 -0500 @@ -4,6 +4,7 @@ import moving import math import random +import numpy as np class PredictedTrajectory: '''Class for predicted trajectories with lazy evaluation @@ -46,6 +47,14 @@ def getControl(self): return self.control + +def findNearestParams(initialPosition,prototypeTrajectory): + ''' nearest parameters are the index of minDistance and the orientation ''' + distances=[] + for position in prototypeTrajectory.positions: + distances.append(moving.Point.distanceNorm2(initialPosition, position)) + minDistanceIndex= np.argmin(distances) + return minDistanceIndex, moving.NormAngle.fromPoint(prototypeTrajectory.velocities[minDistanceIndex]).angle class PredictedTrajectoryPrototype(PredictedTrajectory): '''Predicted trajectory that follows a prototype trajectory @@ -64,15 +73,31 @@ self.constantSpeed = constantSpeed self.probability = probability self.predictedPositions = {0: initialPosition} - self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} - + self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestParams(initialPosition,prototypeTrajectory)[1])}#moving.NormAngle.fromPoint(initialVelocity)} + def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): - if constantSpeed: + if self.constantSpeed: # calculate cumulative distance - pass + speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm + anglePrototype = findNearestParams(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)[1] + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + else: # see c++ code, calculate ratio - pass + speedNorm= self.predictedSpeedOrientations[0].norm + instant=findNearestParams(self.predictedPositions[0],self.prototypeTrajectory)[0] + prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:] + ratio=float(speedNorm)/prototypeSpeeds[0] + resampledSpeeds=[sp*ratio for sp in prototypeSpeeds] + anglePrototype = findNearestParams(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)[1] + if nTimeSteps<len(resampledSpeeds): + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[nTimeSteps], anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + else: + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[-1], anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + return self.predictedPositions[nTimeSteps] class PredictedTrajectoryRandomControl(PredictedTrajectory): @@ -141,10 +166,14 @@ savefig('predicted-trajectories-t-{0}.png'.format(currentInstant)) close() -def computeCrossingsCollisionsAtInstant(predictionParams, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): +def computeCrossingsCollisionsAtInstant(predictionParams,currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None): '''returns the lists of collision points and crossing zones''' - predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant) - predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant) + if prototypeTrajectories1==None: + predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant) + predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant) + else: + predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1) + predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2) collisionPoints = [] crossingZones = [] @@ -166,17 +195,17 @@ #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) - if cz != None: - crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) + if cz: + deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2() + crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV))) t2 += 1 t1 += 1 if debug: savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) + return currentInstant,collisionPoints, crossingZones - return currentInstant, collisionPoints, crossingZones - -def computeCrossingsCollisions(predictionParams, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1): +def computeCrossingsCollisions(predictionParams, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None,nProcesses = 1,prototypeTrajectories1=None,prototypeTrajectories2=None): '''Computes all crossing and collision points at each common instant for two road users. ''' collisionPoints={} crossingZones={} @@ -186,7 +215,7 @@ commonTimeInterval = obj1.commonTimeInterval(obj2) if nProcesses == 1: for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors - i, cp, cz = computeCrossingsCollisionsAtInstant(predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) + i, cp, cz = computeCrossingsCollisionsAtInstant(predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) if len(cp) != 0: collisionPoints[i] = cp if len(cz) != 0: @@ -194,7 +223,7 @@ else: from multiprocessing import Pool pool = Pool(processes = nProcesses) - jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)) for i in list(commonTimeInterval)[:-1]] + jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (predictionParams, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2)) for i in list(commonTimeInterval)[:-1]] #results = [j.get() for j in jobs] #results.sort() for j in jobs: @@ -205,7 +234,7 @@ if len(cz) != 0: crossingZones[i] = cz pool.close() - return collisionPoints, crossingZones + return collisionPoints, crossingZones class PredictionParameters: def __init__(self, name, maxSpeed): @@ -218,11 +247,11 @@ def generatePredictedTrajectories(self, obj, instant): return [] - def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): - return computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) + def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None): + return computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) - def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1): - return computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval, nProcesses) + def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1,prototypeTrajectories1=None,prototypeTrajectories2=None): + return computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval, nProcesses,prototypeTrajectories1,prototypeTrajectories2) def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): '''Computes only collision probabilities @@ -428,10 +457,20 @@ #### # Other Methods #### - - - - +class prototypePredictionParameters(PredictionParameters): + def __init__(self, maxSpeed, nPredictedTrajectories,constantSpeed = True): + name = 'prototype' + PredictionParameters.__init__(self, name, maxSpeed) + self.nPredictedTrajectories = nPredictedTrajectories + self.constantSpeed = constantSpeed + + def generatePredictedTrajectories(self, obj, instant,prototypeTrajectories): + predictedTrajectories = [] + initialPosition = obj.getPositionAtInstant(instant) + initialVelocity = obj.getVelocityAtInstant(instant) + for prototypeTraj in prototypeTrajectories.keys(): + predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, prototypeTraj, constantSpeed = self.constantSpeed, probability = prototypeTrajectories[prototypeTraj])) + return predictedTrajectories if __name__ == "__main__": import doctest