Mercurial Hosting > traffic-intelligence
diff python/prediction.py @ 945:05d4302bf67e
working motion pattern prediction with rotation and features
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Thu, 20 Jul 2017 14:29:46 -0400 |
parents | 84ebe1b031f1 |
children | e5970606066f |
line wrap: on
line diff
--- a/python/prediction.py Thu Jul 20 12:12:28 2017 -0400 +++ b/python/prediction.py Thu Jul 20 14:29:46 2017 -0400 @@ -5,6 +5,7 @@ from utils import LCSS import math, random +from copy import copy import numpy as np from multiprocessing import Pool @@ -74,13 +75,15 @@ self.probability = probability self.predictedPositions = {0: initialPosition} self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition) - self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) + self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) #should be computed in relative coordinates to position + self.theta = prototype.getVelocityAt(self.closestPointIdx).angle() self.initialSpeed = initialVelocity.norm2() if not constantSpeed: self.ratio = self.initialSpeed/prototype.getVelocityAt(self.closestPointIdx).norm2() def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): + deltaPosition = copy(self.deltaPosition) if self.constantSpeed: traj = self.prototype.getPositions() trajLength = traj.length() @@ -90,19 +93,21 @@ i += 1 if i == trajLength: v = self.prototype.getVelocityAt(-1) - self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) + self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) else: - self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) + v = self.prototype.getVelocityAt(i-1) + self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) else: traj = self.prototype.getPositions() trajLength = traj.length() nSteps = self.ratio*nTimeSteps+self.closestPointIdx i = int(np.floor(nSteps)) if nSteps < trajLength-1: - self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) + v = self.prototype.getVelocityAt(i) + self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) else: v = self.prototype.getVelocityAt(-1) - self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[-1]+v*(nSteps-trajLength+1) + self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1) return self.predictedPositions[nTimeSteps] class PredictedTrajectoryRandomControl(PredictedTrajectory): @@ -552,25 +557,31 @@ self.minFeatureTime = minFeatureTime self.constantSpeed = constantSpeed self.useFeatures = useFeatures + + def addPredictedTrajectories(self, predictedTrajectories, obj, instant): + if not hasattr(obj, 'prototypeSimilarities'): + obj.prototypeSimilarities = [] + for proto in self.prototypes: + self.lcss.similarities(proto.getMovingObject().getPositions().asArray().T, obj.getPositions().asArray().T) + similarities = self.lcss.similarityTable[-1, :-1].astype(float) + obj.prototypeSimilarities.append(similarities/np.minimum(np.arange(1.,len(similarities)+1), proto.getMovingObject().length()*np.ones(len(similarities)))) + for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): + if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: + initialPosition = obj.getPositionAtInstant(instant) + initialVelocity = obj.getVelocityAtInstant(instant) + predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if instant-obj.getFirstInstant()+1 >= self.minFeatureTime: if self.useFeatures and obj.hasFeatures(): # get current features existing for the most time, sort on first instant of feature and take n first - pass + firstInstants = [(f,f.getFirstInstant()) for f in obj.getFeatures() if f.existsAtInstant(instant)] + firstInstants.sort(key = lambda t: t[1]) + for f,t1 in firstInstants[:min(self.nPredictedTrajectories, len(firstInstants))]: + self.addPredictedTrajectories(predictedTrajectories, f, instant) else: - if not hasattr(obj, 'prototypeSimilarities'): - obj.prototypeSimilarities = [] - for proto in self.prototypes: - self.lcss.similarities(proto.getMovingObject().getPositions().asArray().T, obj.getPositions().asArray().T) - similarities = self.lcss.similarityTable[-1, :-1].astype(float) - obj.prototypeSimilarities.append(similarities/np.minimum(np.arange(1.,len(similarities)+1), proto.getMovingObject().length()*np.ones(len(similarities)))) - for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): - if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: - initialPosition = obj.getPositionAtInstant(instant) - initialVelocity = obj.getVelocityAtInstant(instant) - predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) + self.addPredictedTrajectories(predictedTrajectories, obj, instant) return predictedTrajectories if __name__ == "__main__":