Mercurial Hosting > traffic-intelligence
view trafficintelligence/traffic_engineering.py @ 1262:f10e84505443
modif for highway level of service
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Wed, 17 Apr 2024 16:46:23 -0400 |
parents | 3bfdb2ffd29d |
children |
line wrap: on
line source
#! /usr/bin/env python ''' Traffic Engineering Tools and Examples''' from math import ceil from numpy import e, log, arange from scipy import stats from matplotlib.pyplot import figure,plot,xlabel,ylabel, xlim, ylim from trafficintelligence import prediction ######################### # Simulation ######################### def generateTimeHeadways(meanTimeHeadway, simulationTime): '''Generates the time headways between arrivals given the meanTimeHeadway and the negative exponential distribution over a time interval of length simulationTime (assumed to be in same time unit as headway''' from random import expovariate headways = [] totalTime = 0 flow = 1/meanTimeHeadway while totalTime < simulationTime: h = expovariate(flow) headways.append(h) totalTime += h return headways class RoadUser(object): '''Simple example of inheritance to plot different road users ''' def __init__(self, position, velocity): 'Both fields are 2D numpy arrays' self.position = position.astype(float) self.velocity = velocity.astype(float) def move(self, deltaT): self.position += deltaT*self.velocity def draw(self, init = False): from matplotlib.pyplot import plot if init: self.plotLine = plot(self.position[0], self.position[1], self.getDescriptor())[0] else: self.plotLine.set_data(self.position[0], self.position[1]) class PassengerVehicle(RoadUser): def getDescriptor(self): return 'dr' class Pedestrian(RoadUser): def getDescriptor(self): return 'xb' class Cyclist(RoadUser): def getDescriptor(self): return 'og' ######################### # queueing models ######################### class CapacityReduction(object): def __init__(self, beta, reductionDuration, demandCapacityRatio = None, demand = None, capacity = None): '''reduction duration should be positive demandCapacityRatio is demand/capacity (q/s)''' if demandCapacityRatio is None and demand is None and capacity is None: print('Missing too much information (demand, capacity and ratio)') import sys sys.exit() if 0 <= beta < 1: self.beta = beta self.reductionDuration = reductionDuration if demandCapacityRatio is not None: self.demandCapacityRatio = demandCapacityRatio if demand is not None: self.demand = demand if capacity is not None: self.capacity = capacity if capacity is not None and demand is not None: self.demandCapacityRatio = float(self.demand)/self.capacity if demand <= beta*capacity: print('There is no queueing as the demand {} is inferior to the reduced capacity {}'.format(demand, beta*capacity)) else: print('reduction coefficient (beta={}) is not in [0, 1['.format(beta)) def queueingDuration(self): return self.reductionDuration*(1-self.beta)/(1-self.demandCapacityRatio) def nArrived(self, t): '''since the beginning of the capacity reduction''' if self.demand is None or t<0: print('Missing demand field') return None return self.demand*t def nServed(self, t): '''since the beginning of the capacity reduction''' if self.capacity is None or t<0: print('Missing capacity field') return None if 0<=t<=self.reductionDuration: return self.beta*self.capacity*t elif self.reductionDuration < t: qDuration = self.queueingDuration() if t <= qDuration: return self.beta*self.capacity*self.reductionDuration+self.capacity*(t-self.reductionDuration) else: return self.beta*self.capacity*self.reductionDuration+self.capacity*(qDuration-self.reductionDuration)+self.demand*(t-qDuration) def nQueued(self, t): return self.nArrived(t)-self.nServed(t) def maxNQueued(self): return self.nQueued(self.reductionDuration) def totalDelay(self): if self.capacity is None: print('Missing capacity field') return None return self.capacity*self.reductionDuration**2*(1-self.beta)*(self.demandCapacityRatio-self.beta)/(2*(1-self.demandCapacityRatio)) def averageDelay(self): return self.reductionDuration*(self.demandCapacityRatio-self.beta)/(2*self.demandCapacityRatio) def averageNQueued(self): return self.totalDelay()/self.queueingDuration() ######################### # fundamental diagrams ######################### class FundamentalDiagram(object): ''' ''' def __init__(self, name): self.name = name self.kj = None self.kc = None self.vf = None self.qmax = None def getJamDensity(self): return self.kj def getCriticalDensity(self): return self.kc def getCapacity(self): return self.qmax def getFreeFlowSpeed(self): return self.vf def q(self, k): return k*self.v(k) @staticmethod def meanHeadway(k): return 1/k @staticmethod def meanSpacing(q): return 1/q def plotVK(self, language='fr', units={}): densities = [k for k in arange(1, self.kj+1)] figure() plot(densities, [self.v(k) for k in densities]) xlim(xmin=0) ylim(ymin=0) xlabel('Densite (veh/km)') # todo other languages and adapt to units ylabel('Vitesse (km/h)') def plotQK(self, language='fr', units={}): densities = [k for k in arange(1, self.kj+1)] figure() plot(densities, [self.q(k) for k in densities]) xlim(xmin=0) ylim(ymin=0) xlabel('Densite (veh/km)') # todo other languages and adapt to units ylabel('Debit (km/h)') class GreenshieldsFD(FundamentalDiagram): '''Speed is a linear function of density''' def __init__(self, vf, kj): FundamentalDiagram.__init__(self,'Greenshields') self.vf=vf self.kj=kj self.kc=kj/2 self.qmax=vf*kj/4 def v(self,k): from numpy import log return self.vf*(1-k/self.kj) class GreenbergFD(FundamentalDiagram): '''Speed is the logarithm of density''' def __init__(self, vc, kj): FundamentalDiagram.__init__(self,'Greenberg') self.vc=vc self.kj=kj self.qmax = self.kc*self.vc self.kc = self.kj/e def v(self,k): return self.vc*log(self.kj/k) class TriangularFD(FundamentalDiagram): def __init__(self, vf = None, kc = None, kj = None, qmax = None, w = None): FundamentalDiagram.__init__(self,'Triangular') if vf is not None and qmax is not None and kj is not None: self.vf=vf self.qmax = qmax self.kj = kj self.kc = qmax/vf self.w = qmax/(self.kc-kj) def v(self, k): if k<self.kc: return self.vf else: return self.vf*self.kc*(self.kj/k-1)/(self.kj-self.kc) def generateDensities(n, maxDensity): return stats.uniform.rvs(size=n)*maxDensity def generateSpeedVolumes(fd, n, maxDensity, maxHGVProportion = 0, etrucks = 2.5): densities = generateDensities(n, maxDensity) speeds = [fd.v(k) for k in densities] volumes = [fd.q(k) for k in densities] if maxHGVProportion > 0: hgvProportions = stats.uniform.rvs(size=n)*maxHGVProportion # en pourcent volumes = [v/(1+(etrucks-1)*p/100) for v,p in zip(volumes, hgvProportions)] else: hgvProportions = None return speeds, volumes, hgvProportions higwayMaxDensityLOS = {'A':7, 'B':11, 'C':16, 'D':22, 'E': 28} def highwayLOS(k): 'returns the highway level of service for density k in veh/km' for los, kmax in higwayMaxDensityLOS.items(): if k<kmax: return los return 'F' ######################### # intersection ######################### class FourWayIntersection(object): '''Simple class for simple intersection outline''' def __init__(self, dimension, coordX, coordY): self.dimension = dimension self.coordX = coordX self.coordY = coordY def plot(self, options = 'k'): from matplotlib.pyplot import plot, axis minX = min(self.dimension[0]) maxX = max(self.dimension[0]) minY = min(self.dimension[1]) maxY = max(self.dimension[1]) plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[0], self.coordY[0], minY],options) plot([self.coordX[1], self.coordX[1], maxX], [minY, self.coordY[0], self.coordY[0]],options) plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[1], self.coordY[1], maxY],options) plot([self.coordX[1], self.coordX[1], maxX], [maxY, self.coordY[1], self.coordY[1]],options) axis('equal') ######################### # traffic signals ######################### class Volume(object): '''Class to represent volumes with varied vehicule types ''' def __init__(self, volume, types = ['pc'], proportions = [1], equivalents = [1], nLanes = 1): '''mvtEquivalent is the equivalent if the movement is right of left turn''' # check the sizes of the lists if sum(proportions) == 1: self.volume = volume self.types = types self.proportions = proportions self.equivalents = equivalents self.nLanes = nLanes else: print('Proportions do not sum to 1') pass def checkProtected(self, opposedThroughMvt): '''Checks if this left movement should be protected, ie if one of the main two conditions on left turn is verified''' return self.volume >= 200 or self.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 def getPCUVolume(self): '''Returns the passenger-car equivalent for the input volume''' v = 0 for p, e in zip(self.proportions, self.equivalents): v += p*e return v*self.volume class IntersectionMovement(object): '''Represents an intersection movement with a volume, a type (through, left or right) and an equivalent for movement type''' def __init__(self, volume, mvtEquivalent = 1): self.volume = volume self.mvtEquivalent = mvtEquivalent def getTVUVolume(self): return self.mvtEquivalent*self.volume.getPCUVolume() class LaneGroup(object): '''Class that represents a group of mouvements''' def __init__(self, movements, nLanes): self.movements = movements self.nLanes = nLanes def getTVUVolume(self): return sum([mvt.getTVUVolume() for mvt in self.movements]) def getCharge(self, saturationVolume): return self.getTVUVolume()/(self.nLanes*saturationVolume) def optimalCycle(lostTime, criticalCharge): return (1.5*lostTime+5)/(1-criticalCharge) def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): 'degree of saturation can be used as the peak hour factor too' return lostTime/(1-criticalCharge/degreeSaturation) class Cycle(object): '''Class to compute optimal cycle and the split of effective green times''' def __init__(self, phases, lostTime, saturationVolume): '''phases is a list of phases a phase is a list of lanegroups''' self.phases = phases self.lostTime = lostTime self.saturationVolume = saturationVolume def computeCriticalCharges(self): self.criticalCharges = [max([lg.getCharge(self.saturationVolume) for lg in phase]) for phase in self.phases] self.criticalCharge = sum(self.criticalCharges) def computeOptimalCycle(self): self.computeCriticalCharges() self.C = optimalCycle(self.lostTime, self.criticalCharge) return self.C def computeMinimumCycle(self, degreeSaturation=1.): self.computeCriticalCharges() self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) return self.C def computeEffectiveGreen(self): #from numpy import round #self.computeCycle() # in case it was not done before effectiveGreenTime = self.C-self.lostTime self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] return self.effectiveGreens def computeInterGreen(perceptionReactionTime, initialSpeed, intersectionLength, vehicleAverageLength = 6, deceleration = 3): '''Computes the intergreen time (yellow/amber plus all red time) Deceleration is positive All variables should be in the same units''' if deceleration > 0: return [perceptionReactionTime+float(initialSpeed)/(2*deceleration), float(intersectionLength+vehicleAverageLength)/initialSpeed] else: print('Issue deceleration should be strictly positive') return None def uniformDelay(cycleLength, effectiveGreen, saturationDegree): '''Computes the uniform delay''' return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)**2/(1-float(effectiveGreen*saturationDegree)/cycleLength) def randomDelay(volume, saturationDegree): '''Computes the random delay = queueing time for M/D/1''' return saturationDegree**2/(2*volume*(1-saturationDegree)) def incrementalDelay(T, X, c, k=0.5, I=1): '''Computes the incremental delay (HCM) T in hours c capacity of the lane group k default for fixed time signal I=1 for isolated intersection (Poisson arrival)''' from math import sqrt return 900*T*(X - 1 + sqrt((X - 1)**2 + 8*k*I*X/(c*T))) ######################### # misc ######################### def timeChangingSpeed(v0, vf, a, TPR): 'for decelerations, a < 0' return TPR-(vf-v0)/a def distanceChangingSpeed(v0, vf, a, TPR): 'for decelerations, a < 0' return TPR*v0+(vf**2-v0**2)/(2*a)