Mercurial Hosting > traffic-intelligence
diff python/prediction.py @ 604:8ba4b8ad4c86
Motion Pattern Method
author | MohamedGomaa |
---|---|
date | Tue, 15 Jul 2014 13:25:15 -0400 |
parents | 727e3c529519 |
children |
line wrap: on
line diff
--- a/python/prediction.py Tue Jul 08 15:22:30 2014 -0400 +++ b/python/prediction.py Tue Jul 15 13:25:15 2014 -0400 @@ -4,6 +4,7 @@ import moving import math import random +import numpy as np class PredictedTrajectory: '''Class for predicted trajectories with lazy evaluation @@ -47,6 +48,20 @@ def getControl(self): return self.control +def findNearestOrientation(initialPosition,prototypeTrajectory): + distances=[] + for p in prototypeTrajectory.positions: + distances.append(moving.Point.distanceNorm2(initialPosition, p)) + t=np.argmin(distances) + return moving.NormAngle.fromPoint(prototypeTrajectory.velocities[t]).angle +#todo: merge with previous function +def findNearestInstant(initialPosition,prototypeTrajectory): + distances=[] + for p in prototypeTrajectory.positions: + distances.append(moving.Point.distanceNorm2(initialPosition, p)) + t=np.argmin(distances) + return t + class PredictedTrajectoryPrototype(PredictedTrajectory): '''Predicted trajectory that follows a prototype trajectory The prototype is in the format of a moving.Trajectory: it could be @@ -59,20 +74,39 @@ (applying a constant ratio equal to the ratio of the user instantaneous speed and the trajectory closest speed)''' - def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.): + def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, speedList=[], probability = 1.): self.prototypeTrajectory = prototypeTrajectory self.constantSpeed = constantSpeed + self.speedList = speedList self.probability = probability self.predictedPositions = {0: initialPosition} - self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} + self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestOrientation(initialPosition,prototypeTrajectory))}#moving.NormAngle.fromPoint(initialVelocity)} def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): - if constantSpeed: + if self.constantSpeed: # calculate cumulative distance - pass + speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm + anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + #pass + elif self.speedList!=[]: + pass else: # see c++ code, calculate ratio - pass + speedNorm= self.predictedSpeedOrientations[0].norm + instant=findNearestInstant(self.predictedPositions[0],self.prototypeTrajectory) + prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:] + ratio=float(speedNorm)/prototypeSpeeds[0] + resampledSpeeds=[sp*ratio for sp in prototypeSpeeds] + anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) + if nTimeSteps<len(resampledSpeeds): + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[nTimeSteps], anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + else: + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[-1], anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + # pass return self.predictedPositions[nTimeSteps] class PredictedTrajectoryRandomControl(PredictedTrajectory): @@ -135,10 +169,14 @@ def generatePredictedTrajectories(self, obj, instant): return [] - def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): + def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None): '''returns the lists of collision points and crossing zones''' - predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) - predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) + if prototypeTrajectories1==None: + predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) + predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) + else: + predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1) + predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2) collisionPoints = [] crossingZones = [] @@ -161,7 +199,8 @@ # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) if cz: - crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) + deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2() + crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV))) t2 += 1 t1 += 1 @@ -182,7 +221,7 @@ return collisionPoints, crossingZones - def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): + def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None,prototypeTrajectories1=None,prototypeTrajectories2=None): '''Computes all crossing and collision points at each common instant for two road users. ''' collisionPoints={} crossingZones={} @@ -191,7 +230,7 @@ else: commonTimeInterval = obj1.commonTimeInterval(obj2) for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors - collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) + collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) return collisionPoints, crossingZones @@ -412,10 +451,24 @@ #### # Other Methods #### - - - - +class prototypePredictionParameters(PredictionParameters): + def __init__(self, maxSpeed, nPredictedTrajectories,constantSpeed = True, speedList=[]): + #TODO speed profiles integration + name = 'prototype' + PredictionParameters.__init__(self, name, maxSpeed) + self.nPredictedTrajectories = nPredictedTrajectories + #self.prototypeTrajectory = prototypeTrajectory + self.constantSpeed = constantSpeed + self.speedList = speedList + #self.probability=probability + + def generatePredictedTrajectories(self, obj, instant,prototypeTrajectories): + predictedTrajectories = [] + initialPosition = obj.getPositionAtInstant(instant) + initialVelocity = obj.getVelocityAtInstant(instant) + for prototypeTraj in prototypeTrajectories.keys(): + predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, prototypeTraj, constantSpeed = self.constantSpeed, speedList=self.speedList, probability = prototypeTrajectories[prototypeTraj])) + return predictedTrajectories if __name__ == "__main__": import doctest