Mercurial Hosting > traffic-intelligence
comparison python/prediction.py @ 358:c41ff9f3c263
moved current method for collision points and crossing zones computation into prediction parameters (put expectedindicator in SafetyPoint)
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Thu, 11 Jul 2013 00:17:25 -0400 |
parents | e5fe0e6d48a1 |
children | 619ae9a9a788 |
comparison
equal
deleted
inserted
replaced
357:e5fe0e6d48a1 | 358:c41ff9f3c263 |
---|---|
61 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} | 61 self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} |
62 | 62 |
63 def getControl(self): | 63 def getControl(self): |
64 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) | 64 return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) |
65 | 65 |
66 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): | |
67 '''Computes the first instant at which two predicted trajectories are within some distance threshold''' | |
68 t = 1 | |
69 p1 = predictedTrajectory1.predictPosition(t) | |
70 p2 = predictedTrajectory2.predictPosition(t) | |
71 while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: | |
72 p1 = predictedTrajectory1.predictPosition(t) | |
73 p2 = predictedTrajectory2.predictPosition(t) | |
74 t += 1 | |
75 return t, p1, p2 | |
76 | |
66 class PredictionParameters: | 77 class PredictionParameters: |
67 def __init__(self, name, maxSpeed): | 78 def __init__(self, name, maxSpeed): |
68 self.name = name | 79 self.name = name |
69 self.maxSpeed = maxSpeed | 80 self.maxSpeed = maxSpeed |
70 | 81 |
71 def __str__(self): | 82 def __str__(self): |
72 return '{0} {1}'.format(self.name, self.maxSpeed) | 83 return '{0} {1}'.format(self.name, self.maxSpeed) |
84 | |
85 def generatePredictedTrajectories(self, obj, instant): | |
86 return [] | |
87 | |
88 def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | |
89 '''returns the lists of collision points and crossing zones | |
90 | |
91 Check: Predicting all the points together, as if they represent the whole vehicle''' | |
92 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) | |
93 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) | |
94 | |
95 collisionPoints = [] | |
96 crossingZones = [] | |
97 for et1 in predictedTrajectories1: | |
98 for et2 in predictedTrajectories2: | |
99 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
100 | |
101 if t <= timeHorizon: | |
102 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) | |
103 elif computeCZ: # check if there is a crossing zone | |
104 # TODO? zone should be around the points at which the traj are the closest | |
105 # look for CZ at different times, otherwise it would be a collision | |
106 # an approximation would be to look for close points at different times, ie the complementary of collision points | |
107 cz = None | |
108 t1 = 0 | |
109 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 | |
110 t2 = 0 | |
111 while not cz and t2 < timeHorizon: | |
112 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: | |
113 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) | |
114 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) | |
115 if cz: | |
116 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) | |
117 t2 += 1 | |
118 t1 += 1 | |
119 | |
120 if debug: | |
121 from matplotlib.pyplot import figure, axis, title | |
122 figure() | |
123 for et in predictedTrajectories1: | |
124 et.predictPosition(timeHorizon) | |
125 et.draw('rx') | |
126 | |
127 for et in predictedTrajectories2: | |
128 et.predictPosition(timeHorizon) | |
129 et.draw('bx') | |
130 obj1.draw('r') | |
131 obj2.draw('b') | |
132 title('instant {0}'.format(currentInstant)) | |
133 axis('equal') | |
134 | |
135 return collisionPoints, crossingZones | |
136 | |
137 def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | |
138 '''Computes all crossing and collision points at each common instant for two road users. ''' | |
139 collisionPoints={} | |
140 crossingZones={} | |
141 if timeInterval: | |
142 commonTimeInterval = timeInterval | |
143 else: | |
144 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
145 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | |
146 collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) | |
147 | |
148 return collisionPoints, crossingZones | |
149 | |
150 def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): | |
151 '''Computes only collision probabilities | |
152 Returns for each instant the collision probability and number of samples drawn''' | |
153 collisionProbabilities = {} | |
154 if timeInterval: | |
155 commonTimeInterval = timeInterval | |
156 else: | |
157 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
158 for i in list(commonTimeInterval)[:-1]: | |
159 nCollisions = 0 | |
160 print(obj1.num, obj2.num, i) | |
161 predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i) | |
162 predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i) | |
163 for et1 in predictedTrajectories1: | |
164 for et2 in predictedTrajectories2: | |
165 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
166 if t <= timeHorizon: | |
167 nCollisions += 1 | |
168 # take into account probabilities ?? | |
169 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) | |
170 collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] | |
171 | |
172 if debug: | |
173 from matplotlib.pyplot import figure, axis, title | |
174 figure() | |
175 for et in predictedTrajectories1: | |
176 et.predictPosition(timeHorizon) | |
177 et.draw('rx') | |
178 | |
179 for et in predictedTrajectories2: | |
180 et.predictPosition(timeHorizon) | |
181 et.draw('bx') | |
182 obj1.draw('r') | |
183 obj2.draw('b') | |
184 title('instant {0}'.format(i)) | |
185 axis('equal') | |
186 | |
187 return collisionProbabilities | |
73 | 188 |
74 class ConstantPredictionParameters(PredictionParameters): | 189 class ConstantPredictionParameters(PredictionParameters): |
75 def __init__(self, maxSpeed): | 190 def __init__(self, maxSpeed): |
76 PredictionParameters.__init__(self, 'constant velocity', maxSpeed) | 191 PredictionParameters.__init__(self, 'constant velocity', maxSpeed) |
77 | 192 |
188 @staticmethod | 303 @staticmethod |
189 def save(out, points, predictionInstant, objNum1, objNum2): | 304 def save(out, points, predictionInstant, objNum1, objNum2): |
190 for p in points: | 305 for p in points: |
191 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) | 306 out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) |
192 | 307 |
193 def computeExpectedIndicator(points): | 308 @staticmethod |
194 from numpy import sum | 309 def computeExpectedIndicator(points): |
195 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | 310 from numpy import sum |
196 | 311 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) |
197 def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): | 312 |
198 '''Computes the first instant at which two predicted trajectories are within some distance threshold''' | |
199 t = 1 | |
200 p1 = predictedTrajectory1.predictPosition(t) | |
201 p2 = predictedTrajectory2.predictPosition(t) | |
202 while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: | |
203 p1 = predictedTrajectory1.predictPosition(t) | |
204 p2 = predictedTrajectory2.predictPosition(t) | |
205 t += 1 | |
206 return t, p1, p2 | |
207 | |
208 def computeCrossingsCollisionsAtInstant(currentInstant, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): | |
209 '''returns the lists of collision points and crossing zones | |
210 | |
211 Check: Predicting all the points together, as if they represent the whole vehicle''' | |
212 predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, currentInstant) | |
213 predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, currentInstant) | |
214 | |
215 collisionPoints = [] | |
216 crossingZones = [] | |
217 for et1 in predictedTrajectories1: | |
218 for et2 in predictedTrajectories2: | |
219 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
220 | |
221 if t <= timeHorizon: | |
222 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) | |
223 elif computeCZ: # check if there is a crossing zone | |
224 # TODO? zone should be around the points at which the traj are the closest | |
225 # look for CZ at different times, otherwise it would be a collision | |
226 # an approximation would be to look for close points at different times, ie the complementary of collision points | |
227 cz = None | |
228 t1 = 0 | |
229 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 | |
230 t2 = 0 | |
231 while not cz and t2 < timeHorizon: | |
232 #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: | |
233 # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) | |
234 cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) | |
235 if cz: | |
236 crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) | |
237 t2 += 1 | |
238 t1 += 1 | |
239 | |
240 if debug: | |
241 from matplotlib.pyplot import figure, axis, title | |
242 figure() | |
243 for et in predictedTrajectories1: | |
244 et.predictPosition(timeHorizon) | |
245 et.draw('rx') | |
246 | |
247 for et in predictedTrajectories2: | |
248 et.predictPosition(timeHorizon) | |
249 et.draw('bx') | |
250 obj1.draw('r') | |
251 obj2.draw('b') | |
252 title('instant {0}'.format(currentInstant)) | |
253 axis('equal') | |
254 | |
255 return collisionPoints, crossingZones | |
256 | |
257 def computeCrossingsCollisions(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | |
258 '''Computes all crossing and collision points at each common instant for two road users. ''' | |
259 collisionPoints={} | |
260 crossingZones={} | |
261 if timeInterval: | |
262 commonTimeInterval = timeInterval | |
263 else: | |
264 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
265 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | |
266 collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug) | |
267 | |
268 return collisionPoints, crossingZones | |
269 | |
270 def computeCollisionProbability(obj1, obj2, predictionParameters, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): | |
271 '''Computes only collision probabilities | |
272 Returns for each instant the collision probability and number of samples drawn''' | |
273 collisionProbabilities = {} | |
274 if timeInterval: | |
275 commonTimeInterval = timeInterval | |
276 else: | |
277 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
278 for i in list(commonTimeInterval)[:-1]: | |
279 nCollisions = 0 | |
280 print(obj1.num, obj2.num, i) | |
281 predictedTrajectories1 = predictionParameters.generatePredictedTrajectories(obj1, i) | |
282 predictedTrajectories2 = predictionParameters.generatePredictedTrajectories(obj2, i) | |
283 for et1 in predictedTrajectories1: | |
284 for et2 in predictedTrajectories2: | |
285 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
286 if t <= timeHorizon: | |
287 nCollisions += 1 | |
288 # take into account probabilities ?? | |
289 nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) | |
290 collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] | |
291 | |
292 if debug: | |
293 from matplotlib.pyplot import figure, axis, title | |
294 figure() | |
295 for et in predictedTrajectories1: | |
296 et.predictPosition(timeHorizon) | |
297 et.draw('rx') | |
298 | |
299 for et in predictedTrajectories2: | |
300 et.predictPosition(timeHorizon) | |
301 et.draw('bx') | |
302 obj1.draw('r') | |
303 obj2.draw('b') | |
304 title('instant {0}'.format(i)) | |
305 axis('equal') | |
306 | |
307 return collisionProbabilities | |
308 | 313 |
309 #### | 314 #### |
310 # Other Methods | 315 # Other Methods |
311 #### | 316 #### |
312 | 317 |