Mercurial Hosting > traffic-intelligence
changeset 945:05d4302bf67e
working motion pattern prediction with rotation and features
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Thu, 20 Jul 2017 14:29:46 -0400 |
parents | 84ebe1b031f1 |
children | e5970606066f |
files | python/moving.py python/prediction.py python/tests/prediction.txt scripts/safety-analysis.py |
diffstat | 4 files changed, 44 insertions(+), 21 deletions(-) [+] |
line wrap: on
line diff
--- a/python/moving.py Thu Jul 20 12:12:28 2017 -0400 +++ b/python/moving.py Thu Jul 20 14:29:46 2017 -0400 @@ -219,8 +219,17 @@ if clockwise: return Point(self.y, -self.x) else: - return Point(-self.y, self.x) + return Point(-self.y, self.x) + def projectLocal(self, v, clockwise = True): + 'Projects point projected on v, v.orthogonal()' + e1 = v/v.norm2() + e2 = e1.orthogonal(clockwise) + return Point(Point.dot(self, e1), Point.doc(self, e2)) + + def rotate(self, theta): + return Point(self.x*cos(theta)-self.y*sin(theta), self.x*sin(theta)+self.y*cos(theta)) + def __mul__(self, alpha): 'Warning, returns a new Point' return Point(self.x*alpha, self.y*alpha) @@ -236,6 +245,9 @@ def plotSegment(p1, p2, options = 'o', **kwargs): plot([p1.x, p2.x], [p1.y, p2.y], options, **kwargs) + def angle(self): + return atan2(self.y, self.x) + def norm2Squared(self): '''2-norm distance (Euclidean distance)''' return self.x**2+self.y**2 @@ -529,7 +541,7 @@ def fromPoint(p): norm = p.norm2() if norm > 0: - angle = atan2(p.y, p.x) + angle = p.angle() else: angle = 0. return NormAngle(norm, angle)
--- a/python/prediction.py Thu Jul 20 12:12:28 2017 -0400 +++ b/python/prediction.py Thu Jul 20 14:29:46 2017 -0400 @@ -5,6 +5,7 @@ from utils import LCSS import math, random +from copy import copy import numpy as np from multiprocessing import Pool @@ -74,13 +75,15 @@ self.probability = probability self.predictedPositions = {0: initialPosition} self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition) - self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) + self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) #should be computed in relative coordinates to position + self.theta = prototype.getVelocityAt(self.closestPointIdx).angle() self.initialSpeed = initialVelocity.norm2() if not constantSpeed: self.ratio = self.initialSpeed/prototype.getVelocityAt(self.closestPointIdx).norm2() def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): + deltaPosition = copy(self.deltaPosition) if self.constantSpeed: traj = self.prototype.getPositions() trajLength = traj.length() @@ -90,19 +93,21 @@ i += 1 if i == trajLength: v = self.prototype.getVelocityAt(-1) - self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) + self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) else: - self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) + v = self.prototype.getVelocityAt(i-1) + self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) else: traj = self.prototype.getPositions() trajLength = traj.length() nSteps = self.ratio*nTimeSteps+self.closestPointIdx i = int(np.floor(nSteps)) if nSteps < trajLength-1: - self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) + v = self.prototype.getVelocityAt(i) + self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) else: v = self.prototype.getVelocityAt(-1) - self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[-1]+v*(nSteps-trajLength+1) + self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1) return self.predictedPositions[nTimeSteps] class PredictedTrajectoryRandomControl(PredictedTrajectory): @@ -552,25 +557,31 @@ self.minFeatureTime = minFeatureTime self.constantSpeed = constantSpeed self.useFeatures = useFeatures + + def addPredictedTrajectories(self, predictedTrajectories, obj, instant): + if not hasattr(obj, 'prototypeSimilarities'): + obj.prototypeSimilarities = [] + for proto in self.prototypes: + self.lcss.similarities(proto.getMovingObject().getPositions().asArray().T, obj.getPositions().asArray().T) + similarities = self.lcss.similarityTable[-1, :-1].astype(float) + obj.prototypeSimilarities.append(similarities/np.minimum(np.arange(1.,len(similarities)+1), proto.getMovingObject().length()*np.ones(len(similarities)))) + for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): + if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: + initialPosition = obj.getPositionAtInstant(instant) + initialVelocity = obj.getVelocityAtInstant(instant) + predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if instant-obj.getFirstInstant()+1 >= self.minFeatureTime: if self.useFeatures and obj.hasFeatures(): # get current features existing for the most time, sort on first instant of feature and take n first - pass + firstInstants = [(f,f.getFirstInstant()) for f in obj.getFeatures() if f.existsAtInstant(instant)] + firstInstants.sort(key = lambda t: t[1]) + for f,t1 in firstInstants[:min(self.nPredictedTrajectories, len(firstInstants))]: + self.addPredictedTrajectories(predictedTrajectories, f, instant) else: - if not hasattr(obj, 'prototypeSimilarities'): - obj.prototypeSimilarities = [] - for proto in self.prototypes: - self.lcss.similarities(proto.getMovingObject().getPositions().asArray().T, obj.getPositions().asArray().T) - similarities = self.lcss.similarityTable[-1, :-1].astype(float) - obj.prototypeSimilarities.append(similarities/np.minimum(np.arange(1.,len(similarities)+1), proto.getMovingObject().length()*np.ones(len(similarities)))) - for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): - if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: - initialPosition = obj.getPositionAtInstant(instant) - initialVelocity = obj.getVelocityAtInstant(instant) - predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) + self.addPredictedTrajectories(predictedTrajectories, obj, instant) return predictedTrajectories if __name__ == "__main__":
--- a/python/tests/prediction.txt Thu Jul 20 12:12:28 2017 -0400 +++ b/python/tests/prediction.txt Thu Jul 20 14:29:46 2017 -0400 @@ -54,7 +54,7 @@ >>> for t in xrange(int(proto.length())): x=et.predictPosition(t) >>> traj = et.getPredictedTrajectory() >>> traj.computeCumulativeDistances() ->>> (absolute(array(traj.distances) - et.initialSpeed) < 1e-5).all() +>>> absolute(array(traj.distances).mean() - et.initialSpeed < 1e-3) True >>> et = PredictedTrajectoryPrototype(proto.getPositionAt(10)+moving.Point(0.6, 0.6), proto.getVelocityAt(10)*0.7, proto, False)
--- a/scripts/safety-analysis.py Thu Jul 20 12:12:28 2017 -0400 +++ b/scripts/safety-analysis.py Thu Jul 20 14:29:46 2017 -0400 @@ -75,7 +75,7 @@ interactions = events.createInteractions(objects) for inter in interactions: inter.computeIndicators() - inter.computeCrossingsCollisions(predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, nProcesses = args.nProcesses, debug = True) +# inter.computeCrossingsCollisions(predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, nProcesses = args.nProcesses, debug = True) if args.computePET: for inter in interactions: