view python/traffic_engineering.py @ 398:3399bd48cb40

Ajout d'une méthode pour obtenir le nombre de FPS Méthode de capture des trames vidéos plus résistante aux erreur Utilisation d'un dictionnaire pour les fichier de configuration afin de garder le nom des sections
author Jean-Philippe Jodoin <jpjodoin@gmail.com>
date Mon, 29 Jul 2013 13:46:07 -0400
parents 539e2b4cfaa3
children 7828fec8bbd2
line wrap: on
line source

#! /usr/bin/env python
''' Traffic Engineering Tools and Examples'''

from math import ceil

import prediction

__metaclass__ = type


#########################
# Simulation
#########################

class Vehicle:
    '''Generic vehicle class
    1D coordinates for now'''
    class PredictedTrajectory1D(prediction.PredictedTrajectory):
        def __init__(self, initialPosition, initialSpeed, control, maxSpeed = None):
            self.control = control
            self.maxSpeed = maxSpeed
            self.probability = None
            self.predictedPositions = {0: moving.Point(initialPosition)}
            self.predictedSpeedOrientations = {0: moving.NormAngle(initialSpeed, 0)}
            
        def setAcceleration(self, acceleration):
            self.control = moving.NormAngle(acceleration, 0)
            
        def getControl(self):
            return self.control


    def __init__(self, initialPosition = 0, initialSpeed = 0, acceleration = 0, prt = 2.5, leader = None):
        self.positions = PredictedTrajectory1D(initialPosition, initialSpeed)

        self.prt = prt
        self.leader = leader
        # todo add microModel (Treiber)

    def setAcceleration(self, acceleration):
        self.positions.setAcceleration(acceleration)

    def updatePosition(self, dt): # knowledge of time outside of vehicle ??
        speed = self.speed
        self.speed += self.acceleration*dt
        self.position += speed*dt
        if self.log:
            self.positions.append(self.position)
            self.speeds.append(self.speed)
            self.accelerations.append(self.acceleration)

    def updateAcceleration(self, dt):
        '''Updates acceleration and speed as a function of leader
        and other factors'''
        pass

    def update(self, dt):
        self.updatePosition(dt)
        self.updateAcceleration(dt) # function of leader

    def printStats(self):
        print('{0} {1} {2}'.format(self.position, self.speed, self.acceleration))

#########################
# fundamental diagram
#########################

class FundamentalDiagram:
    ''' '''
    def __init__(self, name):
        self.name = name

    def q(self, k):
        return k*self.v(k)

    @staticmethod
    def meanHeadway(k):
        return 1/k
    
    @staticmethod
    def meanSpacing(q):
        return 1/q

    def plotVK(self, language='fr', units={}):
        from numpy import arange
        from matplotlib.pyplot import figure,plot,xlabel,ylabel
        densities = [k for k in arange(1, self.kj+1)]
        figure()
        plot(densities, [self.v(k) for k in densities])
        xlabel('Densite (veh/km)') # todo other languages and adapt to units
        ylabel('Vitesse (km/h)')

    def plotQK(self, language='fr', units={}):
        from numpy import arange
        from matplotlib.pyplot import figure,plot,xlabel,ylabel
        densities = [k for k in arange(1, self.kj+1)]
        figure()
        plot(densities, [self.q(k) for k in densities])
        xlabel('Densite (veh/km)') # todo other languages and adapt to units
        ylabel('Debit (km/h)')

class GreenbergFD(FundamentalDiagram):
    '''Speed is the logarithm of density'''
    def __init__(self, vc, kj):
        FundamentalDiagram.__init__(self,'Greenberg')
        self.vc=vc
        self.kj=kj
    
    def v(self,k):
        from numpy import log
        return self.vc*log(self.kj/k)

    def criticalDensity(self): 
        from numpy import e
        self.kc = self.kj/e
        return self.kc

    def capacity(self):
        self.qmax = self.kc*self.vc
        return self.qmax

#########################
# intersection
#########################

class FourWayIntersection:
    '''Simple class for simple intersection outline'''
    def __init__(self, dimension, coordX, coordY):
        self.dimension = dimension
        self.coordX = coordX
        self.coordY = coordY

    def plot(self, options = 'k'):
        from matplotlib.pyplot import plot, axis
    
        minX = min(self.dimension[0])
        maxX = max(self.dimension[0])
        minY = min(self.dimension[1])
        maxY = max(self.dimension[1])
        
        plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[0], self.coordY[0], minY],options)
        plot([self.coordX[1], self.coordX[1], maxX], [minY, self.coordY[0], self.coordY[0]],options)
        plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[1], self.coordY[1], maxY],options)
        plot([self.coordX[1], self.coordX[1], maxX], [maxY, self.coordY[1], self.coordY[1]],options)
        axis('equal')

#########################
# traffic signals
#########################

class Volume:
    '''Class to represent volumes with varied vehicule types '''
    def __init__(self, volume, types = ['pc'], proportions = [1], equivalents = [1], nLanes = 1):
        '''mvtEquivalent is the equivalent if the movement is right of left turn'''

        # check the sizes of the lists
        if sum(proportions) == 1:
            self.volume = volume
            self.types = types
            self.proportions = proportions
            self.equivalents = equivalents
            self.nLanes = nLanes
        else:
            print('Proportions do not sum to 1')
            pass

    def checkProtected(self, opposedThroughMvt):
        '''Checks if this left movement should be protected,
        ie if one of the main two conditions on left turn is verified'''
        return self.volume >= 200 or self.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000

    def getPCUVolume(self):
        '''Returns the passenger-car equivalent for the input volume'''
        v = 0
        for p, e in zip(self.proportions, self.equivalents):
            v += p*e
        return v*self.volume

class IntersectionMovement:
    '''Represents an intersection movement
    with a volume, a type (through, left or right)
    and an equivalent for movement type'''
    def __init__(self, volume, mvtEquivalent = 1):
        self.volume = volume
        self.mvtEquivalent = mvtEquivalent

    def getTVUVolume(self):
        return self.mvtEquivalent*self.volume.getPCUVolume()    

class LaneGroup:
    '''Class that represents a group of mouvements'''

    def __init__(self, movements, nLanes):
        self.movements = movements
        self.nLanes = nLanes

    def getTVUVolume(self):
        return sum([mvt.getTVUVolume() for mvt in self.movements])

    def getCharge(self, saturationVolume):
        return self.getTVUVolume()/(self.nLanes*saturationVolume)

def optimalCycle(lostTime, criticalCharge):
    return (1.5*lostTime+5)/(1-criticalCharge)

def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.):
    'degree of saturation can be used as the peak hour factor too'
    return lostTime/(1-criticalCharge/degreeSaturation)

class Cycle:
    '''Class to compute optimal cycle and the split of effective green times'''
    def __init__(self, phases, lostTime, saturationVolume):
        '''phases is a list of phases
        a phase is a list of lanegroups'''
        self.phases = phases
        self.lostTime = lostTime
        self.saturationVolume = saturationVolume

    def computeCriticalCharges(self):
        self.criticalCharges = [max([lg.getCharge(self.saturationVolume) for lg in phase]) for phase in self.phases]
        self.criticalCharge = sum(self.criticalCharges)
        
    def computeOptimalCycle(self):
        self.computeCriticalCharges()
        self.C = optimalCycle(self.lostTime, self.criticalCharge)
        return self.C

    def computeMinimumCycle(self, degreeSaturation=1.):
        self.computeCriticalCharges()
        self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation)
        return self.C

    def computeEffectiveGreen(self):
        #from numpy import round
        #self.computeCycle() # in case it was not done before
        effectiveGreenTime = self.C-self.lostTime
        self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges]
        return self.effectiveGreens


def computeInterGreen(perceptionReactionTime, initialSpeed, intersectionLength, vehicleAverageLength = 6, deceleration = 3):
    '''Computes the intergreen time (yellow/amber plus all red time)
    Deceleration is positive
    All variables should be in the same units'''
    if deceleration > 0:
        return [perceptionReactionTime+float(initialSpeed)/(2*deceleration), float(intersectionLength+vehicleAverageLength)/initialSpeed]
    else:
        print 'Issue deceleration should be strictly positive'
        return None

def uniformDelay(cycleLength, effectiveGreen, saturationDegree):
    '''Computes the uniform delay'''
    return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)/(1-float(effectiveGreen*saturationDegree)/cycleLength)

#########################
# misc
#########################

def timeChangingSpeed(v0, vf, a, TPR):
    return TPR+(vf-v0)/a

def distanceChangingSpeed(v0, vf, a, TPR):
    return TPR*v0+(vf*vf-v0*v0)/(2*a)