view python/extrapolation.py @ 270:05c9b0cb8202

updates for drawing
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Thu, 02 Aug 2012 05:35:57 -0400
parents a9988971aac8
children
line wrap: on
line source

#! /usr/bin/env python
'''Library for moving object extrapolation hypotheses'''

import moving
import math
import random

class ExtrapolatedTrajectory:
    '''Class for extrapolated trajectories with lazy evaluation
    if the predicted position has not been already computed, compute it

    it should also have a probability'''

    def __init__(self):
        self.probability = 0.
        self.predictedPositions = {}
        self.predictedSpeedOrientations = {}
        self.collisionPoints = {}
        self.crossingZones = {}

    def predictPosition(self, nTimeSteps):
        if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys():
            self.predictPosition(nTimeSteps-1)
            self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed)
        return self.predictedPositions[nTimeSteps]

    def getPredictedTrajectory(self):
        return moving.Trajectory.fromPointList(self.predictedPositions.values())

    def getPredictedSpeeds(self):
        return [so.norm for so in self.predictedSpeedOrientations.values()]

    def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs):
        self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs)

class ExtrapolatedTrajectoryConstant(ExtrapolatedTrajectory):
    '''Extrapolated trajectory at constant speed or acceleration
    TODO generalize by passing a series of velocities/accelerations'''

    def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1, maxSpeed = None):
        self.control = control
        self.maxSpeed = maxSpeed
        self.probability = probability
        self.predictedPositions = {0: initialPosition}
        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}

    def getControl(self):
        return self.control

class ExtrapolatedTrajectoryNormalAdaptation(ExtrapolatedTrajectory):
    '''Random small adaptation of vehicle control '''
    def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1, maxSpeed = None):
        '''Constructor
        accelerationDistribution and steeringDistribution are distributions 
        that return random numbers drawn from them'''
        self.accelerationDistribution = accelerationDistribution
        self.steeringDistribution = steeringDistribution
        self.maxSpeed = maxSpeed
        self.probability = probability
        self.predictedPositions = {0: initialPosition}
        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}

    def getControl(self):
        return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution())

class ExtrapolationParameters:
    def __init__(self, name, maxSpeed):
        self.name = name
        self.maxSpeed = maxSpeed

    def __str__(self):
        return '{0} {1}'.format(self.name, self.maxSpeed)

class ConstantExtrapolationParameters(ExtrapolationParameters):
    def __init__(self, maxSpeed):
        ExtrapolationParameters.__init__(self, 'constant velocity', maxSpeed)

    def generateExtrapolatedTrajectories(self, obj, instant):
        return [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), 
                                               maxSpeed = self.maxSpeed)]

class NormalAdaptationExtrapolationParameters(ExtrapolationParameters):
    def __init__(self, maxSpeed, nExtrapolatedTrajectories, maxAcceleration, maxSteering):
        ExtrapolationParameters.__init__(self, 'normal adaptation', maxSpeed)
        self.nExtrapolatedTrajectories = nExtrapolatedTrajectories
        self.maxAcceleration = maxAcceleration
        self.maxSteering = maxSteering
        self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, 
                                                                   self.maxAcceleration, 0.)
        self.steeringDistribution = lambda: random.triangular(-self.maxSteering, 
                                                               self.maxSteering, 0.)
        
    def __str__(self):
        return ExtrapolationParameters.__str__(self)+' {0} {1} {2}'.format(self.nExtrapolatedTrajectories, 
                                                                           self.maxAcceleration, 
                                                                           self.maxSteering)

    def generateExtrapolatedTrajectories(self, obj, instant):
        extrapolatedTrajectories = []
        for i in xrange(self.nExtrapolatedTrajectories):
            extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), 
                                                                                   obj.getVelocityAtInstant(instant), 
                                                                                   self.accelerationDistribution, 
                                                                                   self.steeringDistribution, 
                                                                                   maxSpeed = self.maxSpeed))
        return extrapolatedTrajectories

class PointSetExtrapolationParameters(ExtrapolationParameters):
    # todo generate several trajectories with normal adaptatoins from each position (feature)
    def __init__(self, maxSpeed):
        ExtrapolationParameters.__init__(self, 'point set', maxSpeed)
    
    def generateExtrapolatedTrajectories(self, obj, instant):
        extrapolatedTrajectories = []        
        features = [f for f in obj.features if f.existsAtInstant(instant)]
        positions = [f.getPositionAtInstant(instant) for f in features]
        velocities = [f.getVelocityAtInstant(instant) for f in features]
        for initialPosition,initialVelocity in zip(positions, velocities):
            extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, 
                                                                           maxSpeed = self.maxSpeed))
        return extrapolatedTrajectories

class EvasiveActionExtrapolationParameters(ExtrapolationParameters):
    def __init__(self, maxSpeed, nExtrapolatedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False):
        if useFeatures:
            name = 'point set evasive action'
        else:
            name = 'evasive action'
        ExtrapolationParameters.__init__(self, name, maxSpeed)
        self.nExtrapolatedTrajectories = nExtrapolatedTrajectories
        self.minAcceleration = minAcceleration
        self.maxAcceleration = maxAcceleration
        self.maxSteering = maxSteering
        self.useFeatures = useFeatures
        self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, 
                                                                  self.maxAcceleration, 0.)
        self.steeringDistribution = lambda: random.triangular(-self.maxSteering, 
                                                               self.maxSteering, 0.)

    def __str__(self):
        return ExtrapolationParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nExtrapolatedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering)

    def generateExtrapolatedTrajectories(self, obj, instant):
        extrapolatedTrajectories = []
        if self.useFeatures:
            features = [f for f in obj.features if f.existsAtInstant(instant)]
            positions = [f.getPositionAtInstant(instant) for f in features]
            velocities = [f.getVelocityAtInstant(instant) for f in features]
        else:
            positions = [obj.getPositionAtInstant(instant)]
            velocities = [obj.getVelocityAtInstant(instant)]
        for i in xrange(self.nExtrapolatedTrajectories):
            for initialPosition,initialVelocity in zip(positions, velocities):
                extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, 
                                                                               initialVelocity, 
                                                                               moving.NormAngle(self.accelerationDistribution(), 
                                                                                                self.steeringDistribution()), 
                                                                               maxSpeed = self.maxSpeed))
        return extrapolatedTrajectories

class SafetyPoint(moving.Point):
    '''Can represent a collision point or crossing zone 
    with respective safety indicator, TTC or pPET'''
    def __init__(self, p, probability = 1., indicator = -1):
        self.x = p.x
        self.y = p.y
        self.probability = probability
        self.indicator = indicator

    @staticmethod
    def save(out, points, objNum1, objNum2, instant):
        for p in points:
            out.write('{0} {1} {2} {3} {4} {5} {6}\n'.format(objNum1, objNum2, instant, p.x, p.y, p.probability, p.indicator))

def computeExpectedIndicator(points):
    from numpy import sum
    return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])

def computeCollisionTime(extrapolatedTrajectory1, extrapolatedTrajectory2, collisionDistanceThreshold, timeHorizon):
    t = 1
    p1 = extrapolatedTrajectory1.predictPosition(t)
    p2 = extrapolatedTrajectory2.predictPosition(t)
    while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold:
        p1 = extrapolatedTrajectory1.predictPosition(t)
        p2 = extrapolatedTrajectory2.predictPosition(t)
        t += 1
    return t, p1, p2

def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, debug = False):
    '''returns the lists of collision points and crossing zones
    
    Check: Extrapolating all the points together, as if they represent the whole vehicle'''
    extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i)
    extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i)

    collisionPoints = []
    crossingZones = []
    for et1 in extrapolatedTrajectories1:
        for et2 in extrapolatedTrajectories2:
            t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
            
            if t <= timeHorizon:
                collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t))
            else: # check if there is a crossing zone
                # TODO? zone should be around the points at which the traj are the closest
                # look for CZ at different times, otherwise it would be a collision
                # an approximation would be to look for close points at different times, ie the complementary of collision points
                cz = None
                t1 = 0
                while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1
                    t2 = 0
                    while not cz and t2 < timeHorizon:
                        #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold:
                        #    cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5)
                        cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
                        if cz:
                            crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)))
                        t2 += 1
                    t1 += 1                        

    if debug:
        from matplotlib.pyplot import figure, axis, title
        figure()
        for et in extrapolatedTrajectories1:
            et.predictPosition(timeHorizon)
            et.draw('rx')

        for et in extrapolatedTrajectories2:
            et.predictPosition(timeHorizon)
            et.draw('bx')
        obj1.draw('r')
        obj2.draw('b')
        title('instant {0}'.format(i))
        axis('equal')

    return collisionPoints, crossingZones
    
def computeCrossingsCollisions(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None):
    collisionPoints={}
    crossingZones={}
    if timeInterval:
        commonTimeInterval = timeInterval
    else:
        commonTimeInterval = obj1.commonTimeInterval(obj2)
    for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
        print(obj1.num, obj2.num, i)
        collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, debug)
        SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i)
        SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i)

    return collisionPoints, crossingZones
 
def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False, timeInterval = None):
    collisionProbabilities = {}
    if timeInterval:
        commonTimeInterval = timeInterval
    else:
        commonTimeInterval = obj1.commonTimeInterval(obj2)
    for i in list(commonTimeInterval)[:-1]:
        nCollisions = 0
        print(obj1.num, obj2.num, i)
        extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i)
        extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i)
        for et1 in extrapolatedTrajectories1:
            for et2 in extrapolatedTrajectories2:
                t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon)
                if t <= timeHorizon:
                    nCollisions += 1
        # take into account probabilities ??
        nSamples = float(len(extrapolatedTrajectories1)*len(extrapolatedTrajectories2))
        collisionProbabilities[i] = float(nCollisions)/nSamples
        out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, nSamples, i, collisionProbabilities[i]))

        if debug:
            from matplotlib.pyplot import figure, axis, title
            figure()
            for et in extrapolatedTrajectories1:
                et.predictPosition(timeHorizon)
                et.draw('rx')
                
            for et in extrapolatedTrajectories2:
                et.predictPosition(timeHorizon)
                et.draw('bx')
            obj1.draw('r')
            obj2.draw('b')
            title('instant {0}'.format(i))
            axis('equal')

    return collisionProbabilities


if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/extrapolation.txt')
    #suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)
    #doctest.testmod()
    #doctest.testfile("example.txt")