diff trafficintelligence/prediction.py @ 1028:cc5cb04b04b0

major update using the trafficintelligence package name and install through pip
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Fri, 15 Jun 2018 11:19:10 -0400
parents python/prediction.py@933670761a57
children c6cf75a2ed08
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/trafficintelligence/prediction.py	Fri Jun 15 11:19:10 2018 -0400
@@ -0,0 +1,598 @@
+#! /usr/bin/env python
+'''Library for motion prediction methods'''
+
+import moving
+from utils import LCSS
+
+import math, random
+from copy import copy
+import numpy as np
+#from multiprocessing import Pool
+
+
+class PredictedTrajectory(object):
+    '''Class for predicted trajectories with lazy evaluation
+    if the predicted position has not been already computed, compute it
+
+    it should also have a probability'''
+
+    def __init__(self):
+        self.probability = 0.
+        self.predictedPositions = {}
+        self.predictedSpeedOrientations = {}
+        #self.collisionPoints = {}
+        #self.crossingZones = {}
+
+    def predictPosition(self, nTimeSteps):
+        if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions:
+            self.predictPosition(nTimeSteps-1)
+            self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed)
+        return self.predictedPositions[nTimeSteps]
+
+    def getPredictedTrajectory(self):
+        return moving.Trajectory.fromPointList(list(self.predictedPositions.values()))
+
+    def getPredictedSpeeds(self):
+        return [so.norm for so in self.predictedSpeedOrientations.values()]
+
+    def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs):
+        self.getPredictedTrajectory().plot(options, withOrigin, timeStep, **kwargs)
+
+class PredictedTrajectoryConstant(PredictedTrajectory):
+    '''Predicted trajectory at constant speed or acceleration
+    TODO generalize by passing a series of velocities/accelerations'''
+
+    def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None):
+        self.control = control
+        self.maxSpeed = maxSpeed
+        self.probability = probability
+        self.predictedPositions = {0: initialPosition}
+        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
+
+    def getControl(self):
+        return self.control
+
+class PredictedTrajectoryPrototype(PredictedTrajectory):
+    '''Predicted trajectory that follows a prototype trajectory
+    The prototype is in the format of a moving.Trajectory: it could be
+    1. an observed trajectory (extracted from video)
+    2. a generic polyline (eg the road centerline) that a vehicle is supposed to follow
+
+    Prediction can be done
+    1. at constant speed (the instantaneous user speed)
+    2. following the trajectory path, at the speed of the user
+    (applying a constant ratio equal 
+    to the ratio of the user instantaneous speed and the trajectory closest speed)'''
+
+    def __init__(self, initialPosition, initialVelocity, prototype, constantSpeed = False, nFramesIgnore = 3, probability = 1.):
+        ''' prototype is a MovingObject
+
+        Prediction at constant speed will not work for unrealistic trajectories 
+        that do not follow a slowly changing velocity (eg moving object trajectories, 
+        but is good for realistic motion (eg features)'''
+        self.prototype = prototype
+        self.constantSpeed = constantSpeed
+        self.nFramesIgnore = nFramesIgnore
+        self.probability = probability
+        self.predictedPositions = {0: initialPosition}
+        self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition)
+        self.deltaPosition = initialPosition-prototype.getPositionAt(self.closestPointIdx) #should be computed in relative coordinates to position
+        self.theta = prototype.getVelocityAt(self.closestPointIdx).angle()
+        self.initialSpeed = initialVelocity.norm2()
+        if not constantSpeed:
+            self.ratio = self.initialSpeed/prototype.getVelocityAt(self.closestPointIdx).norm2()
+    
+    def predictPosition(self, nTimeSteps):
+        if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions:
+            deltaPosition = copy(self.deltaPosition)
+            if self.constantSpeed:
+                traj = self.prototype.getPositions()
+                trajLength = traj.length()
+                traveledDistance = nTimeSteps*self.initialSpeed + traj.getCumulativeDistance(self.closestPointIdx)
+                i = self.closestPointIdx
+                while i < trajLength and traj.getCumulativeDistance(i) < traveledDistance:
+                    i += 1
+                if i == trajLength:
+                    v = self.prototype.getVelocityAt(-1-self.nFramesIgnore)
+                    self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2())
+                else:
+                    v = self.prototype.getVelocityAt(min(i-1, int(self.prototype.length())-1-self.nFramesIgnore))
+                    self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1))
+            else:
+                traj = self.prototype.getPositions()
+                trajLength = traj.length()
+                nSteps = self.ratio*nTimeSteps+self.closestPointIdx
+                i = int(np.floor(nSteps))
+                if nSteps < trajLength-1:
+                    v = self.prototype.getVelocityAt(min(i, int(self.prototype.length())-1-self.nFramesIgnore))
+                    self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i)
+                else:
+                    v = self.prototype.getVelocityAt(-1-self.nFramesIgnore)
+                    self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1)
+        return self.predictedPositions[nTimeSteps]
+
+class PredictedTrajectoryRandomControl(PredictedTrajectory):
+    '''Random vehicle control: suitable for normal adaptation'''
+    def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None):
+        '''Constructor
+        accelerationDistribution and steeringDistribution are distributions 
+        that return random numbers drawn from them'''
+        self.accelerationDistribution = accelerationDistribution
+        self.steeringDistribution = steeringDistribution
+        self.maxSpeed = maxSpeed
+        self.probability = probability
+        self.predictedPositions = {0: initialPosition}
+        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
+
+    def getControl(self):
+        return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution())
+
+class SafetyPoint(moving.Point):
+    '''Can represent a collision point or crossing zone 
+    with respective safety indicator, TTC or pPET'''
+    def __init__(self, p, probability = 1., indicator = -1):
+        self.x = p.x
+        self.y = p.y
+        self.probability = probability
+        self.indicator = indicator
+
+    def __str__(self):
+        return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator)
+
+    @staticmethod
+    def save(out, points, predictionInstant, objNum1, objNum2):
+        for p in points:
+            out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p))
+
+    @staticmethod
+    def computeExpectedIndicator(points):
+        return np.sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])
+
+def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon):
+    '''Computes the first instant 
+    at which two predicted trajectories are within some distance threshold
+    Computes all the times including timeHorizon
+    
+    User has to check the first variable collision to know about a collision'''
+    t = 1
+    p1 = predictedTrajectory1.predictPosition(t)
+    p2 = predictedTrajectory2.predictPosition(t)
+    collision = (p1-p2).norm2() <= collisionDistanceThreshold
+    while t < timeHorizon and not collision:
+        t += 1
+        p1 = predictedTrajectory1.predictPosition(t)
+        p2 = predictedTrajectory2.predictPosition(t)
+        collision = (p1-p2).norm2() <= collisionDistanceThreshold
+    return collision, t, p1, p2
+
+def savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon, printFigure = True):
+    from matplotlib.pyplot import figure, axis, title, clf, savefig
+    if printFigure:
+        clf()
+    else:
+        figure()
+    for et in predictedTrajectories1:
+        for t in range(int(np.round(timeHorizon))):
+            et.predictPosition(t)
+            et.plot('rx')
+    for et in predictedTrajectories2:
+        for t in range(int(np.round(timeHorizon))):
+            et.predictPosition(t)
+            et.plot('bx')
+    obj1.plot('r', withOrigin = True)
+    obj2.plot('b', withOrigin = True)
+    title('instant {0}'.format(currentInstant))
+    axis('equal')
+    if printFigure:
+        savefig('predicted-trajectories-t-{0}.png'.format(currentInstant))
+
+def calculateProbability(nMatching,similarity,objects):
+    sumFrequencies=sum([nMatching[p] for p in similarity])
+    prototypeProbability={}
+    for i in similarity:
+        prototypeProbability[i]= similarity[i] * float(nMatching[i])/sumFrequencies
+    sumProbabilities= sum([prototypeProbability[p] for p in prototypeProbability])
+    probabilities={}
+    for i in prototypeProbability:
+        probabilities[objects[i]]= float(prototypeProbability[i])/sumProbabilities
+    return probabilities
+
+def findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,spatialThreshold=1.0, delta=180):
+    ''' behaviour prediction first step'''
+    if route[0] not in noiseEntryNums: 
+        prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]]
+    elif route[1] not in noiseExitNums:
+        prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]]
+    else:
+        prototypesRoutes=[x for x in sorted(prototypes.keys())]
+    lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta)
+    similarity={}
+    for y in prototypesRoutes: 
+        if y in prototypes:
+            prototypesIDs=prototypes[y]            
+            for x in prototypesIDs:
+                s=lcss.computeNormalized(partialObjPositions, objects[x].positions)
+                if s >= minSimilarity:
+                    similarity[x]=s
+    
+    if mostMatched==None:
+        probabilities= calculateProbability(nMatching,similarity,objects)        
+        return probabilities
+    else:
+        mostMatchedValues=sorted(similarity.values(),reverse=True)[:mostMatched]
+        keys=[k for k in similarity if similarity[k] in mostMatchedValues]
+        newSimilarity={}
+        for i in keys:
+            newSimilarity[i]=similarity[i]
+        probabilities= calculateProbability(nMatching,newSimilarity,objects)        
+        return probabilities        
+        
+def findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,spatialThreshold=1.0, delta=180):
+    if useDestination:
+        prototypesRoutes=[route]
+    else:
+        if route[0] not in noiseEntryNums: 
+            prototypesRoutes= [ x for x in sorted(prototypes.keys()) if route[0]==x[0]]
+        elif route[1] not in noiseExitNums:
+            prototypesRoutes=[ x for x in sorted(prototypes.keys()) if route[1]==x[1]]
+        else:
+            prototypesRoutes=[x for x in sorted(prototypes.keys())]
+    lcss = LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta)
+    similarity={}
+    for y in prototypesRoutes: 
+        if y in prototypes:
+            prototypesIDs=prototypes[y]    
+            for x in prototypesIDs:
+                s=lcss.computeNormalized(partialObjPositions, objects[x].positions)
+                if s >= minSimilarity:
+                    similarity[x]=s
+    
+    newSimilarity={}
+    for i in similarity:
+        if i in secondStepPrototypes:
+            for j in secondStepPrototypes[i]:
+                newSimilarity[j]=similarity[i]
+    probabilities= calculateProbability(nMatching,newSimilarity,objects)        
+    return probabilities
+    
+def getPrototypeTrajectory(obj,route,currentInstant,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity=0.1,mostMatched=None,useDestination=True,useSpeedPrototype=True):
+    partialInterval=moving.Interval(obj.getFirstInstant(),currentInstant)
+    partialObjPositions= obj.getObjectInTimeInterval(partialInterval).positions    
+    if useSpeedPrototype:
+        prototypeTrajectories=findPrototypesSpeed(prototypes,secondStepPrototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination)
+    else:
+        prototypeTrajectories=findPrototypes(prototypes,nMatching,objects,route,partialObjPositions,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched)
+    return prototypeTrajectories
+
+
+class PredictionParameters(object):
+    def __init__(self, name, maxSpeed):
+        self.name = name
+        self.maxSpeed = maxSpeed
+
+    def __str__(self):
+        return '{0} {1}'.format(self.name, self.maxSpeed)
+
+    def generatePredictedTrajectories(self, obj, instant):
+        return None
+
+    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
+        '''returns the lists of collision points and crossing zones'''
+        predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
+        predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)
+
+        collisionPoints = []
+        if computeCZ:
+            crossingZones = []
+        else:
+            crossingZones = None
+        for et1 in predictedTrajectories1:
+            for et2 in predictedTrajectories2:
+                collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
+                if collision:
+                    collisionPoints.append(SafetyPoint((p1+p2)*0.5, et1.probability*et2.probability, t))
+                elif computeCZ: # check if there is a crossing zone
+                    # TODO same computation as PET with metric + concatenate past trajectory with future trajectory
+                    cz = None
+                    t1 = 0
+                    while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1
+                        t2 = 0
+                        while not cz and t2 < timeHorizon:
+                            cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
+                            if cz is not None:
+                                deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2()
+                                crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV)))
+                            t2 += 1
+                        t1 += 1                        
+
+        if debug:
+            savePredictedTrajectoriesFigure(currentInstant, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon)
+
+        return collisionPoints, crossingZones
+
+    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):#, nProcesses = 1):
+        '''Computes all crossing and collision points at each common instant for two road users. '''
+        collisionPoints = {}
+        if computeCZ:
+            crossingZones = {}
+        else:
+            crossingZones = None
+        if timeInterval is not None:
+            commonTimeInterval = timeInterval
+        else:
+            commonTimeInterval = obj1.commonTimeInterval(obj2)
+        #if nProcesses == 1:
+        for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
+            cp, cz = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
+            if len(cp) != 0:
+                collisionPoints[i] = cp
+            if computeCZ and len(cz) != 0:
+                crossingZones[i] = cz
+        return collisionPoints, crossingZones
+
+    def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None):
+        '''Computes only collision probabilities
+        Returns for each instant the collision probability and number of samples drawn'''
+        collisionProbabilities = {}
+        if timeInterval is not None:
+            commonTimeInterval = timeInterval
+        else:
+            commonTimeInterval = obj1.commonTimeInterval(obj2)
+        for i in list(commonTimeInterval)[:-1]:
+            nCollisions = 0
+            predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i)
+            predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i)
+            for et1 in predictedTrajectories1:
+                for et2 in predictedTrajectories2:
+                    collision, t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
+                    if collision:
+                        nCollisions += 1
+            # take into account probabilities ??
+            nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2))
+            collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples]
+
+            if debug:
+                savePredictedTrajectoriesFigure(i, obj1, obj2, predictedTrajectories1, predictedTrajectories2, timeHorizon)
+
+        return collisionProbabilities
+
+class ConstantPredictionParameters(PredictionParameters):
+    def __init__(self, maxSpeed):
+        PredictionParameters.__init__(self, 'constant velocity', maxSpeed)
+
+    def generatePredictedTrajectories(self, obj, instant):
+        return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), maxSpeed = self.maxSpeed)]
+
+class NormalAdaptationPredictionParameters(PredictionParameters):
+    def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False):
+        '''An example of acceleration and steering distributions is
+        lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.)
+        '''
+        if useFeatures:
+            name = 'point set normal adaptation'
+        else:
+            name = 'normal adaptation'
+        PredictionParameters.__init__(self, name, maxSpeed)
+        self.nPredictedTrajectories = nPredictedTrajectories
+        self.useFeatures = useFeatures
+        self.accelerationDistribution = accelerationDistribution
+        self.steeringDistribution = steeringDistribution
+        
+    def __str__(self):
+        return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, 
+                                                                        self.maxAcceleration, 
+                                                                        self.maxSteering)
+
+    def generatePredictedTrajectories(self, obj, instant):
+        predictedTrajectories = []
+        if self.useFeatures and obj.hasFeatures():
+            features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)]
+            positions = [f.getPositionAtInstant(instant) for f in features]
+            velocities = [f.getVelocityAtInstant(instant) for f in features]
+        else:
+            positions = [obj.getPositionAtInstant(instant)]
+            velocities = [obj.getVelocityAtInstant(instant)]
+        probability = 1./float(len(positions)*self.nPredictedTrajectories)
+        for i in range(self.nPredictedTrajectories):
+            for initialPosition,initialVelocity in zip(positions, velocities):
+                predictedTrajectories.append(PredictedTrajectoryRandomControl(initialPosition, 
+                                                                              initialVelocity, 
+                                                                              self.accelerationDistribution, 
+                                                                              self.steeringDistribution, 
+                                                                              probability, 
+                                                                              maxSpeed = self.maxSpeed))
+        return predictedTrajectories
+
+class PointSetPredictionParameters(PredictionParameters):
+    def __init__(self, maxSpeed):
+        PredictionParameters.__init__(self, 'point set', maxSpeed)
+    
+    def generatePredictedTrajectories(self, obj, instant):
+        predictedTrajectories = []
+        if obj.hasFeatures():
+            features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)]
+            positions = [f.getPositionAtInstant(instant) for f in features]
+            velocities = [f.getVelocityAtInstant(instant) for f in features]
+            probability = 1./float(len(positions))
+            for initialPosition,initialVelocity in zip(positions, velocities):
+                predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, probability = probability, maxSpeed = self.maxSpeed))
+            return predictedTrajectories
+        else:
+            print('Object {} has no features'.format(obj.getNum()))
+            return None
+
+        
+class EvasiveActionPredictionParameters(PredictionParameters):
+    def __init__(self, maxSpeed, nPredictedTrajectories, accelerationDistribution, steeringDistribution, useFeatures = False):
+        '''Suggested acceleration distribution may not be symmetric, eg
+        lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.)'''
+
+        if useFeatures:
+            name = 'point set evasive action'
+        else:
+            name = 'evasive action'
+        PredictionParameters.__init__(self, name, maxSpeed)
+        self.nPredictedTrajectories = nPredictedTrajectories
+        self.useFeatures = useFeatures
+        self.accelerationDistribution = accelerationDistribution
+        self.steeringDistribution = steeringDistribution
+
+    def __str__(self):
+        return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering)
+
+    def generatePredictedTrajectories(self, obj, instant):
+        predictedTrajectories = []
+        if self.useFeatures and obj.hasFeatures():
+            features = [f for f in obj.getFeatures() if f.existsAtInstant(instant)]
+            positions = [f.getPositionAtInstant(instant) for f in features]
+            velocities = [f.getVelocityAtInstant(instant) for f in features]
+        else:
+            positions = [obj.getPositionAtInstant(instant)]
+            velocities = [obj.getVelocityAtInstant(instant)]
+        probability = 1./float(self.nPredictedTrajectories)
+        for i in range(self.nPredictedTrajectories):
+            for initialPosition,initialVelocity in zip(positions, velocities):
+                predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, 
+                                                                         initialVelocity, 
+                                                                         moving.NormAngle(self.accelerationDistribution(), 
+                                                                                          self.steeringDistribution()), 
+                                                                         probability, 
+                                                                         self.maxSpeed))
+        return predictedTrajectories
+
+
+class CVDirectPredictionParameters(PredictionParameters):
+    '''Prediction parameters of prediction at constant velocity
+    using direct computation of the intersecting point
+    Warning: the computed time to collision may be higher than timeHorizon (not used)'''
+    
+    def __init__(self):
+        PredictionParameters.__init__(self, 'constant velocity (direct computation)', None)
+
+    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs):
+        collisionPoints = []
+        if computeCZ:
+            crossingZones = []
+        else:
+            crossingZones = None
+
+        p1 = obj1.getPositionAtInstant(currentInstant)
+        p2 = obj2.getPositionAtInstant(currentInstant)
+        if (p1-p2).norm2() <= collisionDistanceThreshold:
+            collisionPoints = [SafetyPoint((p1+p2)*0.5, 1., 0.)]
+        else:
+            v1 = obj1.getVelocityAtInstant(currentInstant)
+            v2 = obj2.getVelocityAtInstant(currentInstant)
+            intersection = moving.intersection(p1, p1+v1, p2, p2+v2)
+
+            if intersection is not None:
+                dp1 = intersection-p1
+                dp2 = intersection-p2
+                dot1 = moving.Point.dot(dp1, v1)
+                dot2 = moving.Point.dot(dp2, v2)
+                if (computeCZ and (dot1 > 0 or dot2 > 0)) or (dot1 > 0 and dot2 > 0): # if the road users are moving towards the intersection or if computing pPET
+                    dist1 = dp1.norm2()
+                    dist2 = dp2.norm2()
+                    s1 = math.copysign(v1.norm2(), dot1)
+                    s2 = math.copysign(v2.norm2(), dot2)
+                    halfCollisionDistanceThreshold = collisionDistanceThreshold/2.
+                    timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1)
+                    timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2)
+                    collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2)
+                    
+                    if collisionTimeInterval.empty():
+                        if computeCZ:
+                            crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))]
+                    else:
+                        collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())]
+    
+        if debug and intersection is not None:
+            from matplotlib.pyplot import plot, figure, axis, title
+            figure()
+            plot([p1.x, intersection.x], [p1.y, intersection.y], 'r')
+            plot([p2.x, intersection.x], [p2.y, intersection.y], 'b')
+            intersection.plot()            
+            obj1.plot('r')
+            obj2.plot('b')
+            title('instant {0}'.format(currentInstant))
+            axis('equal')
+
+        return collisionPoints, crossingZones
+
+class CVExactPredictionParameters(PredictionParameters):
+    '''Prediction parameters of prediction at constant velocity
+    using direct computation of the intersecting point (solving the equation)
+    Warning: the computed time to collision may be higher than timeHorizon (not used)'''
+    
+    def __init__(self):
+        PredictionParameters.__init__(self, 'constant velocity (direct exact computation)', None)
+
+    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, *kwargs):
+        'TODO compute pPET'
+        collisionPoints = []
+        crossingZones = []
+
+        p1 = obj1.getPositionAtInstant(currentInstant)
+        p2 = obj2.getPositionAtInstant(currentInstant)
+        v1 = obj1.getVelocityAtInstant(currentInstant)
+        v2 = obj2.getVelocityAtInstant(currentInstant)
+        #intersection = moving.intersection(p1, p1+v1, p2, p2+v2)
+
+        if not moving.Point.parallel(v1, v2):
+            ttc = moving.Point.timeToCollision(p1, p2, v1, v2, collisionDistanceThreshold)
+            if ttc is not None:
+                collisionPoints = [SafetyPoint((p1+(v1*ttc)+p2+(v2*ttc))*0.5, 1., ttc)]
+            else:
+                pass # compute pPET
+
+        return collisionPoints, crossingZones
+
+class PrototypePredictionParameters(PredictionParameters):
+    def __init__(self, prototypes, nPredictedTrajectories, pointSimilarityDistance, minSimilarity, lcssMetric = 'cityblock', minFeatureTime = 10, constantSpeed = False, useFeatures = True):
+        PredictionParameters.__init__(self, 'prototypes', None)
+        self.prototypes = prototypes
+        self.nPredictedTrajectories = nPredictedTrajectories
+        self.lcss = LCSS(metric = lcssMetric, epsilon = pointSimilarityDistance)
+        self.minSimilarity = minSimilarity
+        self.minFeatureTime = minFeatureTime
+        self.constantSpeed = constantSpeed
+        self.useFeatures = useFeatures
+
+    def getLcss(self):
+        return self.lcss
+        
+    def addPredictedTrajectories(self, predictedTrajectories, obj, instant):
+        obj.computeTrajectorySimilarities(self.prototypes, self.lcss)
+        for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities):
+            if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity:
+                initialPosition = obj.getPositionAtInstant(instant)
+                initialVelocity = obj.getVelocityAtInstant(instant)
+                predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings()))
+        
+    def generatePredictedTrajectories(self, obj, instant):
+        predictedTrajectories = []
+        if instant-obj.getFirstInstant()+1 >= self.minFeatureTime:
+            if self.useFeatures and obj.hasFeatures():
+                if not hasattr(obj, 'currentPredictionFeatures'):
+                    obj.currentPredictionFeatures = []
+                else:
+                    obj.currentPredictionFeatures[:] = [f for f in obj.currentPredictionFeatures if f.existsAtInstant(instant)]
+                firstInstants = [(f,f.getFirstInstant()) for f in obj.getFeatures() if f.existsAtInstant(instant) and f not in obj.currentPredictionFeatures]
+                firstInstants.sort(key = lambda t: t[1])
+                for f,t1 in firstInstants[:min(self.nPredictedTrajectories, len(firstInstants), self.nPredictedTrajectories-len(obj.currentPredictionFeatures))]:
+                    obj.currentPredictionFeatures.append(f)
+                for f in obj.currentPredictionFeatures:
+                    self.addPredictedTrajectories(predictedTrajectories, f, instant)
+            else:
+                self.addPredictedTrajectories(predictedTrajectories, obj, instant)
+        return predictedTrajectories
+
+if __name__ == "__main__":
+    import doctest
+    import unittest
+    suite = doctest.DocFileSuite('tests/prediction.txt')
+    #suite = doctest.DocTestSuite()
+    unittest.TextTestRunner().run(suite)
+    #doctest.testmod()
+    #doctest.testfile("example.txt")
+