Mercurial Hosting > traffic-intelligence
view python/traffic_engineering.py @ 398:3399bd48cb40
Ajout d'une méthode pour obtenir le nombre de FPS
Méthode de capture des trames vidéos plus résistante aux erreur
Utilisation d'un dictionnaire pour les fichier de configuration afin de garder le nom des sections
author | Jean-Philippe Jodoin <jpjodoin@gmail.com> |
---|---|
date | Mon, 29 Jul 2013 13:46:07 -0400 |
parents | 539e2b4cfaa3 |
children | 7828fec8bbd2 |
line wrap: on
line source
#! /usr/bin/env python ''' Traffic Engineering Tools and Examples''' from math import ceil import prediction __metaclass__ = type ######################### # Simulation ######################### class Vehicle: '''Generic vehicle class 1D coordinates for now''' class PredictedTrajectory1D(prediction.PredictedTrajectory): def __init__(self, initialPosition, initialSpeed, control, maxSpeed = None): self.control = control self.maxSpeed = maxSpeed self.probability = None self.predictedPositions = {0: moving.Point(initialPosition)} self.predictedSpeedOrientations = {0: moving.NormAngle(initialSpeed, 0)} def setAcceleration(self, acceleration): self.control = moving.NormAngle(acceleration, 0) def getControl(self): return self.control def __init__(self, initialPosition = 0, initialSpeed = 0, acceleration = 0, prt = 2.5, leader = None): self.positions = PredictedTrajectory1D(initialPosition, initialSpeed) self.prt = prt self.leader = leader # todo add microModel (Treiber) def setAcceleration(self, acceleration): self.positions.setAcceleration(acceleration) def updatePosition(self, dt): # knowledge of time outside of vehicle ?? speed = self.speed self.speed += self.acceleration*dt self.position += speed*dt if self.log: self.positions.append(self.position) self.speeds.append(self.speed) self.accelerations.append(self.acceleration) def updateAcceleration(self, dt): '''Updates acceleration and speed as a function of leader and other factors''' pass def update(self, dt): self.updatePosition(dt) self.updateAcceleration(dt) # function of leader def printStats(self): print('{0} {1} {2}'.format(self.position, self.speed, self.acceleration)) ######################### # fundamental diagram ######################### class FundamentalDiagram: ''' ''' def __init__(self, name): self.name = name def q(self, k): return k*self.v(k) @staticmethod def meanHeadway(k): return 1/k @staticmethod def meanSpacing(q): return 1/q def plotVK(self, language='fr', units={}): from numpy import arange from matplotlib.pyplot import figure,plot,xlabel,ylabel densities = [k for k in arange(1, self.kj+1)] figure() plot(densities, [self.v(k) for k in densities]) xlabel('Densite (veh/km)') # todo other languages and adapt to units ylabel('Vitesse (km/h)') def plotQK(self, language='fr', units={}): from numpy import arange from matplotlib.pyplot import figure,plot,xlabel,ylabel densities = [k for k in arange(1, self.kj+1)] figure() plot(densities, [self.q(k) for k in densities]) xlabel('Densite (veh/km)') # todo other languages and adapt to units ylabel('Debit (km/h)') class GreenbergFD(FundamentalDiagram): '''Speed is the logarithm of density''' def __init__(self, vc, kj): FundamentalDiagram.__init__(self,'Greenberg') self.vc=vc self.kj=kj def v(self,k): from numpy import log return self.vc*log(self.kj/k) def criticalDensity(self): from numpy import e self.kc = self.kj/e return self.kc def capacity(self): self.qmax = self.kc*self.vc return self.qmax ######################### # intersection ######################### class FourWayIntersection: '''Simple class for simple intersection outline''' def __init__(self, dimension, coordX, coordY): self.dimension = dimension self.coordX = coordX self.coordY = coordY def plot(self, options = 'k'): from matplotlib.pyplot import plot, axis minX = min(self.dimension[0]) maxX = max(self.dimension[0]) minY = min(self.dimension[1]) maxY = max(self.dimension[1]) plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[0], self.coordY[0], minY],options) plot([self.coordX[1], self.coordX[1], maxX], [minY, self.coordY[0], self.coordY[0]],options) plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[1], self.coordY[1], maxY],options) plot([self.coordX[1], self.coordX[1], maxX], [maxY, self.coordY[1], self.coordY[1]],options) axis('equal') ######################### # traffic signals ######################### class Volume: '''Class to represent volumes with varied vehicule types ''' def __init__(self, volume, types = ['pc'], proportions = [1], equivalents = [1], nLanes = 1): '''mvtEquivalent is the equivalent if the movement is right of left turn''' # check the sizes of the lists if sum(proportions) == 1: self.volume = volume self.types = types self.proportions = proportions self.equivalents = equivalents self.nLanes = nLanes else: print('Proportions do not sum to 1') pass def checkProtected(self, opposedThroughMvt): '''Checks if this left movement should be protected, ie if one of the main two conditions on left turn is verified''' return self.volume >= 200 or self.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 def getPCUVolume(self): '''Returns the passenger-car equivalent for the input volume''' v = 0 for p, e in zip(self.proportions, self.equivalents): v += p*e return v*self.volume class IntersectionMovement: '''Represents an intersection movement with a volume, a type (through, left or right) and an equivalent for movement type''' def __init__(self, volume, mvtEquivalent = 1): self.volume = volume self.mvtEquivalent = mvtEquivalent def getTVUVolume(self): return self.mvtEquivalent*self.volume.getPCUVolume() class LaneGroup: '''Class that represents a group of mouvements''' def __init__(self, movements, nLanes): self.movements = movements self.nLanes = nLanes def getTVUVolume(self): return sum([mvt.getTVUVolume() for mvt in self.movements]) def getCharge(self, saturationVolume): return self.getTVUVolume()/(self.nLanes*saturationVolume) def optimalCycle(lostTime, criticalCharge): return (1.5*lostTime+5)/(1-criticalCharge) def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): 'degree of saturation can be used as the peak hour factor too' return lostTime/(1-criticalCharge/degreeSaturation) class Cycle: '''Class to compute optimal cycle and the split of effective green times''' def __init__(self, phases, lostTime, saturationVolume): '''phases is a list of phases a phase is a list of lanegroups''' self.phases = phases self.lostTime = lostTime self.saturationVolume = saturationVolume def computeCriticalCharges(self): self.criticalCharges = [max([lg.getCharge(self.saturationVolume) for lg in phase]) for phase in self.phases] self.criticalCharge = sum(self.criticalCharges) def computeOptimalCycle(self): self.computeCriticalCharges() self.C = optimalCycle(self.lostTime, self.criticalCharge) return self.C def computeMinimumCycle(self, degreeSaturation=1.): self.computeCriticalCharges() self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) return self.C def computeEffectiveGreen(self): #from numpy import round #self.computeCycle() # in case it was not done before effectiveGreenTime = self.C-self.lostTime self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] return self.effectiveGreens def computeInterGreen(perceptionReactionTime, initialSpeed, intersectionLength, vehicleAverageLength = 6, deceleration = 3): '''Computes the intergreen time (yellow/amber plus all red time) Deceleration is positive All variables should be in the same units''' if deceleration > 0: return [perceptionReactionTime+float(initialSpeed)/(2*deceleration), float(intersectionLength+vehicleAverageLength)/initialSpeed] else: print 'Issue deceleration should be strictly positive' return None def uniformDelay(cycleLength, effectiveGreen, saturationDegree): '''Computes the uniform delay''' return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)/(1-float(effectiveGreen*saturationDegree)/cycleLength) ######################### # misc ######################### def timeChangingSpeed(v0, vf, a, TPR): return TPR+(vf-v0)/a def distanceChangingSpeed(v0, vf, a, TPR): return TPR*v0+(vf*vf-v0*v0)/(2*a)