Mercurial Hosting > traffic-intelligence
view python/prediction.py @ 962:64259b9885bf
verbose option to print classification information (more to add)
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Mon, 06 Nov 2017 21:25:41 -0500 |
parents | d6c1c05d11f5 |
children | 668a85c963c3 |
line wrap: on
line source
#! /usr/bin/env python '''Library for motion prediction methods''' import moving from utils import LCSS import math, random from copy import copy import numpy as np #from multiprocessing import Pool class PredictedTrajectory(object): '''Class for predicted trajectories with lazy evaluation if the predicted position has not been already computed, compute it it should also have a probability''' def __init__(self): self.probability = 0. self.predictedPositions = {} self.predictedSpeedOrientations = {} #self.collisionPoints = {} #self.crossingZones = {} def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): self.predictPosition(nTimeSteps-1) self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed) return self.predictedPositions[nTimeSteps] def getPredictedTrajectory(self): return moving.Trajectory.fromPointList(self.predictedPositions.values()) def getPredictedSpeeds(self): return [so.norm for so in self.predictedSpeedOrientations.values()] def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs): self.getPredictedTrajectory().plot(options, withOrigin, timeStep, **kwargs) class PredictedTrajectoryConstant(PredictedTrajectory): '''Predicted trajectory at constant speed or acceleration TODO generalize by passing a series of velocities/accelerations''' def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None): self.control = control self.maxSpeed = maxSpeed self.probability = probability self.predictedPositions = {0: initialPosition} self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} def getControl(self): return self.control class PredictedTrajectoryPrototype(PredictedTrajectory): '''Predicted trajectory that follows a prototype trajectory The prototype is in the format of a moving.Trajectory: it could be 1. an observed trajectory (extracted from video) 2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow Prediction can be done 1. at constant speed (the instantaneous user speed) 2. following the trajectory path, at the speed of the user (applying a constant ratio equal to the ratio of the user instantaneous speed and the trajectory closest speed)''' def __init__(self, initialPosition, initialVelocity, prototype, constantSpeed = False, nFramesIgnore = 3, probability = 1.): ''' prototype is a MovingObject Prediction at constant speed will not work for unrealistic trajectories that do not follow a slowly changing velocity (eg moving object trajectories, but is good for realistic motion (eg features)''' self.prototype = prototype self.constantSpeed = constantSpeed self.nFramesIgnore = nFramesIgnore self.probability = probability self.predictedPositions = {0: initialPosition} self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition) self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) #should be computed in relative coordinates to position self.theta = prototype.getVelocityAt(self.closestPointIdx).angle() self.initialSpeed = initialVelocity.norm2() if not constantSpeed: self.ratio = self.initialSpeed/prototype.getVelocityAt(self.closestPointIdx).norm2() def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): deltaPosition = copy(self.deltaPosition) if self.constantSpeed: traj = self.prototype.getPositions() trajLength = traj.length() traveledDistance = nTimeSteps*self.initialSpeed + traj.getCumulativeDistance(self.closestPointIdx) i = self.closestPointIdx while i < trajLength and traj.getCumulativeDistance(i) < traveledDistance: i += 1 if i == trajLength: v = self.prototype.getVelocityAt(-1-self.nFramesIgnore) self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) else: v = self.prototype.getVelocityAt(min(i-1, int(self.prototype.length())-1-self.nFramesIgnore)) self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) else: traj = self.prototype.getPositions() trajLength = traj.length() nSteps = self.ratio*nTimeSteps+self.closestPointIdx i = int(np.floor(nSteps)) if nSteps < trajLength-1: v = self.prototype.getVelocityAt(min(i, int(self.prototype.length())-1-self.nFramesIgnore)) self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) else: v = self.prototype.getVelocityAt(-1-self.nFramesIgnore) self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1) return self.predictedPositions[nTimeSteps] class PredictedTrajectoryRandomControl(PredictedTrajectory): '''Random vehicle control: suitable for normal adaptation''' def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): '''Constructor accelerationDistribution and steeringDistribution are distributions that return random numbers drawn from them''' self.accelerationDistribution = accelerationDistribution self.steeringDistribution = steeringDistribution self.maxSpeed = maxSpeed self.probability = probability self.predictedPositions = {0: initialPosition} self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} def getControl(self): return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) class SafetyPoint(moving.Point): '''Can represent a collision point or crossing zone with respective safety indicator, TTC or pPET''' def __init__(self, p, probability = 1., indicator = -1): self.x = p.x self.y = p.y self.probability = probability self.indicator = indicator def __str__(self): return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator) @staticmethod def save(out, points, predictionInstant, objNum1, objNum2): for p in points: out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) @staticmethod def computeExpectedIndicator(points): return np.sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): '''Computes the first instant at which two predicted trajectories are within some distance threshold Computes all the times including timeHorizon User has to check the first variable collision to know about a collision''' t = 1 p1 = predictedTrajectory1.predictPosition(t) p2 = predictedTrajectory2.predictPosition(t) collision = (p1-p2).norm2() <= collisionDistanceThreshold while t < timeHorizon and not collision: t += 1 p1 = predictedTrajectory1.predictPosition(t) p2 = predictedTrajectory2.predictPosition(t) collision = (p1-p2).norm2() <= collisionDistanceThreshold return collision, t, p1, p2 def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon, printFigure = True): from matplotlib.pyplot import figure, axis, title, clf, savefig if printFigure: clf() else: figure() for et in predictedTrajectories1: for t in xrange(int(np.round(timeHorizon))): et.predictPosition(t) et.plot('rx') for et in predictedTrajectories2: for t in xrange(int(np.round(timeHorizon))): et.predictPosition(t) et.plot('bx') obj1.plot('r', withOrigin = True) obj2.plot('b', withOrigin = True) title('instant {0}'.format(currentInstant)) axis('equal') if printFigure: savefig('predicted-trajectories-t-{0}.png'.format(currentInstant)) def calculateProbability(nMatching,similarity,objects): sumFrequencies=sum([nMatching[p] for p in similarity.keys()]) prototypeProbability={} for i in similarity.keys(): prototypeProbability[i]= similarity[i] * float(nMatching[i])/sumFrequencies sumProbabilities= sum([prototypeProbability[p] for p in prototypeProbability.keys()]) probabilities={} for i in prototypeProbability.keys(): probabilities[objects[i]]= float(prototypeProbability[i])/sumProbabilities return probabilities def findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,spatialThreshold=1.0, delta=180): ''' behaviour prediction first step''' if route[0] not in noiseEntryNums: prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]] elif route[1] not in noiseExitNums: prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]] else: prototypesRoutes=[x for x in sorted(prototypes.keys())] lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta) similarity={} for y in prototypesRoutes: if y in prototypes.keys(): prototypesIDs=prototypes[y] for x in prototypesIDs: s=lcss.computeNormalized(partialObjPositions, objects[x].positions) if s >= minSimilarity: similarity[x]=s if mostMatched==None: probabilities= calculateProbability(nMatching,similarity,objects) return probabilities else: mostMatchedValues=sorted(similarity.values(),reverse=True)[:mostMatched] keys=[k for k in similarity.keys() if similarity[k] in mostMatchedValues] newSimilarity={} for i in keys: newSimilarity[i]=similarity[i] probabilities= calculateProbability(nMatching,newSimilarity,objects) return probabilities def findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,spatialThreshold=1.0, delta=180): if useDestination: prototypesRoutes=[route] else: if route[0] not in noiseEntryNums: prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]] elif route[1] not in noiseExitNums: prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]] else: prototypesRoutes=[x for x in sorted(prototypes.keys())] lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta) similarity={} for y in prototypesRoutes: if y in prototypes.keys(): prototypesIDs=prototypes[y] for x in prototypesIDs: s=lcss.computeNormalized(partialObjPositions, objects[x].positions) if s >= minSimilarity: similarity[x]=s newSimilarity={} for i in similarity.keys(): if i in secondStepPrototypes.keys(): for j in secondStepPrototypes[i]: newSimilarity[j]=similarity[i] probabilities= calculateProbability(nMatching,newSimilarity,objects) return probabilities def getPrototypeTrajectory(obj,route,currentInstant,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,useSpeedPrototype=True): partialInterval=moving.Interval(obj.getFirstInstant(),currentInstant) partialObjPositions= obj.getObjectInTimeInterval(partialInterval).positions if useSpeedPrototype: prototypeTrajectories=findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination) else: prototypeTrajectories=findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched) return prototypeTrajectories def computeCrossingsCollisionsAtInstant(predictionParams,currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): '''returns the lists of collision points and crossing zones''' predictedTrajectories1 = predictionParams.generatePredictedTrajectories(obj1, currentInstant) predictedTrajectories2 = predictionParams.generatePredictedTrajectories(obj2, currentInstant) collisionPoints = [] crossingZones = [] for et1 in predictedTrajectories1: for et2 in predictedTrajectories2: collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) if collision: collisionPoints.append(SafetyPoint((p1+p2)*0.5, et1.probability*et2.probability, t)) elif computeCZ: # check if there is a crossing zone # TODO same computation as PET with metric + concatenate past trajectory with future trajectory cz = None t1 = 0 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 t2 = 0 while not cz and t2 < timeHorizon: cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) if cz is not None: deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2() crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV))) t2 += 1 t1 += 1 if debug: savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) return currentInstant, collisionPoints, crossingZones class PredictionParameters(object): def __init__(self, name, maxSpeed): self.name = name self.maxSpeed = maxSpeed def __str__(self): return '{0} {1}'.format(self.name, self.maxSpeed) def generatePredictedTrajectories(self, obj, instant): return [] def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):#, nProcesses = 1): '''Computes all crossing and collision points at each common instant for two road users. ''' collisionPoints={} crossingZones={} if timeInterval: commonTimeInterval = timeInterval else: commonTimeInterval = obj1.commonTimeInterval(obj2) #if nProcesses == 1: for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors i, cp, cz = computeCrossingsCollisionsAtInstant(self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) if len(cp) != 0: collisionPoints[i] = cp if len(cz) != 0: crossingZones[i] = cz # else: # pool = Pool(processes = nProcesses) # jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)) for i in list(commonTimeInterval)[:-1]] # for j in jobs: # i, cp, cz = j.get() # if len(cp) != 0: # collisionPoints[i] = cp # if len(cz) != 0: # crossingZones[i] = cz # pool.close() return collisionPoints, crossingZones def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): '''Computes only collision probabilities Returns for each instant the collision probability and number of samples drawn''' collisionProbabilities = {} if timeInterval: commonTimeInterval = timeInterval else: commonTimeInterval = obj1.commonTimeInterval(obj2) for i in list(commonTimeInterval)[:-1]: nCollisions = 0 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i) predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i) for et1 in predictedTrajectories1: for et2 in predictedTrajectories2: collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) if collision: nCollisions += 1 # take into account probabilities ?? nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] if debug: savePredictedTrajectoriesFigure(i, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon) return collisionProbabilities class ConstantPredictionParameters(PredictionParameters): def __init__(self, maxSpeed): PredictionParameters.__init__(self, 'constant velocity', maxSpeed) def generatePredictedTrajectories(self, obj, instant): return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), maxSpeed = self.maxSpeed)] class NormalAdaptationPredictionParameters(PredictionParameters): def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): '''An example of acceleration and steering distributions is lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.) ''' if useFeatures: name = 'point set normal adaptation' else: name = 'normal adaptation' PredictionParameters.__init__(self, name, maxSpeed) self.nPredictedTrajectories = nPredictedTrajectories self.useFeatures = useFeatures self.accelerationDistribution = accelerationDistribution self.steeringDistribution = steeringDistribution def __str__(self): return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, self.maxAcceleration, self.maxSteering) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if self.useFeatures and obj.hasFeatures(): features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] else: positions = [obj.getPositionAtInstant(instant)] velocities = [obj.getVelocityAtInstant(instant)] probability = 1./float(len(positions)*self.nPredictedTrajectories) for i in xrange(self.nPredictedTrajectories): for initialPosition,initialVelocity in zip(positions, velocities): predictedTrajectories.append(PredictedTrajectoryRandomControl(initialPosition, initialVelocity, self.accelerationDistribution, self.steeringDistribution, probability, maxSpeed = self.maxSpeed)) return predictedTrajectories class PointSetPredictionParameters(PredictionParameters): def __init__(self, maxSpeed): PredictionParameters.__init__(self, 'point set', maxSpeed) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if obj.hasFeatures(): features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] probability = 1./float(len(positions)) for initialPosition,initialVelocity in zip(positions, velocities): predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, probability = probability, maxSpeed = self.maxSpeed)) return predictedTrajectories else: print('Object {} has no features'.format(obj.getNum())) return None class EvasiveActionPredictionParameters(PredictionParameters): def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False): '''Suggested acceleration distribution may not be symmetric, eg lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.)''' if useFeatures: name = 'point set evasive action' else: name = 'evasive action' PredictionParameters.__init__(self, name, maxSpeed) self.nPredictedTrajectories = nPredictedTrajectories self.useFeatures = useFeatures self.accelerationDistribution = accelerationDistribution self.steeringDistribution = steeringDistribution def __str__(self): return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if self.useFeatures and obj.hasFeatures(): features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] else: positions = [obj.getPositionAtInstant(instant)] velocities = [obj.getVelocityAtInstant(instant)] probability = 1./float(self.nPredictedTrajectories) for i in xrange(self.nPredictedTrajectories): for initialPosition,initialVelocity in zip(positions, velocities): predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, moving.NormAngle(self.accelerationDistribution(), self.steeringDistribution()), probability, self.maxSpeed)) return predictedTrajectories class CVDirectPredictionParameters(PredictionParameters): '''Prediction parameters of prediction at constant velocity using direct computation of the intersecting point Warning: the computed time to collision may be higher than timeHorizon (not used)''' def __init__(self): PredictionParameters.__init__(self, 'constant velocity (direct computation)', None) def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs): collisionPoints = [] crossingZones = [] p1 = obj1.getPositionAtInstant(currentInstant) p2 = obj2.getPositionAtInstant(currentInstant) if (p1-p2).norm2() <= collisionDistanceThreshold: collisionPoints = [SafetyPoint((p1+p1)*0.5, 1., 0.)] else: v1 = obj1.getVelocityAtInstant(currentInstant) v2 = obj2.getVelocityAtInstant(currentInstant) intersection = moving.intersection(p1, p1+v1, p2, p2+v2) if intersection is not None: dp1 = intersection-p1 dp2 = intersection-p2 dot1 = moving.Point.dot(dp1, v1) dot2 = moving.Point.dot(dp2, v2) if (computeCZ and (dot1 > 0 or dot2 > 0)) or (dot1 > 0 and dot2 > 0): # if the road users are moving towards the intersection or if computing pPET dist1 = dp1.norm2() dist2 = dp2.norm2() s1 = math.copysign(v1.norm2(), dot1) s2 = math.copysign(v2.norm2(), dot2) halfCollisionDistanceThreshold = collisionDistanceThreshold/2. timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1) timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2) collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2) if collisionTimeInterval.empty(): if computeCZ: crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))] else: collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())] if debug and intersection is not None: from matplotlib.pyplot import plot, figure, axis, title figure() plot([p1.x, intersection.x], [p1.y, intersection.y], 'r') plot([p2.x, intersection.x], [p2.y, intersection.y], 'b') intersection.plot() obj1.plot('r') obj2.plot('b') title('instant {0}'.format(currentInstant)) axis('equal') return currentInstant, collisionPoints, crossingZones class CVExactPredictionParameters(PredictionParameters): '''Prediction parameters of prediction at constant velocity using direct computation of the intersecting point (solving the equation) Warning: the computed time to collision may be higher than timeHorizon (not used)''' def __init__(self): PredictionParameters.__init__(self, 'constant velocity (direct exact computation)', None) def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs): 'TODO compute pPET' collisionPoints = [] crossingZones = [] p1 = obj1.getPositionAtInstant(currentInstant) p2 = obj2.getPositionAtInstant(currentInstant) v1 = obj1.getVelocityAtInstant(currentInstant) v2 = obj2.getVelocityAtInstant(currentInstant) intersection = moving.intersection(p1, p1+v1, p2, p2+v2) if intersection is not None: ttc = moving.Point.timeToCollision(p1, p2, v1, v2, collisionDistanceThreshold) if ttc is not None: collisionPoints = [SafetyPoint(intersection, 1., ttc)] else: pass # compute pPET return currentInstant, collisionPoints, crossingZones class PrototypePredictionParameters(PredictionParameters): def __init__(self, prototypes, nPredictedTrajectories, pointSimilarityDistance, minSimilarity, lcssMetric = 'cityblock', minFeatureTime = 10, constantSpeed = False, useFeatures = True): PredictionParameters.__init__(self, 'prototypes', None) self.prototypes = prototypes self.nPredictedTrajectories = nPredictedTrajectories self.lcss = LCSS(metric = lcssMetric, epsilon = pointSimilarityDistance) self.minSimilarity = minSimilarity self.minFeatureTime = minFeatureTime self.constantSpeed = constantSpeed self.useFeatures = useFeatures def getLcss(self): return self.lcss def addPredictedTrajectories(self, predictedTrajectories, obj, instant): obj.computeTrajectorySimilarities(self.prototypes, self.lcss) for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: initialPosition = obj.getPositionAtInstant(instant) initialVelocity = obj.getVelocityAtInstant(instant) predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if instant-obj.getFirstInstant()+1 >= self.minFeatureTime: if self.useFeatures and obj.hasFeatures(): if not hasattr(obj, 'currentPredictionFeatures'): obj.currentPredictionFeatures = [] else: obj.currentPredictionFeatures[:] = [f for f in obj.currentPredictionFeatures if f.existsAtInstant(instant)] firstInstants = [(f,f.getFirstInstant()) for f in obj.getFeatures() if f.existsAtInstant(instant) and f not in obj.currentPredictionFeatures] firstInstants.sort(key = lambda t: t[1]) for f,t1 in firstInstants[:min(self.nPredictedTrajectories, len(firstInstants), self.nPredictedTrajectories-len(obj.currentPredictionFeatures))]: obj.currentPredictionFeatures.append(f) for f in obj.currentPredictionFeatures: self.addPredictedTrajectories(predictedTrajectories, f, instant) else: self.addPredictedTrajectories(predictedTrajectories, obj, instant) return predictedTrajectories if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/prediction.txt') #suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite) #doctest.testmod() #doctest.testfile("example.txt")