view python/prediction.py @ 398:3399bd48cb40

Ajout d'une méthode pour obtenir le nombre de FPS Méthode de capture des trames vidéos plus résistante aux erreur Utilisation d'un dictionnaire pour les fichier de configuration afin de garder le nom des sections
author Jean-Philippe Jodoin <jpjodoin@gmail.com>
date Mon, 29 Jul 2013 13:46:07 -0400
parents 91679eb2ff2c
children 55b424d98b68
line wrap: on
line source

#! /usr/bin/env python
'''Library for motion prediction methods'''

import moving
import math
import random

class PredictedTrajectory:
    '''Class for predicted trajectories with lazy evaluation
    if the predicted position has not been already computed, compute it

    it should also have a probability'''

    def __init__(self):
        self.probability = 0.
        self.predictedPositions = {}
        self.predictedSpeedOrientations = {}
        #self.collisionPoints = {}
        #self.crossingZones = {}

    def predictPosition(self, nTimeSteps):
        if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys():
            self.predictPosition(nTimeSteps-1)
            self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed)
        return self.predictedPositions[nTimeSteps]

    def getPredictedTrajectory(self):
        return moving.Trajectory.fromPointList(self.predictedPositions.values())

    def getPredictedSpeeds(self):
        return [so.norm for so in self.predictedSpeedOrientations.values()]

    def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs):
        self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs)

class PredictedTrajectoryConstant(PredictedTrajectory):
    '''Predicted trajectory at constant speed or acceleration
    TODO generalize by passing a series of velocities/accelerations'''

    def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None):
        self.control = control
        self.maxSpeed = maxSpeed
        self.probability = probability
        self.predictedPositions = {0: initialPosition}
        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}

    def getControl(self):
        return self.control

class PredictedTrajectoryNormalAdaptation(PredictedTrajectory):
    '''Random small adaptation of vehicle control '''
    def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None):
        '''Constructor
        accelerationDistribution and steeringDistribution are distributions 
        that return random numbers drawn from them'''
        self.accelerationDistribution = accelerationDistribution
        self.steeringDistribution = steeringDistribution
        self.maxSpeed = maxSpeed
        self.probability = probability
        self.predictedPositions = {0: initialPosition}
        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}

    def getControl(self):
        return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution())

class SafetyPoint(moving.Point):
    '''Can represent a collision point or crossing zone 
    with respective safety indicator, TTC or pPET'''
    def __init__(self, p, probability = 1., indicator = -1):
        self.x = p.x
        self.y = p.y
        self.probability = probability
        self.indicator = indicator

    def __str__(self):
        return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator)

    @staticmethod
    def save(out, points, predictionInstant, objNum1, objNum2):
        for p in points:
            out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p))

    @staticmethod
    def computeExpectedIndicator(points):
        from numpy import sum
        return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])

def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon):
    '''Computes the first instant at which two predicted trajectories are within some distance threshold'''
    t = 1
    p1 = predictedTrajectory1.predictPosition(t)
    p2 = predictedTrajectory2.predictPosition(t)
    while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold:
        p1 = predictedTrajectory1.predictPosition(t)
        p2 = predictedTrajectory2.predictPosition(t)
        t += 1
    return t, p1, p2

class PredictionParameters:
    def __init__(self, name, maxSpeed):
        self.name = name
        self.maxSpeed = maxSpeed

    def __str__(self):
        return '{0} {1}'.format(self.name, self.maxSpeed)

    def generatePredictedTrajectories(self, obj, instant):
        return []

    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
        '''returns the lists of collision points and crossing zones'''
        predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
        predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)

        collisionPoints = []
        crossingZones = []
        for et1 in predictedTrajectories1:
            for et2 in predictedTrajectories2:
                t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)

                if t <= timeHorizon:
                    collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t))
                elif computeCZ: # check if there is a crossing zone
                    # TODO? zone should be around the points at which the traj are the closest
                    # look for CZ at different times, otherwise it would be a collision
                    # an approximation would be to look for close points at different times, ie the complementary of collision points
                    cz = None
                    t1 = 0
                    while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1
                        t2 = 0
                        while not cz and t2 < timeHorizon:
                            #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold:
                            #    cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5)
                            cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
                            if cz:
                                crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)))
                            t2 += 1
                        t1 += 1                        

        if debug:
            from matplotlib.pyplot import figure, axis, title
            figure()
            for et in predictedTrajectories1:
                et.predictPosition(timeHorizon)
                et.draw('rx')

            for et in predictedTrajectories2:
                et.predictPosition(timeHorizon)
                et.draw('bx')
            obj1.draw('r')
            obj2.draw('b')
            title('instant {0}'.format(currentInstant))
            axis('equal')

        return collisionPoints, crossingZones

    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
        '''Computes all crossing and collision points at each common instant for two road users. '''
        collisionPoints={}
        crossingZones={}
        if timeInterval:
            commonTimeInterval = timeInterval
        else:
            commonTimeInterval = obj1.commonTimeInterval(obj2)
        for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
            collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)

        return collisionPoints, crossingZones

    def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None):
        '''Computes only collision probabilities
        Returns for each instant the collision probability and number of samples drawn'''
        collisionProbabilities = {}
        if timeInterval:
            commonTimeInterval = timeInterval
        else:
            commonTimeInterval = obj1.commonTimeInterval(obj2)
        for i in list(commonTimeInterval)[:-1]:
            nCollisions = 0
            print(obj1.num, obj2.num, i)
            predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i)
            predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i)
            for et1 in predictedTrajectories1:
                for et2 in predictedTrajectories2:
                    t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
                    if t <= timeHorizon:
                        nCollisions += 1
            # take into account probabilities ??
            nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2))
            collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples]

            if debug:
                from matplotlib.pyplot import figure, axis, title
                figure()
                for et in predictedTrajectories1:
                    et.predictPosition(timeHorizon)
                    et.draw('rx')

                for et in predictedTrajectories2:
                    et.predictPosition(timeHorizon)
                    et.draw('bx')
                obj1.draw('r')
                obj2.draw('b')
                title('instant {0}'.format(i))
                axis('equal')

        return collisionProbabilities

class ConstantPredictionParameters(PredictionParameters):
    def __init__(self, maxSpeed):
        PredictionParameters.__init__(self, 'constant velocity', maxSpeed)

    def generatePredictedTrajectories(self, obj, instant):
        return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), 
                                               maxSpeed = self.maxSpeed)]

class NormalAdaptationPredictionParameters(PredictionParameters):
    def __init__(self, maxSpeed, nPredictedTrajectories, maxAcceleration, maxSteering, useFeatures = False):
        if useFeatures:
            name = 'point set normal adaptation'
        else:
            name = 'normal adaptation'
        PredictionParameters.__init__(self, name, maxSpeed)
        self.nPredictedTrajectories = nPredictedTrajectories
        self.maxAcceleration = maxAcceleration
        self.maxSteering = maxSteering
        self.useFeatures = useFeatures
        self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, 
                                                                   self.maxAcceleration, 0.)
        self.steeringDistribution = lambda: random.triangular(-self.maxSteering, 
                                                               self.maxSteering, 0.)
        
    def __str__(self):
        return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, 
                                                                        self.maxAcceleration, 
                                                                        self.maxSteering)

    def generatePredictedTrajectories(self, obj, instant):
        predictedTrajectories = []
        if self.useFeatures:
            features = [f for f in obj.features if f.existsAtInstant(instant)]
            positions = [f.getPositionAtInstant(instant) for f in features]
            velocities = [f.getVelocityAtInstant(instant) for f in features]
        else:
            positions = [obj.getPositionAtInstant(instant)]
            velocities = [obj.getVelocityAtInstant(instant)]
        for i in xrange(self.nPredictedTrajectories):
            for initialPosition,initialVelocity in zip(positions, velocities):
                predictedTrajectories.append(PredictedTrajectoryNormalAdaptation(initialPosition, 
                                                                                 initialVelocity, 
                                                                                 self.accelerationDistribution, 
                                                                                 self.steeringDistribution, 
                                                                                 maxSpeed = self.maxSpeed))
        return predictedTrajectories

class PointSetPredictionParameters(PredictionParameters):
    # todo generate several trajectories with normal adaptatoins from each position (feature)
    def __init__(self, nPredictedTrajectories, maxSpeed):
        PredictionParameters.__init__(self, 'point set', maxSpeed)
        self.nPredictedTrajectories = nPredictedTrajectories
    
    def generatePredictedTrajectories(self, obj, instant):
        predictedTrajectories = []        
        features = [f for f in obj.features if f.existsAtInstant(instant)]
        positions = [f.getPositionAtInstant(instant) for f in features]
        velocities = [f.getVelocityAtInstant(instant) for f in features]
        for i in xrange(self.nPredictedTrajectories):
            for initialPosition,initialVelocity in zip(positions, velocities):
                predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, 
                                                                         maxSpeed = self.maxSpeed))
        return predictedTrajectories

class EvasiveActionPredictionParameters(PredictionParameters):
    def __init__(self, maxSpeed, nPredictedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False):
        if useFeatures:
            name = 'point set evasive action'
        else:
            name = 'evasive action'
        PredictionParameters.__init__(self, name, maxSpeed)
        self.nPredictedTrajectories = nPredictedTrajectories
        self.minAcceleration = minAcceleration
        self.maxAcceleration = maxAcceleration
        self.maxSteering = maxSteering
        self.useFeatures = useFeatures
        self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, 
                                                                  self.maxAcceleration, 0.)
        self.steeringDistribution = lambda: random.triangular(-self.maxSteering, 
                                                               self.maxSteering, 0.)

    def __str__(self):
        return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering)

    def generatePredictedTrajectories(self, obj, instant):
        predictedTrajectories = []
        if self.useFeatures:
            features = [f for f in obj.features if f.existsAtInstant(instant)]
            positions = [f.getPositionAtInstant(instant) for f in features]
            velocities = [f.getVelocityAtInstant(instant) for f in features]
        else:
            positions = [obj.getPositionAtInstant(instant)]
            velocities = [obj.getVelocityAtInstant(instant)]
        for i in xrange(self.nPredictedTrajectories):
            for initialPosition,initialVelocity in zip(positions, velocities):
                predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, 
                                                                         initialVelocity, 
                                                                         moving.NormAngle(self.accelerationDistribution(), 
                                                                                          self.steeringDistribution()), 
                                                                         maxSpeed = self.maxSpeed))
        return predictedTrajectories


class CVDirectPredictionParameters(PredictionParameters):
    '''Prediction parameters of prediction at constant velocity
    using direct computation of the intersecting point'''
    
    def __init__(self):
        PredictionParameters.__init__(self, 'constant velocity (direct computation)', None)

    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
        collisionPoints = []
        crossingZones = []

        p1 = obj1.getPositionAtInstant(currentInstant)
        p2 = obj2.getPositionAtInstant(currentInstant)
        if (p1-p2).norm2() <= collisionDistanceThreshold:
            collisionPoints = [SafetyPoint((p1+p1).multiply(0.5), 1., 0.)]
        else:
            v1 = obj1.getVelocityAtInstant(currentInstant)
            v2 = obj2.getVelocityAtInstant(currentInstant)
            intersection = moving.intersection(p1, p2, v1, v2)

            if intersection != None:
                dp1 = intersection-p1
                dp2 = intersection-p2
                if moving.Point.dot(dp1, v1) > 0 and moving.Point.dot(dp2, v2) > 0: # if the road users are moving towards the intersection
                    dist1 = dp1.norm2()
                    dist2 = dp2.norm2()
                    s1 = v1.norm2()
                    s2 = v2.norm2()
                    halfCollisionDistanceThreshold = collisionDistanceThreshold/2.
                    timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1)
                    timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2)
                    collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2)
                    if computeCZ and collisionTimeInterval.empty():
                        crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))]
                    else:
                        collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())]
    
        if debug and intersection!= None:
            from matplotlib.pyplot import plot, figure, axis, title
            figure()
            plot([p1.x, intersection.x], [p1.y, intersection.y], 'r')
            plot([p2.x, intersection.x], [p2.y, intersection.y], 'b')
            intersection.draw()            
            obj1.draw('r')
            obj2.draw('b')
            title('instant {0}'.format(currentInstant))
            axis('equal')

        return collisionPoints, crossingZones




####
# Other Methods
####





if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/prediction.txt')
    #suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)
    #doctest.testmod()
    #doctest.testfile("example.txt")