Mercurial Hosting > traffic-intelligence
view python/traffic_engineering.py @ 300:f65b828e5521
working on trajectory simulation
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Wed, 13 Feb 2013 18:26:49 -0500 |
parents | 82b4101d9a2f |
children | 9d88a4d97473 |
line wrap: on
line source
#! /usr/bin/env python ''' Traffic Engineering Tools and Examples''' from math import ceil __metaclass__ = type ######################### # Simulation ######################### class Vehicle: 'Generic vehicle class 1D coordinates for now' class PredictedTrajectory1D(prediction.PredictedTrajectory): def __init__(self, initialPosition, initialSpeed, control, maxSpeed = None): self.control = control self.maxSpeed = maxSpeed self.probability = None self.predictedPositions = {0: moving.Point(initialPosition)} self.predictedSpeedOrientations = {0: moving.NormAngle(initialSpeed, 0)} def setAcceleration(self, acceleration): self.control = moving.NormAngle(acceleration, 0) def getControl(self): return self.control def __init__(self, initialPosition = 0, initialSpeed = 0, acceleration = 0, prt = 2.5, leader = None): self.positions = PredictedTrajectory1D(initialPosition, initialSpeed) self.prt = prt self.leader = leader # todo add microModel (Treiber) def setAcceleration(self, acceleration): self.positions.setAcceleration(acceleration) def updatePosition(self, dt): # knowledge of time outside of vehicle ?? speed = self.speed self.speed += self.acceleration*dt self.position += speed*dt if self.log: self.positions.append(self.position) self.speeds.append(self.speed) self.accelerations.append(self.acceleration) def updateAcceleration(self, dt): '''Updates acceleration and speed as a function of leader and other factors''' pass def update(self, dt): self.updatePosition(dt) self.updateAcceleration(dt) # function of leader def printStats(self): print('{0} {1} {2}'.format(self.position, self.speed, self.acceleration)) ######################### # fundamental diagram ######################### class FundamentalDiagram: ''' ''' def __init__(self, name): self.name = name def q(self, k): return k*self.v(k) @staticmethod def meanHeadway(k): return 1/k @staticmethod def meanSpacing(q): return 1/q def plotVK(self, language='fr', units={}): from numpy import arange from matplotlib.pyplot import figure,plot,xlabel,ylabel densities = [k for k in arange(1, self.kj+1)] figure() plot(densities, [self.v(k) for k in densities]) xlabel('Densite (veh/km)') # todo other languages and adapt to units ylabel('Vitesse (km/h)') def plotQK(self, language='fr', units={}): from numpy import arange from matplotlib.pyplot import figure,plot,xlabel,ylabel densities = [k for k in arange(1, self.kj+1)] figure() plot(densities, [self.q(k) for k in densities]) xlabel('Densite (veh/km)') # todo other languages and adapt to units ylabel('Debit (km/h)') class GreenbergFD(FundamentalDiagram): '''Speed is the logarithm of density''' def __init__(self, vc, kj): FundamentalDiagram.__init__(self,'Greenberg') self.vc=vc self.kj=kj def v(self,k): from numpy import log return self.vc*log(self.kj/k) def criticalDensity(self): from numpy import e self.kc = self.kj/e return self.kc def capacity(self): self.qmax = self.kc*self.vc return self.qmax ######################### # intersection ######################### class FourWayIntersection: '''Simple class for simple intersection outline''' def __init__(self, dimension, coordX, coordY): self.dimension = dimension self.coordX = coordX self.coordY = coordY def plot(self, options = 'k'): from matplotlib.pyplot import plot, axis minX = min(self.dimension[0]) maxX = max(self.dimension[0]) minY = min(self.dimension[1]) maxY = max(self.dimension[1]) plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[0], self.coordY[0], minY],options) plot([self.coordX[1], self.coordX[1], maxX], [minY, self.coordY[0], self.coordY[0]],options) plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[1], self.coordY[1], maxY],options) plot([self.coordX[1], self.coordX[1], maxX], [maxY, self.coordY[1], self.coordY[1]],options) axis('equal') ######################### # traffic signals ######################### class Volume: '''Class to represent volumes with varied vehicule types ''' def __init__(self, volume, types = ['pc'], proportions = [1], equivalents = [1], nLanes = 1): '''mvtEquivalent is the equivalent if the movement is right of left turn''' # check the sizes of the lists if sum(proportions) == 1: self.volume = volume self.types = types self.proportions = proportions self.equivalents = equivalents self.nLanes = nLanes # unused else: pass def getPCUVolume(self): '''Returns the passenger-car equivalent for the input volume''' v = 0 for p, e in zip(self.proportions, self.equivalents): v += p*e return v*self.volume class IntersectionMovement: '''Represents an intersection movement with a volume, a type (through, left or right) and an equivalent for movement type''' def __init__(self, volume, mvtEquivalent = 1): self.volume = volume self.mvtEquivalent = mvtEquivalent def getTVUVolume(self): return self.mvtEquivalent*self.volume.getPCUVolume() class IntersectionApproach: # should probably not be used def __init__(self, leftTurnVolume, throughVolume, rightTurnVolume): self.leftTurnVolume = leftTurnVolume self.throughVolume = throughVolume self.rightTurnVolume = rightTurnVolume def getTVUVolume(self, leftTurnEquivalent = 1, throughEquivalent = 1, rightTurnEquivalent = 1): return self.leftTurnVolume.getPCUVolume()*leftTurnEquivalent+self.throughVolume.getPCUVolume()*throughEquivalent+self.rightTurnVolume.getPCUVolume()*rightTurnEquivalent class LaneGroup: '''Class that represents a group of mouvements''' def __init__(self, movements, nLanes): self.movements = movements self.nLanes = nLanes def getTVUVolume(self): return sum([mvt.getTVUVolume() for mvt in self.movements]) def getCharge(self, saturationVolume): return self.getTVUVolume()/(self.nLanes*saturationVolume) def checkProtectedLeftTurn(leftMvt, opposedThroughMvt): '''Checks if one of the main two conditions on left turn is verified The lane groups should contain left and through movement''' return leftMvt.volume >= 200 or leftMvt.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 def optimalCycle(lostTime, criticalCharge): return (1.5*lostTime+5)/(1-criticalCharge) def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): 'degree of saturation can be used as the peak hour factor too' return lostTime/(1-criticalCharge/degreeSaturation) class Cycle: '''Class to compute optimal cycle and the split of effective green times''' def __init__(self, phases, lostTime, saturationVolume): '''phases is a list of phases a phase is a list of lanegroups''' self.phases = phases self.lostTime = lostTime self.saturationVolume = saturationVolume def computeCriticalCharges(self): self.criticalCharges = [] for phase in self.phases: self.criticalCharges.append(max([lg.getCharge(self.saturationVolume) for lg in phase])) self.criticalCharge = sum(self.criticalCharges) def computeOptimalCycle(self): self.computeCriticalCharges() self.C = optimalCycle(self.lostTime, self.criticalCharge) return self.C def computeMinimumCycle(self, degreeSaturation=1.): self.computeCriticalCharges() self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) return self.C def computeEffectiveGreen(self): from numpy import round #self.computeCycle() # in case it was not done before effectiveGreenTime = self.C-self.lostTime self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] return self.effectiveGreens def computeInterGreen(perceptionReactionTime, initialSpeed, intersectionLength, vehicleAverageLength = 6, deceleration = 3): '''Computes the intergreen time (yellow/amber plus all red time) Deceleration is positive All variables should be in the same units''' if deceleration > 0: return [perceptionReactionTime+float(initialSpeed)/(2*deceleration), float(intersectionLength+vehicleAverageLength)/initialSpeed] else: print 'Issue deceleration should be strictly positive' return None def uniformDelay(cycleLength, effectiveGreen, saturationDegree): '''Computes the uniform delay''' return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)/(1-float(effectiveGreen*saturationDegree)/cycleLength) ######################### # misc ######################### def timeChangingSpeed(v0, vf, a, TPR): return TPR+(vf-v0)/a def distanceChangingSpeed(v0, vf, a, TPR): return TPR*v0+(vf*vf-v0*v0)/(2*a)