comparison trafficintelligence/traffic_engineering.py @ 1028:cc5cb04b04b0

major update using the trafficintelligence package name and install through pip
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Fri, 15 Jun 2018 11:19:10 -0400
parents python/traffic_engineering.py@2cd1ce245024
children c6cf75a2ed08
comparison
equal deleted inserted replaced
1027:6129296848d3 1028:cc5cb04b04b0
1 #! /usr/bin/env python
2 ''' Traffic Engineering Tools and Examples'''
3
4 from trafficintelligence import prediction
5
6 from math import ceil
7
8
9 #########################
10 # Simulation
11 #########################
12
13 def generateTimeHeadways(meanTimeHeadway, simulationTime):
14 '''Generates the time headways between arrivals
15 given the meanTimeHeadway and the negative exponential distribution
16 over a time interval of length simulationTime (assumed to be in same time unit as headway'''
17 from random import expovariate
18 headways = []
19 totalTime = 0
20 flow = 1/meanTimeHeadway
21 while totalTime < simulationTime:
22 h = expovariate(flow)
23 headways.append(h)
24 totalTime += h
25 return headways
26
27 class RoadUser(object):
28 '''Simple example of inheritance to plot different road users '''
29 def __init__(self, position, velocity):
30 'Both fields are 2D numpy arrays'
31 self.position = position.astype(float)
32 self.velocity = velocity.astype(float)
33
34 def move(self, deltaT):
35 self.position += deltaT*self.velocity
36
37 def draw(self, init = False):
38 from matplotlib.pyplot import plot
39 if init:
40 self.plotLine = plot(self.position[0], self.position[1], self.getDescriptor())[0]
41 else:
42 self.plotLine.set_data(self.position[0], self.position[1])
43
44
45 class PassengerVehicle(RoadUser):
46 def getDescriptor(self):
47 return 'dr'
48
49 class Pedestrian(RoadUser):
50 def getDescriptor(self):
51 return 'xb'
52
53 class Cyclist(RoadUser):
54 def getDescriptor(self):
55 return 'og'
56
57 #########################
58 # queueing models
59 #########################
60
61 class CapacityReduction(object):
62 def __init__(self, beta, reductionDuration, demandCapacityRatio = None, demand = None, capacity = None):
63 '''reduction duration should be positive
64 demandCapacityRatio is demand/capacity (q/s)'''
65 if demandCapacityRatio is None and demand is None and capacity is None:
66 print('Missing too much information (demand, capacity and ratio)')
67 import sys
68 sys.exit()
69 if 0 <= beta < 1:
70 self.beta = beta
71 self.reductionDuration = reductionDuration
72
73 if demandCapacityRatio is not None:
74 self.demandCapacityRatio = demandCapacityRatio
75 if demand is not None:
76 self.demand = demand
77 if capacity is not None:
78 self.capacity = capacity
79 if capacity is not None and demand is not None:
80 self.demandCapacityRatio = float(self.demand)/self.capacity
81 if demand <= beta*capacity:
82 print('There is no queueing as the demand {} is inferior to the reduced capacity {}'.format(demand, beta*capacity))
83 else:
84 print('reduction coefficient (beta={}) is not in [0, 1['.format(beta))
85
86 def queueingDuration(self):
87 return self.reductionDuration*(1-self.beta)/(1-self.demandCapacityRatio)
88
89 def nArrived(self, t):
90 if self.demand is None:
91 print('Missing demand field')
92 return None
93 return self.demand*t
94
95 def nServed(self, t):
96 if self.capacity is None:
97 print('Missing capacity field')
98 return None
99 if 0<=t<=self.reductionDuration:
100 return self.beta*self.capacity*t
101 elif self.reductionDuration < t <= self.queueingDuration():
102 return self.beta*self.capacity*self.reductionDuration+self.capacity*(t-self.reductionDuration)
103
104 def nQueued(self, t):
105 return self.nArrived(t)-self.nServed(t)
106
107 def maxNQueued(self):
108 return self.nQueued(self.reductionDuration)
109
110 def totalDelay(self):
111 if self.capacity is None:
112 print('Missing capacity field')
113 return None
114 return self.capacity*self.reductionDuration**2*(1-self.beta)*(self.demandCapacityRatio-self.beta)/(2*(1-self.demandCapacityRatio))
115
116 def averageDelay(self):
117 return self.reductionDuration*(self.demandCapacityRatio-self.beta)/(2*self.demandCapacityRatio)
118
119 def averageNQueued(self):
120 return self.totalDelay()/self.queueingDuration()
121
122
123 #########################
124 # fundamental diagram
125 #########################
126
127 class FundamentalDiagram(object):
128 ''' '''
129 def __init__(self, name):
130 self.name = name
131
132 def q(self, k):
133 return k*self.v(k)
134
135 @staticmethod
136 def meanHeadway(k):
137 return 1/k
138
139 @staticmethod
140 def meanSpacing(q):
141 return 1/q
142
143 def plotVK(self, language='fr', units={}):
144 from numpy import arange
145 from matplotlib.pyplot import figure,plot,xlabel,ylabel
146 densities = [k for k in arange(1, self.kj+1)]
147 figure()
148 plot(densities, [self.v(k) for k in densities])
149 xlabel('Densite (veh/km)') # todo other languages and adapt to units
150 ylabel('Vitesse (km/h)')
151
152 def plotQK(self, language='fr', units={}):
153 from numpy import arange
154 from matplotlib.pyplot import figure,plot,xlabel,ylabel
155 densities = [k for k in arange(1, self.kj+1)]
156 figure()
157 plot(densities, [self.q(k) for k in densities])
158 xlabel('Densite (veh/km)') # todo other languages and adapt to units
159 ylabel('Debit (km/h)')
160
161 class GreenbergFD(FundamentalDiagram):
162 '''Speed is the logarithm of density'''
163 def __init__(self, vc, kj):
164 FundamentalDiagram.__init__(self,'Greenberg')
165 self.vc=vc
166 self.kj=kj
167
168 def v(self,k):
169 from numpy import log
170 return self.vc*log(self.kj/k)
171
172 def criticalDensity(self):
173 from numpy import e
174 self.kc = self.kj/e
175 return self.kc
176
177 def capacity(self):
178 self.qmax = self.kc*self.vc
179 return self.qmax
180
181 #########################
182 # intersection
183 #########################
184
185 class FourWayIntersection(object):
186 '''Simple class for simple intersection outline'''
187 def __init__(self, dimension, coordX, coordY):
188 self.dimension = dimension
189 self.coordX = coordX
190 self.coordY = coordY
191
192 def plot(self, options = 'k'):
193 from matplotlib.pyplot import plot, axis
194
195 minX = min(self.dimension[0])
196 maxX = max(self.dimension[0])
197 minY = min(self.dimension[1])
198 maxY = max(self.dimension[1])
199
200 plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[0], self.coordY[0], minY],options)
201 plot([self.coordX[1], self.coordX[1], maxX], [minY, self.coordY[0], self.coordY[0]],options)
202 plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[1], self.coordY[1], maxY],options)
203 plot([self.coordX[1], self.coordX[1], maxX], [maxY, self.coordY[1], self.coordY[1]],options)
204 axis('equal')
205
206 #########################
207 # traffic signals
208 #########################
209
210 class Volume(object):
211 '''Class to represent volumes with varied vehicule types '''
212 def __init__(self, volume, types = ['pc'], proportions = [1], equivalents = [1], nLanes = 1):
213 '''mvtEquivalent is the equivalent if the movement is right of left turn'''
214
215 # check the sizes of the lists
216 if sum(proportions) == 1:
217 self.volume = volume
218 self.types = types
219 self.proportions = proportions
220 self.equivalents = equivalents
221 self.nLanes = nLanes
222 else:
223 print('Proportions do not sum to 1')
224 pass
225
226 def checkProtected(self, opposedThroughMvt):
227 '''Checks if this left movement should be protected,
228 ie if one of the main two conditions on left turn is verified'''
229 return self.volume >= 200 or self.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000
230
231 def getPCUVolume(self):
232 '''Returns the passenger-car equivalent for the input volume'''
233 v = 0
234 for p, e in zip(self.proportions, self.equivalents):
235 v += p*e
236 return v*self.volume
237
238 class IntersectionMovement(object):
239 '''Represents an intersection movement
240 with a volume, a type (through, left or right)
241 and an equivalent for movement type'''
242 def __init__(self, volume, mvtEquivalent = 1):
243 self.volume = volume
244 self.mvtEquivalent = mvtEquivalent
245
246 def getTVUVolume(self):
247 return self.mvtEquivalent*self.volume.getPCUVolume()
248
249 class LaneGroup(object):
250 '''Class that represents a group of mouvements'''
251
252 def __init__(self, movements, nLanes):
253 self.movements = movements
254 self.nLanes = nLanes
255
256 def getTVUVolume(self):
257 return sum([mvt.getTVUVolume() for mvt in self.movements])
258
259 def getCharge(self, saturationVolume):
260 return self.getTVUVolume()/(self.nLanes*saturationVolume)
261
262 def optimalCycle(lostTime, criticalCharge):
263 return (1.5*lostTime+5)/(1-criticalCharge)
264
265 def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.):
266 'degree of saturation can be used as the peak hour factor too'
267 return lostTime/(1-criticalCharge/degreeSaturation)
268
269 class Cycle(object):
270 '''Class to compute optimal cycle and the split of effective green times'''
271 def __init__(self, phases, lostTime, saturationVolume):
272 '''phases is a list of phases
273 a phase is a list of lanegroups'''
274 self.phases = phases
275 self.lostTime = lostTime
276 self.saturationVolume = saturationVolume
277
278 def computeCriticalCharges(self):
279 self.criticalCharges = [max([lg.getCharge(self.saturationVolume) for lg in phase]) for phase in self.phases]
280 self.criticalCharge = sum(self.criticalCharges)
281
282 def computeOptimalCycle(self):
283 self.computeCriticalCharges()
284 self.C = optimalCycle(self.lostTime, self.criticalCharge)
285 return self.C
286
287 def computeMinimumCycle(self, degreeSaturation=1.):
288 self.computeCriticalCharges()
289 self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation)
290 return self.C
291
292 def computeEffectiveGreen(self):
293 #from numpy import round
294 #self.computeCycle() # in case it was not done before
295 effectiveGreenTime = self.C-self.lostTime
296 self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges]
297 return self.effectiveGreens
298
299
300 def computeInterGreen(perceptionReactionTime, initialSpeed, intersectionLength, vehicleAverageLength = 6, deceleration = 3):
301 '''Computes the intergreen time (yellow/amber plus all red time)
302 Deceleration is positive
303 All variables should be in the same units'''
304 if deceleration > 0:
305 return [perceptionReactionTime+float(initialSpeed)/(2*deceleration), float(intersectionLength+vehicleAverageLength)/initialSpeed]
306 else:
307 print('Issue deceleration should be strictly positive')
308 return None
309
310 def uniformDelay(cycleLength, effectiveGreen, saturationDegree):
311 '''Computes the uniform delay'''
312 return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)**2/(1-float(effectiveGreen*saturationDegree)/cycleLength)
313
314 def randomDelay(volume, saturationDegree):
315 '''Computes the random delay = queueing time for M/D/1'''
316 return saturationDegree**2/(2*volume*(1-saturationDegree))
317
318 def incrementalDelay(T, X, c, k=0.5, I=1):
319 '''Computes the incremental delay (HCM)
320 T in hours
321 c capacity of the lane group
322 k default for fixed time signal
323 I=1 for isolated intersection (Poisson arrival)'''
324 from math import sqrt
325 return 900*T*(X - 1 + sqrt((X - 1)**2 + 8*k*I*X/(c*T)))
326
327 #########################
328 # misc
329 #########################
330
331 def timeChangingSpeed(v0, vf, a, TPR):
332 'for decelerations, a < 0'
333 return TPR-(vf-v0)/a
334
335 def distanceChangingSpeed(v0, vf, a, TPR):
336 'for decelerations, a < 0'
337 return TPR*v0-(vf**2-v0**2)/(2*a)