view python/traffic_engineering.py @ 300:f65b828e5521

working on trajectory simulation
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Wed, 13 Feb 2013 18:26:49 -0500
parents 82b4101d9a2f
children 9d88a4d97473
line wrap: on
line source

#! /usr/bin/env python
''' Traffic Engineering Tools and Examples'''

from math import ceil

__metaclass__ = type


#########################
# Simulation
#########################

class Vehicle:
    'Generic vehicle class
    1D coordinates for now'
    class PredictedTrajectory1D(prediction.PredictedTrajectory):
        def __init__(self, initialPosition, initialSpeed, control, maxSpeed = None):
            self.control = control
            self.maxSpeed = maxSpeed
            self.probability = None
            self.predictedPositions = {0: moving.Point(initialPosition)}
            self.predictedSpeedOrientations = {0: moving.NormAngle(initialSpeed, 0)}
            
        def setAcceleration(self, acceleration):
            self.control = moving.NormAngle(acceleration, 0)
            
        def getControl(self):
            return self.control


    def __init__(self, initialPosition = 0, initialSpeed = 0, acceleration = 0, prt = 2.5, leader = None):
        self.positions = PredictedTrajectory1D(initialPosition, initialSpeed)

        self.prt = prt
        self.leader = leader
        # todo add microModel (Treiber)

    def setAcceleration(self, acceleration):
        self.positions.setAcceleration(acceleration)

    def updatePosition(self, dt): # knowledge of time outside of vehicle ??
        speed = self.speed
        self.speed += self.acceleration*dt
        self.position += speed*dt
        if self.log:
            self.positions.append(self.position)
            self.speeds.append(self.speed)
            self.accelerations.append(self.acceleration)

    def updateAcceleration(self, dt):
        '''Updates acceleration and speed as a function of leader
        and other factors'''
        pass

    def update(self, dt):
        self.updatePosition(dt)
        self.updateAcceleration(dt) # function of leader

    def printStats(self):
        print('{0} {1} {2}'.format(self.position, self.speed, self.acceleration))

#########################
# fundamental diagram
#########################

class FundamentalDiagram:
    ''' '''
    def __init__(self, name):
        self.name = name

    def q(self, k):
        return k*self.v(k)

    @staticmethod
    def meanHeadway(k):
        return 1/k
    
    @staticmethod
    def meanSpacing(q):
        return 1/q

    def plotVK(self, language='fr', units={}):
        from numpy import arange
        from matplotlib.pyplot import figure,plot,xlabel,ylabel
        densities = [k for k in arange(1, self.kj+1)]
        figure()
        plot(densities, [self.v(k) for k in densities])
        xlabel('Densite (veh/km)') # todo other languages and adapt to units
        ylabel('Vitesse (km/h)')

    def plotQK(self, language='fr', units={}):
        from numpy import arange
        from matplotlib.pyplot import figure,plot,xlabel,ylabel
        densities = [k for k in arange(1, self.kj+1)]
        figure()
        plot(densities, [self.q(k) for k in densities])
        xlabel('Densite (veh/km)') # todo other languages and adapt to units
        ylabel('Debit (km/h)')

class GreenbergFD(FundamentalDiagram):
    '''Speed is the logarithm of density'''
    def __init__(self, vc, kj):
        FundamentalDiagram.__init__(self,'Greenberg')
        self.vc=vc
        self.kj=kj
    
    def v(self,k):
        from numpy import log
        return self.vc*log(self.kj/k)

    def criticalDensity(self): 
        from numpy import e
        self.kc = self.kj/e
        return self.kc

    def capacity(self):
        self.qmax = self.kc*self.vc
        return self.qmax

#########################
# intersection
#########################

class FourWayIntersection:
    '''Simple class for simple intersection outline'''
    def __init__(self, dimension, coordX, coordY):
        self.dimension = dimension
        self.coordX = coordX
        self.coordY = coordY

    def plot(self, options = 'k'):
        from matplotlib.pyplot import plot, axis
    
        minX = min(self.dimension[0])
        maxX = max(self.dimension[0])
        minY = min(self.dimension[1])
        maxY = max(self.dimension[1])
        
        plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[0], self.coordY[0], minY],options)
        plot([self.coordX[1], self.coordX[1], maxX], [minY, self.coordY[0], self.coordY[0]],options)
        plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[1], self.coordY[1], maxY],options)
        plot([self.coordX[1], self.coordX[1], maxX], [maxY, self.coordY[1], self.coordY[1]],options)
        axis('equal')

#########################
# traffic signals
#########################

class Volume:
    '''Class to represent volumes with varied vehicule types '''
    def __init__(self, volume, types = ['pc'], proportions = [1], equivalents = [1], nLanes = 1):
        '''mvtEquivalent is the equivalent if the movement is right of left turn'''

        # check the sizes of the lists
        if sum(proportions) == 1:
            self.volume = volume
            self.types = types
            self.proportions = proportions
            self.equivalents = equivalents
            self.nLanes = nLanes # unused
        else:
            pass

    def getPCUVolume(self):
        '''Returns the passenger-car equivalent for the input volume'''
        v = 0
        for p, e in zip(self.proportions, self.equivalents):
            v += p*e
        return v*self.volume

class IntersectionMovement:
    '''Represents an intersection movement
    with a volume, a type (through, left or right)
    and an equivalent for movement type'''
    def __init__(self, volume, mvtEquivalent = 1):
        self.volume = volume
        self.mvtEquivalent = mvtEquivalent

    def getTVUVolume(self):
        return self.mvtEquivalent*self.volume.getPCUVolume()    

class IntersectionApproach: # should probably not be used
    def __init__(self, leftTurnVolume, throughVolume, rightTurnVolume):
        self.leftTurnVolume = leftTurnVolume
        self.throughVolume = throughVolume
        self.rightTurnVolume = rightTurnVolume

    def getTVUVolume(self, leftTurnEquivalent = 1, throughEquivalent = 1, rightTurnEquivalent = 1):
        return self.leftTurnVolume.getPCUVolume()*leftTurnEquivalent+self.throughVolume.getPCUVolume()*throughEquivalent+self.rightTurnVolume.getPCUVolume()*rightTurnEquivalent

class LaneGroup:
    '''Class that represents a group of mouvements'''

    def __init__(self, movements, nLanes):
        self.movements = movements
        self.nLanes = nLanes

    def getTVUVolume(self):
        return sum([mvt.getTVUVolume() for mvt in self.movements])

    def getCharge(self, saturationVolume):
        return self.getTVUVolume()/(self.nLanes*saturationVolume)

def checkProtectedLeftTurn(leftMvt, opposedThroughMvt):
    '''Checks if one of the main two conditions on left turn is verified
    The lane groups should contain left and through movement'''
    return leftMvt.volume >= 200 or leftMvt.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000

def optimalCycle(lostTime, criticalCharge):
    return (1.5*lostTime+5)/(1-criticalCharge)

def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.):
    'degree of saturation can be used as the peak hour factor too'
    return lostTime/(1-criticalCharge/degreeSaturation)

class Cycle:
    '''Class to compute optimal cycle and the split of effective green times'''
    def __init__(self, phases, lostTime, saturationVolume):
        '''phases is a list of phases
        a phase is a list of lanegroups'''
        self.phases = phases
        self.lostTime = lostTime
        self.saturationVolume = saturationVolume

    def computeCriticalCharges(self):
        self.criticalCharges = []
        for phase in self.phases:
            self.criticalCharges.append(max([lg.getCharge(self.saturationVolume) for lg in phase]))
        self.criticalCharge = sum(self.criticalCharges)
        
    def computeOptimalCycle(self):
        self.computeCriticalCharges()
        self.C = optimalCycle(self.lostTime, self.criticalCharge)
        return self.C

    def computeMinimumCycle(self, degreeSaturation=1.):
        self.computeCriticalCharges()
        self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation)
        return self.C

    def computeEffectiveGreen(self):
        from numpy import round
        #self.computeCycle() # in case it was not done before
        effectiveGreenTime = self.C-self.lostTime
        self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges]
        return self.effectiveGreens


def computeInterGreen(perceptionReactionTime, initialSpeed, intersectionLength, vehicleAverageLength = 6, deceleration = 3):
    '''Computes the intergreen time (yellow/amber plus all red time)
    Deceleration is positive
    All variables should be in the same units'''
    if deceleration > 0:
        return [perceptionReactionTime+float(initialSpeed)/(2*deceleration), float(intersectionLength+vehicleAverageLength)/initialSpeed]
    else:
        print 'Issue deceleration should be strictly positive'
        return None

def uniformDelay(cycleLength, effectiveGreen, saturationDegree):
    '''Computes the uniform delay'''
    return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)/(1-float(effectiveGreen*saturationDegree)/cycleLength)

#########################
# misc
#########################

def timeChangingSpeed(v0, vf, a, TPR):
    return TPR+(vf-v0)/a

def distanceChangingSpeed(v0, vf, a, TPR):
    return TPR*v0+(vf*vf-v0*v0)/(2*a)