Mercurial Hosting > traffic-intelligence
comparison trafficintelligence/traffic_engineering.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 15 Jun 2018 11:19:10 -0400 |
parents | python/traffic_engineering.py@2cd1ce245024 |
children | c6cf75a2ed08 |
comparison
equal
deleted
inserted
replaced
1027:6129296848d3 | 1028:cc5cb04b04b0 |
---|---|
1 #! /usr/bin/env python | |
2 ''' Traffic Engineering Tools and Examples''' | |
3 | |
4 from trafficintelligence import prediction | |
5 | |
6 from math import ceil | |
7 | |
8 | |
9 ######################### | |
10 # Simulation | |
11 ######################### | |
12 | |
13 def generateTimeHeadways(meanTimeHeadway, simulationTime): | |
14 '''Generates the time headways between arrivals | |
15 given the meanTimeHeadway and the negative exponential distribution | |
16 over a time interval of length simulationTime (assumed to be in same time unit as headway''' | |
17 from random import expovariate | |
18 headways = [] | |
19 totalTime = 0 | |
20 flow = 1/meanTimeHeadway | |
21 while totalTime < simulationTime: | |
22 h = expovariate(flow) | |
23 headways.append(h) | |
24 totalTime += h | |
25 return headways | |
26 | |
27 class RoadUser(object): | |
28 '''Simple example of inheritance to plot different road users ''' | |
29 def __init__(self, position, velocity): | |
30 'Both fields are 2D numpy arrays' | |
31 self.position = position.astype(float) | |
32 self.velocity = velocity.astype(float) | |
33 | |
34 def move(self, deltaT): | |
35 self.position += deltaT*self.velocity | |
36 | |
37 def draw(self, init = False): | |
38 from matplotlib.pyplot import plot | |
39 if init: | |
40 self.plotLine = plot(self.position[0], self.position[1], self.getDescriptor())[0] | |
41 else: | |
42 self.plotLine.set_data(self.position[0], self.position[1]) | |
43 | |
44 | |
45 class PassengerVehicle(RoadUser): | |
46 def getDescriptor(self): | |
47 return 'dr' | |
48 | |
49 class Pedestrian(RoadUser): | |
50 def getDescriptor(self): | |
51 return 'xb' | |
52 | |
53 class Cyclist(RoadUser): | |
54 def getDescriptor(self): | |
55 return 'og' | |
56 | |
57 ######################### | |
58 # queueing models | |
59 ######################### | |
60 | |
61 class CapacityReduction(object): | |
62 def __init__(self, beta, reductionDuration, demandCapacityRatio = None, demand = None, capacity = None): | |
63 '''reduction duration should be positive | |
64 demandCapacityRatio is demand/capacity (q/s)''' | |
65 if demandCapacityRatio is None and demand is None and capacity is None: | |
66 print('Missing too much information (demand, capacity and ratio)') | |
67 import sys | |
68 sys.exit() | |
69 if 0 <= beta < 1: | |
70 self.beta = beta | |
71 self.reductionDuration = reductionDuration | |
72 | |
73 if demandCapacityRatio is not None: | |
74 self.demandCapacityRatio = demandCapacityRatio | |
75 if demand is not None: | |
76 self.demand = demand | |
77 if capacity is not None: | |
78 self.capacity = capacity | |
79 if capacity is not None and demand is not None: | |
80 self.demandCapacityRatio = float(self.demand)/self.capacity | |
81 if demand <= beta*capacity: | |
82 print('There is no queueing as the demand {} is inferior to the reduced capacity {}'.format(demand, beta*capacity)) | |
83 else: | |
84 print('reduction coefficient (beta={}) is not in [0, 1['.format(beta)) | |
85 | |
86 def queueingDuration(self): | |
87 return self.reductionDuration*(1-self.beta)/(1-self.demandCapacityRatio) | |
88 | |
89 def nArrived(self, t): | |
90 if self.demand is None: | |
91 print('Missing demand field') | |
92 return None | |
93 return self.demand*t | |
94 | |
95 def nServed(self, t): | |
96 if self.capacity is None: | |
97 print('Missing capacity field') | |
98 return None | |
99 if 0<=t<=self.reductionDuration: | |
100 return self.beta*self.capacity*t | |
101 elif self.reductionDuration < t <= self.queueingDuration(): | |
102 return self.beta*self.capacity*self.reductionDuration+self.capacity*(t-self.reductionDuration) | |
103 | |
104 def nQueued(self, t): | |
105 return self.nArrived(t)-self.nServed(t) | |
106 | |
107 def maxNQueued(self): | |
108 return self.nQueued(self.reductionDuration) | |
109 | |
110 def totalDelay(self): | |
111 if self.capacity is None: | |
112 print('Missing capacity field') | |
113 return None | |
114 return self.capacity*self.reductionDuration**2*(1-self.beta)*(self.demandCapacityRatio-self.beta)/(2*(1-self.demandCapacityRatio)) | |
115 | |
116 def averageDelay(self): | |
117 return self.reductionDuration*(self.demandCapacityRatio-self.beta)/(2*self.demandCapacityRatio) | |
118 | |
119 def averageNQueued(self): | |
120 return self.totalDelay()/self.queueingDuration() | |
121 | |
122 | |
123 ######################### | |
124 # fundamental diagram | |
125 ######################### | |
126 | |
127 class FundamentalDiagram(object): | |
128 ''' ''' | |
129 def __init__(self, name): | |
130 self.name = name | |
131 | |
132 def q(self, k): | |
133 return k*self.v(k) | |
134 | |
135 @staticmethod | |
136 def meanHeadway(k): | |
137 return 1/k | |
138 | |
139 @staticmethod | |
140 def meanSpacing(q): | |
141 return 1/q | |
142 | |
143 def plotVK(self, language='fr', units={}): | |
144 from numpy import arange | |
145 from matplotlib.pyplot import figure,plot,xlabel,ylabel | |
146 densities = [k for k in arange(1, self.kj+1)] | |
147 figure() | |
148 plot(densities, [self.v(k) for k in densities]) | |
149 xlabel('Densite (veh/km)') # todo other languages and adapt to units | |
150 ylabel('Vitesse (km/h)') | |
151 | |
152 def plotQK(self, language='fr', units={}): | |
153 from numpy import arange | |
154 from matplotlib.pyplot import figure,plot,xlabel,ylabel | |
155 densities = [k for k in arange(1, self.kj+1)] | |
156 figure() | |
157 plot(densities, [self.q(k) for k in densities]) | |
158 xlabel('Densite (veh/km)') # todo other languages and adapt to units | |
159 ylabel('Debit (km/h)') | |
160 | |
161 class GreenbergFD(FundamentalDiagram): | |
162 '''Speed is the logarithm of density''' | |
163 def __init__(self, vc, kj): | |
164 FundamentalDiagram.__init__(self,'Greenberg') | |
165 self.vc=vc | |
166 self.kj=kj | |
167 | |
168 def v(self,k): | |
169 from numpy import log | |
170 return self.vc*log(self.kj/k) | |
171 | |
172 def criticalDensity(self): | |
173 from numpy import e | |
174 self.kc = self.kj/e | |
175 return self.kc | |
176 | |
177 def capacity(self): | |
178 self.qmax = self.kc*self.vc | |
179 return self.qmax | |
180 | |
181 ######################### | |
182 # intersection | |
183 ######################### | |
184 | |
185 class FourWayIntersection(object): | |
186 '''Simple class for simple intersection outline''' | |
187 def __init__(self, dimension, coordX, coordY): | |
188 self.dimension = dimension | |
189 self.coordX = coordX | |
190 self.coordY = coordY | |
191 | |
192 def plot(self, options = 'k'): | |
193 from matplotlib.pyplot import plot, axis | |
194 | |
195 minX = min(self.dimension[0]) | |
196 maxX = max(self.dimension[0]) | |
197 minY = min(self.dimension[1]) | |
198 maxY = max(self.dimension[1]) | |
199 | |
200 plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[0], self.coordY[0], minY],options) | |
201 plot([self.coordX[1], self.coordX[1], maxX], [minY, self.coordY[0], self.coordY[0]],options) | |
202 plot([minX, self.coordX[0], self.coordX[0]], [self.coordY[1], self.coordY[1], maxY],options) | |
203 plot([self.coordX[1], self.coordX[1], maxX], [maxY, self.coordY[1], self.coordY[1]],options) | |
204 axis('equal') | |
205 | |
206 ######################### | |
207 # traffic signals | |
208 ######################### | |
209 | |
210 class Volume(object): | |
211 '''Class to represent volumes with varied vehicule types ''' | |
212 def __init__(self, volume, types = ['pc'], proportions = [1], equivalents = [1], nLanes = 1): | |
213 '''mvtEquivalent is the equivalent if the movement is right of left turn''' | |
214 | |
215 # check the sizes of the lists | |
216 if sum(proportions) == 1: | |
217 self.volume = volume | |
218 self.types = types | |
219 self.proportions = proportions | |
220 self.equivalents = equivalents | |
221 self.nLanes = nLanes | |
222 else: | |
223 print('Proportions do not sum to 1') | |
224 pass | |
225 | |
226 def checkProtected(self, opposedThroughMvt): | |
227 '''Checks if this left movement should be protected, | |
228 ie if one of the main two conditions on left turn is verified''' | |
229 return self.volume >= 200 or self.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 | |
230 | |
231 def getPCUVolume(self): | |
232 '''Returns the passenger-car equivalent for the input volume''' | |
233 v = 0 | |
234 for p, e in zip(self.proportions, self.equivalents): | |
235 v += p*e | |
236 return v*self.volume | |
237 | |
238 class IntersectionMovement(object): | |
239 '''Represents an intersection movement | |
240 with a volume, a type (through, left or right) | |
241 and an equivalent for movement type''' | |
242 def __init__(self, volume, mvtEquivalent = 1): | |
243 self.volume = volume | |
244 self.mvtEquivalent = mvtEquivalent | |
245 | |
246 def getTVUVolume(self): | |
247 return self.mvtEquivalent*self.volume.getPCUVolume() | |
248 | |
249 class LaneGroup(object): | |
250 '''Class that represents a group of mouvements''' | |
251 | |
252 def __init__(self, movements, nLanes): | |
253 self.movements = movements | |
254 self.nLanes = nLanes | |
255 | |
256 def getTVUVolume(self): | |
257 return sum([mvt.getTVUVolume() for mvt in self.movements]) | |
258 | |
259 def getCharge(self, saturationVolume): | |
260 return self.getTVUVolume()/(self.nLanes*saturationVolume) | |
261 | |
262 def optimalCycle(lostTime, criticalCharge): | |
263 return (1.5*lostTime+5)/(1-criticalCharge) | |
264 | |
265 def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): | |
266 'degree of saturation can be used as the peak hour factor too' | |
267 return lostTime/(1-criticalCharge/degreeSaturation) | |
268 | |
269 class Cycle(object): | |
270 '''Class to compute optimal cycle and the split of effective green times''' | |
271 def __init__(self, phases, lostTime, saturationVolume): | |
272 '''phases is a list of phases | |
273 a phase is a list of lanegroups''' | |
274 self.phases = phases | |
275 self.lostTime = lostTime | |
276 self.saturationVolume = saturationVolume | |
277 | |
278 def computeCriticalCharges(self): | |
279 self.criticalCharges = [max([lg.getCharge(self.saturationVolume) for lg in phase]) for phase in self.phases] | |
280 self.criticalCharge = sum(self.criticalCharges) | |
281 | |
282 def computeOptimalCycle(self): | |
283 self.computeCriticalCharges() | |
284 self.C = optimalCycle(self.lostTime, self.criticalCharge) | |
285 return self.C | |
286 | |
287 def computeMinimumCycle(self, degreeSaturation=1.): | |
288 self.computeCriticalCharges() | |
289 self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) | |
290 return self.C | |
291 | |
292 def computeEffectiveGreen(self): | |
293 #from numpy import round | |
294 #self.computeCycle() # in case it was not done before | |
295 effectiveGreenTime = self.C-self.lostTime | |
296 self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] | |
297 return self.effectiveGreens | |
298 | |
299 | |
300 def computeInterGreen(perceptionReactionTime, initialSpeed, intersectionLength, vehicleAverageLength = 6, deceleration = 3): | |
301 '''Computes the intergreen time (yellow/amber plus all red time) | |
302 Deceleration is positive | |
303 All variables should be in the same units''' | |
304 if deceleration > 0: | |
305 return [perceptionReactionTime+float(initialSpeed)/(2*deceleration), float(intersectionLength+vehicleAverageLength)/initialSpeed] | |
306 else: | |
307 print('Issue deceleration should be strictly positive') | |
308 return None | |
309 | |
310 def uniformDelay(cycleLength, effectiveGreen, saturationDegree): | |
311 '''Computes the uniform delay''' | |
312 return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)**2/(1-float(effectiveGreen*saturationDegree)/cycleLength) | |
313 | |
314 def randomDelay(volume, saturationDegree): | |
315 '''Computes the random delay = queueing time for M/D/1''' | |
316 return saturationDegree**2/(2*volume*(1-saturationDegree)) | |
317 | |
318 def incrementalDelay(T, X, c, k=0.5, I=1): | |
319 '''Computes the incremental delay (HCM) | |
320 T in hours | |
321 c capacity of the lane group | |
322 k default for fixed time signal | |
323 I=1 for isolated intersection (Poisson arrival)''' | |
324 from math import sqrt | |
325 return 900*T*(X - 1 + sqrt((X - 1)**2 + 8*k*I*X/(c*T))) | |
326 | |
327 ######################### | |
328 # misc | |
329 ######################### | |
330 | |
331 def timeChangingSpeed(v0, vf, a, TPR): | |
332 'for decelerations, a < 0' | |
333 return TPR-(vf-v0)/a | |
334 | |
335 def distanceChangingSpeed(v0, vf, a, TPR): | |
336 'for decelerations, a < 0' | |
337 return TPR*v0-(vf**2-v0**2)/(2*a) |