view python/extrapolation.py @ 260:36cb40c51a5e

modified the organization of the code
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Tue, 24 Jul 2012 18:07:23 -0400
parents 8ab76b95ee72
children a04a6af4b810
line wrap: on
line source

#! /usr/bin/env python
'''Library for moving object extrapolation hypotheses'''

import moving
import math

class ExtrapolatedTrajectory:
    '''Class for extrapolated trajectories with lazy evaluation
    if the predicted position has not been already computed, compute it

    it should also have a probability'''

    def __init__(self):
        self.probability = 0.
        self.predictedPositions = {}
        self.predictedSpeedOrientations = {}
        self.collisionPoints = {}
        self.crossingZones = {}

    def predictPosition(self, nTimeSteps):
        if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys():
            self.predictPosition(nTimeSteps-1)
            self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed)
        return self.predictedPositions[nTimeSteps]

    def getPredictedTrajectory(self):
        return moving.Trajectory.fromPointList(self.predictedPositions.values())

    def getPredictedSpeeds(self):
        return [so.norm for so in self.predictedSpeedOrientations.values()]

    def draw(self, options = '', withOrigin = False, **kwargs):
        self.getPredictedTrajectory().draw(options, withOrigin, **kwargs)

class ExtrapolatedTrajectoryConstant(ExtrapolatedTrajectory):
    '''Extrapolated trajectory at constant speed or acceleration
    TODO generalize by passing a series of velocities/accelerations'''

    def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1, maxSpeed = None):
        self.control = control
        self.maxSpeed = maxSpeed
        self.probability = probability
        self.predictedPositions = {0: initialPosition}
        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}

    def getControl(self):
        return self.control

class ExtrapolatedTrajectoryNormalAdaptation(ExtrapolatedTrajectory):
    '''Random small adaptation of vehicle control '''
    def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1, maxSpeed = None):
        '''Constructor
        accelerationDistribution and steeringDistribution are distributions 
        that return random numbers drawn from them'''
        self.accelerationDistribution = accelerationDistribution
        self.steeringDistribution = steeringDistribution
        self.maxSpeed = maxSpeed
        self.probability = probability
        self.predictedPositions = {0: initialPosition}
        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}

    def getControl(self):
        return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution())


class ExtrapolationParameters:
    def __init__(self, name):
        self.name = name

def createExtrapolatedTrajectories(extrapolationParameters, initialPosition, initialVelocity):
    '''extrapolationParameters specific to each method (in name field)  '''
    if extrapolationParameters.name == 'constant velocity':
        return [ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, maxSpeed = extrapolationParameters.maxSpeed)]
    elif extrapolationParameters.name == 'normal adaptation':
        # generate several and with the right distribution
        extrapolatedTrajectories = []
        for i in xrange(extrapolationParameters.nExtrapolatedTrajectories):
            extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(initialPosition, initialVelocity, 
                                                                                   extrapolationParameters.accelerationDistribution, 
                                                                                   extrapolationParameters.steeringDistribution, 
                                                                                   maxSpeed = extrapolationParameters.maxSpeed))
        return extrapolatedTrajectories
    else:
        print('Unknown extrapolation hypothesis')
        return []

class SafetyPoint(moving.Point):
    '''Can represent a collision point or crossing zone 
    with respective safety indicator, TTC or pPET'''
    def __init__(self, p, probability = 1., indicator = -1):
        self.x = p.x
        self.y = p.y
        self.probability = probability
        self.indicator = indicator

def saveSafetyPoints(out, objNum1, objNum2, instant, points):
    for p in points:
        out.write('{0} {1} {2} {3} {4} {5} {6}\n'.format(objNum1, objNum2, instant, p.x, p.y, p.probability, p.indicator))

def computeExpectedIndicator(points):
    from numpy import sum
    return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points])

def computeCrossingsCollisionsAtInstant(extrapolatedTrajectories1, extrapolatedTrajectories2, collisionDistanceThreshold, timeHorizon):
    '''returns the lists of collision points and crossing zones '''
    collisionPoints = []
    crossingZones = []
    for et1 in extrapolatedTrajectories1:
        for et2 in extrapolatedTrajectories2:
            t = 1
            p1 = et1.predictPosition(t)
            p2 = et2.predictPosition(t)
            while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold:
                p1 = et1.predictPosition(t)
                p2 = et2.predictPosition(t)
                t += 1

            if t <= timeHorizon:
                collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t))
            else: # check if there is a crossing zone
                # TODO? zone should be around the points at which the traj are the closest
                # look for CZ at different times, otherwise it would be a collision
                # an approximation would be to look for close points at different times, ie the complementary of collision points
                cz = None
                t1 = 0
                while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1
                    t2 = 0
                    while not cz and t2 < timeHorizon:
                        #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold:
                        #    cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5)
                        cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
                        if cz:
                            crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)))
                        t2 += 1
                    t1 += 1                        
    return collisionPoints, crossingZones
    
def computeCrossingsCollisions(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False):
    collisionPoints={}
    crossingZones={}
    #TTCs = {}
    #pPETs = {}
    commonTimeInterval = obj1.commonTimeInterval(obj2)
    for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
        extrapolatedTrajectories1 = createExtrapolatedTrajectories(extrapolationParameters, obj1.getPositionAtInstant(i), obj1.getVelocityAtInstant(i))
        extrapolatedTrajectories2 = createExtrapolatedTrajectories(extrapolationParameters, obj2.getPositionAtInstant(i), obj2.getVelocityAtInstant(i))

        collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(extrapolatedTrajectories1, extrapolatedTrajectories2, collisionDistanceThreshold, timeHorizon)
        # if len(collisionPoints[i]) > 0:
        #     TTCs[i] = extrapolation.computeExpectedIndicator(collisionPoints[i])
        # if len(crossingZones[i]) > 0:
        #     pPETs[i] = extrapolation.computeExpectedIndicator(crossingZones[i])
        saveSafetyPoints(outCP, obj1.num, obj2.num, i, collisionPoints[i])
        saveSafetyPoints(outCZ, obj1.num, obj2.num, i, crossingZones[i])

        if debug:
            from matplotlib.pyplot import figure, axis
            figure()
            obj1.draw('r')
            obj2.draw('b')
            for et in extrapolatedTrajectories1:
                et.predictPosition(timeHorizon)
                et.draw('rx')
            for et in extrapolatedTrajectories2:
                et.predictPosition(timeHorizon)
                et.draw('bx')
            axis('equal')
                

        # probability of successful evasive action = 1-P(Collision) using larger control values

    return collisionPoints, crossingZones

###############

# Default values: to remove because we cannot tweak that from a script where the value may be different
FPS= 25  # No. of frame per second (FPS) 
vLimit= 25/FPS    #assume limit speed is 90km/hr = 25 m/sec
deltaT= FPS*5    # extrapolatation time Horizon = 5 second    
            
def motion (position, velocity, acceleration):
    ''' extrapolation hypothesis: constant acceleration'''
    from math import atan2,cos,sin
    vInit= velocity
    vInitial= velocity.norm2()
    theta= atan2(velocity.y,velocity.x)
    vFinal= vInitial+acceleration
    
    if acceleration<= 0:
        v= max(0,vFinal)
        velocity= moving.Point(v* cos(theta),v* sin(theta))
        position= position+ (velocity+vInit). multiply(0.5)
    else:
        v= min(vLimit,vFinal)
        velocity= moving.Point(v* cos(theta),v* sin(theta))
        position= position+ (velocity+vInit). multiply(0.5)
    return(position,velocity)    

def motionPET (position, velocity, acceleration, deltaT):
    ''' extrapolation hypothesis: constant acceleration for calculating pPET '''
    from math import atan2,cos,sin,fabs
    vInit= velocity
    vInitial= velocity.norm2()
    theta= atan2(velocity.y,velocity.x)
    vFinal= vInitial+acceleration * deltaT
    if acceleration< 0:
        if vFinal> 0:    
            velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta))
            position= position+ (vInit+ velocity). multiply(0.5*deltaT)
        else:
            T= fabs(vInitial/acceleration)
            position= position + vInit. multiply(0.5*T)
    elif acceleration> 0 :
        if vFinal<= vLimit:
            velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta))
            position= position+ (vInit+ velocity). multiply(0.5*deltaT)
        else:
            time1= fabs((vLimit-vInitial)/acceleration)
            velocity= moving.Point(vLimit* cos(theta),vLimit* sin(theta))
            position= (position+ (velocity+vInit). multiply(0.5*time1)) + (velocity.multiply (deltaT-time1))
    elif acceleration == 0:
        position= position + velocity. multiply(deltaT)
    
    return position

def timePET (position, velocity, acceleration, intersectedPoint ):
    ''' extrapolation hypothesis: constant acceleration for calculating pPET '''
    from math import atan2,cos,sin,fabs
    vInit= velocity
    vInitial= velocity.norm2()
    theta= atan2(velocity.y,velocity.x)
    vFinal= vInitial+acceleration * deltaT
    if acceleration< 0:
        if vFinal> 0:    
            velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta))
            time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x)))
        else:
            time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x)))
    elif acceleration> 0 :
        if vFinal<= vLimit:
            velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta))
            time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x)))
        else:
            time1= fabs((vLimit-vInitial)/acceleration) 
            velocity= moving.Point(vLimit* cos(theta),vLimit* sin(theta))
            time2= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x)))
            if time2<=time1:
                time= time2
            else:
                position2= (position+ (velocity+vInit). multiply(0.5*time1)) 
                time= time1+fabs((intersectedPoint.x-position2.x)/( velocity.x))
    elif acceleration == 0:
        time= fabs((intersectedPoint.x-position.x)/(velocity.x))
    
    return time
    
def motionSteering (position, velocity, deltaTheta, deltaT ):
    ''' extrapolation hypothesis: steering with deltaTheta'''
    from math import atan2,cos,sin
    vInitial= velocity.norm2()
    theta= atan2(velocity.y,velocity.x)
    newTheta= theta + deltaTheta
    velocity= moving.Point(vInitial* cos(newTheta),vInitial* sin(newTheta))
    position= position+ (velocity). multiply(deltaT)
    return position    
    
def MonteCarlo(movingObject1,movingObject2, instant):
    ''' Monte Carlo Simulation :  estimate the probability of collision'''
    from random import uniform
    from math import pow, sqrt, sin, cos,atan2        
    N=1000
    ProbOfCollision = 0
    for n in range (1, N):
        # acceleration limit    
        acc1 = uniform(-0.040444,0)
        acc2 = uniform(-0.040444,0)    
        p1= movingObject1.getPositionAtInstant(instant)
        p2= movingObject2.getPositionAtInstant(instant)
        v1= movingObject1.getVelocityAtInstant(instant)
        v2= movingObject2.getVelocityAtInstant(instant)
        distance= (p1-p2).norm2()
        distanceThreshold= 1.8    
        t=1                
        while distance > distanceThreshold and t <= deltaT:
            # Extrapolation position
            (p1,v1) = motion(p1,v1,acc1)
            (p2,v2) = motion(p2,v2,acc2)
            distance= (p1-p2).norm2()
            if distance <=distanceThreshold:
                ProbOfCollision= ProbOfCollision+1
            t+=1
    POC= float(ProbOfCollision)/N
    return POC
    
def velocitySteering(velocity,steering):
    from math import atan2,cos,sin
    vInitial= velocity.norm2()
    theta= atan2(velocity.y,velocity.x)
    newTheta= theta + steering
    v= moving.Point(vInitial* cos(newTheta),vInitial* sin(newTheta))
    return v

def MonteCarloSteering(movingObject1,movingObject2, instant,steering1,steering2):
    ''' Monte Carlo Simulation :  estimate the probability of collision in case of steering'''
    from random import uniform
    from math import pow, sqrt, sin, cos,atan2        
    N=1000
    L= 2.4
    ProbOfCollision = 0
    for n in range (1, N):
        # acceleration limit    
        acc1 = uniform(-0.040444,0)
        acc2 = uniform(-0.040444,0)    
        p1= movingObject1.getPositionAtInstant(instant)
        p2= movingObject2.getPositionAtInstant(instant)
        vInit1= movingObject1.getVelocityAtInstant(instant)
        v1= velocitySteering (vInit1,steering1)
        vInit2= movingObject2.getVelocityAtInstant(instant)
        v2= velocitySteering (vInit2,steering2)
        distance= (p1-p2).norm2()
        distanceThreshold= 1.8    
        t=1                
        while distance > distanceThreshold and t <= deltaT:
            # Extrapolation position
            (p1,v1) = motion(p1,v1,acc1)
            (p2,v2) = motion(p2,v2,acc2)
            distance= (p1-p2).norm2()
            if distance <=distanceThreshold:
                ProbOfCollision= ProbOfCollision+1
            t+=1
    POC= float(ProbOfCollision)/N
    return POC


if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/extrapolation.txt')
    #suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)
    #doctest.testmod()
    #doctest.testfile("example.txt")