Mercurial Hosting > traffic-intelligence
diff trafficintelligence/traffic_engineering.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 15 Jun 2018 11:19:10 -0400 |
parents | python/traffic_engineering.py@2cd1ce245024 |
children | c6cf75a2ed08 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/trafficintelligence/traffic_engineering.py Fri Jun 15 11:19:10 2018 -0400 @@ -0,0 +1,337 @@ +#! /usr/bin/env python +''' Traffic Engineering Tools and Examples''' + +from trafficintelligence import prediction + +from math import ceil + + +######################### +# Simulation +######################### + +def generateTimeHeadways(meanTimeHeadway, simulationTime): + '''Generates the time headways between arrivals + given the meanTimeHeadway and the negative exponential distribution + over a time interval of length simulationTime (assumed to be in same time unit as headway''' + from random import expovariate + headways = [] + totalTime = 0 + flow = 1/meanTimeHeadway + while totalTime < simulationTime: + h = expovariate(flow) + headways.append(h) + totalTime += h + return headways + +class RoadUser(object): + '''Simple example of inheritance to plot different road users ''' + def __init__(self, position, velocity): + 'Both fields are 2D numpy arrays' + self.position = position.astype(float) + self.velocity = velocity.astype(float) + + def move(self, deltaT): + self.position += deltaT*self.velocity + + def draw(self, init = False): + from matplotlib.pyplot import plot + if init: + self.plotLine = plot(self.position[0], self.position[1], self.getDescriptor())[0] + else: + self.plotLine.set_data(self.position[0], self.position[1]) + + +class PassengerVehicle(RoadUser): + def getDescriptor(self): + return 'dr' + +class Pedestrian(RoadUser): + def getDescriptor(self): + return 'xb' + +class Cyclist(RoadUser): + def getDescriptor(self): + return 'og' + +######################### +# queueing models +######################### + +class CapacityReduction(object): + def __init__(self, beta, reductionDuration, demandCapacityRatio = None, demand = None, capacity = None): + '''reduction duration should be positive + demandCapacityRatio is demand/capacity (q/s)''' + if demandCapacityRatio is None and demand is None and capacity is None: + print('Missing too much information (demand, capacity and ratio)') + import sys + sys.exit() + if 0 <= beta < 1: + self.beta = beta + self.reductionDuration = reductionDuration + + if demandCapacityRatio is not None: + self.demandCapacityRatio = demandCapacityRatio + if demand is not None: + self.demand = demand + if capacity is not None: + self.capacity = capacity + if capacity is not None and demand is not None: + self.demandCapacityRatio = float(self.demand)/self.capacity + if demand <= beta*capacity: + print('There is no queueing as the demand {} is inferior to the reduced capacity {}'.format(demand, beta*capacity)) + else: + print('reduction coefficient (beta={}) is not in [0, 1['.format(beta)) + + def queueingDuration(self): + return self.reductionDuration*(1-self.beta)/(1-self.demandCapacityRatio) + + def nArrived(self, t): + if self.demand is None: + print('Missing demand field') + return None + return self.demand*t + + def nServed(self, t): + if self.capacity is None: + print('Missing capacity field') + return None + if 0<=t<=self.reductionDuration: + return self.beta*self.capacity*t + elif self.reductionDuration < t <= self.queueingDuration(): + return self.beta*self.capacity*self.reductionDuration+self.capacity*(t-self.reductionDuration) + + def nQueued(self, t): + return self.nArrived(t)-self.nServed(t) + + def maxNQueued(self): + return self.nQueued(self.reductionDuration) + + def totalDelay(self): + if self.capacity is None: + print('Missing capacity field') + return None + return self.capacity*self.reductionDuration**2*(1-self.beta)*(self.demandCapacityRatio-self.beta)/(2*(1-self.demandCapacityRatio)) + + def averageDelay(self): + return self.reductionDuration*(self.demandCapacityRatio-self.beta)/(2*self.demandCapacityRatio) + + def averageNQueued(self): + return self.totalDelay()/self.queueingDuration() + + +######################### +# fundamental diagram +######################### + +class FundamentalDiagram(object): + ''' ''' + def __init__(self, name): + self.name = name + + def q(self, k): + return k*self.v(k) + + @staticmethod + def meanHeadway(k): + return 1/k + + @staticmethod + def meanSpacing(q): + return 1/q + + def plotVK(self, language='fr', units={}): + from numpy import arange + from matplotlib.pyplot import figure,plot,xlabel,ylabel + densities = [k for k in arange(1, self.kj+1)] + figure() + plot(densities, [self.v(k) for k in densities]) + xlabel('Densite (veh/km)') # todo other languages and adapt to units + ylabel('Vitesse (km/h)') + + def plotQK(self, language='fr', units={}): + from numpy import arange + from matplotlib.pyplot import figure,plot,xlabel,ylabel + densities = [k for k in arange(1, self.kj+1)] + figure() + plot(densities, [self.q(k) for k in densities]) + xlabel('Densite (veh/km)') # todo other languages and adapt to units + ylabel('Debit (km/h)') + +class GreenbergFD(FundamentalDiagram): + '''Speed is the logarithm of density''' + def __init__(self, vc, kj): + FundamentalDiagram.__init__(self,'Greenberg') + self.vc=vc + self.kj=kj + + def v(self,k): + from numpy import log + return self.vc*log(self.kj/k) + + def criticalDensity(self): + from numpy import e + self.kc = self.kj/e + return self.kc + + def capacity(self): + self.qmax = self.kc*self.vc + return self.qmax + +######################### +# intersection +######################### + +class FourWayIntersection(object): + '''Simple class for simple intersection outline''' + def __init__(self, dimension, coordX, coordY): + self.dimension = dimension + self.coordX = coordX + self.coordY = coordY + + def plot(self, options = 'k'): + from matplotlib.pyplot import plot, axis + + minX = min(self.dimension[0]) + maxX = max(self.dimension[0]) + minY = min(self.dimension[1]) + maxY = max(self.dimension[1]) + + plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[0], self.coordY[0], minY],options) + plot([self.coordX[1], self.coordX[1], maxX], [minY, self.coordY[0], self.coordY[0]],options) + plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[1], self.coordY[1], maxY],options) + plot([self.coordX[1], self.coordX[1], maxX], [maxY, self.coordY[1], self.coordY[1]],options) + axis('equal') + +######################### +# traffic signals +######################### + +class Volume(object): + '''Class to represent volumes with varied vehicule types ''' + def __init__(self, volume, types = ['pc'], proportions = [1], equivalents = [1], nLanes = 1): + '''mvtEquivalent is the equivalent if the movement is right of left turn''' + + # check the sizes of the lists + if sum(proportions) == 1: + self.volume = volume + self.types = types + self.proportions = proportions + self.equivalents = equivalents + self.nLanes = nLanes + else: + print('Proportions do not sum to 1') + pass + + def checkProtected(self, opposedThroughMvt): + '''Checks if this left movement should be protected, + ie if one of the main two conditions on left turn is verified''' + return self.volume >= 200 or self.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 + + def getPCUVolume(self): + '''Returns the passenger-car equivalent for the input volume''' + v = 0 + for p, e in zip(self.proportions, self.equivalents): + v += p*e + return v*self.volume + +class IntersectionMovement(object): + '''Represents an intersection movement + with a volume, a type (through, left or right) + and an equivalent for movement type''' + def __init__(self, volume, mvtEquivalent = 1): + self.volume = volume + self.mvtEquivalent = mvtEquivalent + + def getTVUVolume(self): + return self.mvtEquivalent*self.volume.getPCUVolume() + +class LaneGroup(object): + '''Class that represents a group of mouvements''' + + def __init__(self, movements, nLanes): + self.movements = movements + self.nLanes = nLanes + + def getTVUVolume(self): + return sum([mvt.getTVUVolume() for mvt in self.movements]) + + def getCharge(self, saturationVolume): + return self.getTVUVolume()/(self.nLanes*saturationVolume) + +def optimalCycle(lostTime, criticalCharge): + return (1.5*lostTime+5)/(1-criticalCharge) + +def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): + 'degree of saturation can be used as the peak hour factor too' + return lostTime/(1-criticalCharge/degreeSaturation) + +class Cycle(object): + '''Class to compute optimal cycle and the split of effective green times''' + def __init__(self, phases, lostTime, saturationVolume): + '''phases is a list of phases + a phase is a list of lanegroups''' + self.phases = phases + self.lostTime = lostTime + self.saturationVolume = saturationVolume + + def computeCriticalCharges(self): + self.criticalCharges = [max([lg.getCharge(self.saturationVolume) for lg in phase]) for phase in self.phases] + self.criticalCharge = sum(self.criticalCharges) + + def computeOptimalCycle(self): + self.computeCriticalCharges() + self.C = optimalCycle(self.lostTime, self.criticalCharge) + return self.C + + def computeMinimumCycle(self, degreeSaturation=1.): + self.computeCriticalCharges() + self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) + return self.C + + def computeEffectiveGreen(self): + #from numpy import round + #self.computeCycle() # in case it was not done before + effectiveGreenTime = self.C-self.lostTime + self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] + return self.effectiveGreens + + +def computeInterGreen(perceptionReactionTime, initialSpeed, intersectionLength, vehicleAverageLength = 6, deceleration = 3): + '''Computes the intergreen time (yellow/amber plus all red time) + Deceleration is positive + All variables should be in the same units''' + if deceleration > 0: + return [perceptionReactionTime+float(initialSpeed)/(2*deceleration), float(intersectionLength+vehicleAverageLength)/initialSpeed] + else: + print('Issue deceleration should be strictly positive') + return None + +def uniformDelay(cycleLength, effectiveGreen, saturationDegree): + '''Computes the uniform delay''' + return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)**2/(1-float(effectiveGreen*saturationDegree)/cycleLength) + +def randomDelay(volume, saturationDegree): + '''Computes the random delay = queueing time for M/D/1''' + return saturationDegree**2/(2*volume*(1-saturationDegree)) + +def incrementalDelay(T, X, c, k=0.5, I=1): + '''Computes the incremental delay (HCM) + T in hours + c capacity of the lane group + k default for fixed time signal + I=1 for isolated intersection (Poisson arrival)''' + from math import sqrt + return 900*T*(X - 1 + sqrt((X - 1)**2 + 8*k*I*X/(c*T))) + +######################### +# misc +######################### + +def timeChangingSpeed(v0, vf, a, TPR): + 'for decelerations, a < 0' + return TPR-(vf-v0)/a + +def distanceChangingSpeed(v0, vf, a, TPR): + 'for decelerations, a < 0' + return TPR*v0-(vf**2-v0**2)/(2*a)