Mercurial Hosting > traffic-intelligence
comparison python/traffic_engineering.py @ 614:5e09583275a4
Merged Nicolas/trafficintelligence into default
author | Mohamed Gomaa <eng.m.gom3a@gmail.com> |
---|---|
date | Fri, 05 Dec 2014 12:13:53 -0500 |
parents | 850ed17c7b2f |
children | 3b13ec964476 |
comparison
equal
deleted
inserted
replaced
598:11f96bd08552 | 614:5e09583275a4 |
---|---|
9 | 9 |
10 | 10 |
11 ######################### | 11 ######################### |
12 # Simulation | 12 # Simulation |
13 ######################### | 13 ######################### |
14 | |
15 def generateTimeHeadways(meanTimeHeadway, simulationTime): | |
16 '''Generates the time headways between arrivals | |
17 given the meanTimeHeadway and the negative exponential distribution | |
18 over a time interval of length simulationTime (assumed to be in same time unit as headway''' | |
19 from random import expovariate | |
20 headways = [] | |
21 totalTime = 0 | |
22 flow = 1/meanTimeHeadway | |
23 while totalTime < simulationTime: | |
24 h = expovariate(flow) | |
25 headways.append(h) | |
26 totalTime += h | |
27 return headways | |
14 | 28 |
15 class Vehicle: | 29 class Vehicle: |
16 '''Generic vehicle class | 30 '''Generic vehicle class |
17 1D coordinates for now''' | 31 1D coordinates for now''' |
18 class PredictedTrajectory1D(prediction.PredictedTrajectory): | 32 class PredictedTrajectory1D(prediction.PredictedTrajectory): |
157 if sum(proportions) == 1: | 171 if sum(proportions) == 1: |
158 self.volume = volume | 172 self.volume = volume |
159 self.types = types | 173 self.types = types |
160 self.proportions = proportions | 174 self.proportions = proportions |
161 self.equivalents = equivalents | 175 self.equivalents = equivalents |
162 self.nLanes = nLanes # unused | 176 self.nLanes = nLanes |
163 else: | 177 else: |
164 print('Proportions do not sum to 1') | 178 print('Proportions do not sum to 1') |
165 pass | 179 pass |
180 | |
181 def checkProtected(self, opposedThroughMvt): | |
182 '''Checks if this left movement should be protected, | |
183 ie if one of the main two conditions on left turn is verified''' | |
184 return self.volume >= 200 or self.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 | |
166 | 185 |
167 def getPCUVolume(self): | 186 def getPCUVolume(self): |
168 '''Returns the passenger-car equivalent for the input volume''' | 187 '''Returns the passenger-car equivalent for the input volume''' |
169 v = 0 | 188 v = 0 |
170 for p, e in zip(self.proportions, self.equivalents): | 189 for p, e in zip(self.proportions, self.equivalents): |
180 self.mvtEquivalent = mvtEquivalent | 199 self.mvtEquivalent = mvtEquivalent |
181 | 200 |
182 def getTVUVolume(self): | 201 def getTVUVolume(self): |
183 return self.mvtEquivalent*self.volume.getPCUVolume() | 202 return self.mvtEquivalent*self.volume.getPCUVolume() |
184 | 203 |
185 class IntersectionApproach: # should probably not be used | |
186 def __init__(self, leftTurnVolume, throughVolume, rightTurnVolume): | |
187 self.leftTurnVolume = leftTurnVolume | |
188 self.throughVolume = throughVolume | |
189 self.rightTurnVolume = rightTurnVolume | |
190 | |
191 def getTVUVolume(self, leftTurnEquivalent = 1, throughEquivalent = 1, rightTurnEquivalent = 1): | |
192 return self.leftTurnVolume.getPCUVolume()*leftTurnEquivalent+self.throughVolume.getPCUVolume()*throughEquivalent+self.rightTurnVolume.getPCUVolume()*rightTurnEquivalent | |
193 | |
194 class LaneGroup: | 204 class LaneGroup: |
195 '''Class that represents a group of mouvements''' | 205 '''Class that represents a group of mouvements''' |
196 | 206 |
197 def __init__(self, movements, nLanes): | 207 def __init__(self, movements, nLanes): |
198 self.movements = movements | 208 self.movements = movements |
201 def getTVUVolume(self): | 211 def getTVUVolume(self): |
202 return sum([mvt.getTVUVolume() for mvt in self.movements]) | 212 return sum([mvt.getTVUVolume() for mvt in self.movements]) |
203 | 213 |
204 def getCharge(self, saturationVolume): | 214 def getCharge(self, saturationVolume): |
205 return self.getTVUVolume()/(self.nLanes*saturationVolume) | 215 return self.getTVUVolume()/(self.nLanes*saturationVolume) |
206 | |
207 def checkProtectedLeftTurn(leftMvt, opposedThroughMvt): | |
208 '''Checks if one of the main two conditions on left turn is verified | |
209 The lane groups should contain left and through movement''' | |
210 return leftMvt.volume >= 200 or leftMvt.volume*opposedThroughMvt.volume/opposedThroughMvt.nLanes > 50000 | |
211 | 216 |
212 def optimalCycle(lostTime, criticalCharge): | 217 def optimalCycle(lostTime, criticalCharge): |
213 return (1.5*lostTime+5)/(1-criticalCharge) | 218 return (1.5*lostTime+5)/(1-criticalCharge) |
214 | 219 |
215 def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): | 220 def minimumCycle(lostTime, criticalCharge, degreeSaturation=1.): |
224 self.phases = phases | 229 self.phases = phases |
225 self.lostTime = lostTime | 230 self.lostTime = lostTime |
226 self.saturationVolume = saturationVolume | 231 self.saturationVolume = saturationVolume |
227 | 232 |
228 def computeCriticalCharges(self): | 233 def computeCriticalCharges(self): |
229 self.criticalCharges = [] | 234 self.criticalCharges = [max([lg.getCharge(self.saturationVolume) for lg in phase]) for phase in self.phases] |
230 for phase in self.phases: | |
231 self.criticalCharges.append(max([lg.getCharge(self.saturationVolume) for lg in phase])) | |
232 self.criticalCharge = sum(self.criticalCharges) | 235 self.criticalCharge = sum(self.criticalCharges) |
233 | 236 |
234 def computeOptimalCycle(self): | 237 def computeOptimalCycle(self): |
235 self.computeCriticalCharges() | 238 self.computeCriticalCharges() |
236 self.C = optimalCycle(self.lostTime, self.criticalCharge) | 239 self.C = optimalCycle(self.lostTime, self.criticalCharge) |
240 self.computeCriticalCharges() | 243 self.computeCriticalCharges() |
241 self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) | 244 self.C = minimumCycle(self.lostTime, self.criticalCharge, degreeSaturation) |
242 return self.C | 245 return self.C |
243 | 246 |
244 def computeEffectiveGreen(self): | 247 def computeEffectiveGreen(self): |
245 from numpy import round | 248 #from numpy import round |
246 #self.computeCycle() # in case it was not done before | 249 #self.computeCycle() # in case it was not done before |
247 effectiveGreenTime = self.C-self.lostTime | 250 effectiveGreenTime = self.C-self.lostTime |
248 self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] | 251 self.effectiveGreens = [round(c*effectiveGreenTime/self.criticalCharge,1) for c in self.criticalCharges] |
249 return self.effectiveGreens | 252 return self.effectiveGreens |
250 | 253 |
261 | 264 |
262 def uniformDelay(cycleLength, effectiveGreen, saturationDegree): | 265 def uniformDelay(cycleLength, effectiveGreen, saturationDegree): |
263 '''Computes the uniform delay''' | 266 '''Computes the uniform delay''' |
264 return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)/(1-float(effectiveGreen*saturationDegree)/cycleLength) | 267 return 0.5*cycleLength*(1-float(effectiveGreen)/cycleLength)/(1-float(effectiveGreen*saturationDegree)/cycleLength) |
265 | 268 |
269 def overflowDelay(T, X, c, k=0.5, I=1): | |
270 '''Computes the overflow delay (HCM) | |
271 T in hours | |
272 c capacity of the lane group | |
273 k default for fixed time signal | |
274 I=1 for isolated intersection (Poisson arrival)''' | |
275 from math import sqrt | |
276 return 900*T*(X - 1 + sqrt((X - 1)**2 + 8*k*I*X/(c*T))) | |
277 | |
266 ######################### | 278 ######################### |
267 # misc | 279 # misc |
268 ######################### | 280 ######################### |
269 | 281 |
270 def timeChangingSpeed(v0, vf, a, TPR): | 282 def timeChangingSpeed(v0, vf, a, TPR): |