Mercurial Hosting > traffic-intelligence
comparison python/extrapolation.py @ 267:32e88b513f5c
added code to compute probability of collision
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 27 Jul 2012 20:32:14 -0400 |
parents | aba9711b3149 |
children | 0c0b92f621f6 |
comparison
equal
deleted
inserted
replaced
266:aba9711b3149 | 267:32e88b513f5c |
---|---|
68 self.maxSpeed = maxSpeed | 68 self.maxSpeed = maxSpeed |
69 | 69 |
70 def __str__(self): | 70 def __str__(self): |
71 return '{0} {1}'.format(self.name, self.maxSpeed) | 71 return '{0} {1}'.format(self.name, self.maxSpeed) |
72 | 72 |
73 def createExtrapolatedTrajectories(extrapolationParameters, obj, instant): | 73 # refactor with classes and subclasses |
74 '''extrapolationParameters specific to each method (in name field) ''' | 74 def createExtrapolatedTrajectories(self, obj, instant): |
75 if extrapolationParameters.name == 'constant velocity': | 75 '''extrapolationParameters specific to each method (in name field) ''' |
76 return [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), | |
77 maxSpeed = extrapolationParameters.maxSpeed)] | |
78 elif extrapolationParameters.name == 'normal adaptation': | |
79 extrapolatedTrajectories = [] | 76 extrapolatedTrajectories = [] |
80 for i in xrange(extrapolationParameters.nExtrapolatedTrajectories): | 77 if self.name == 'constant velocity': |
81 extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), | 78 extrapolatedTrajectories = [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), |
82 obj.getVelocityAtInstant(instant), | 79 maxSpeed = self.maxSpeed)] |
83 extrapolationParameters.accelerationDistribution, | 80 elif self.name == 'normal adaptation': |
84 extrapolationParameters.steeringDistribution, | 81 for i in xrange(self.nExtrapolatedTrajectories): |
85 maxSpeed = extrapolationParameters.maxSpeed)) | 82 extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), |
83 obj.getVelocityAtInstant(instant), | |
84 self.accelerationDistribution, | |
85 self.steeringDistribution, | |
86 maxSpeed = self.maxSpeed)) | |
87 elif self.name == 'pointset': | |
88 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
89 positions = [f.getPositionAtInstant(instant) for f in features] | |
90 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
91 for initialPosition,initialVelocity in zip(positions, velocities): | |
92 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, | |
93 maxSpeed = self.maxSpeed)) | |
94 elif self.name == 'evasive action': | |
95 for i in xrange(self.nExtrapolatedTrajectories): | |
96 | |
97 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), | |
98 obj.getVelocityAtInstant(instant), | |
99 moving.NormAngle(self.accelerationDistribution(), self.steeringDistribution()), | |
100 maxSpeed = self.maxSpeed)) | |
101 else: | |
102 print('Unknown extrapolation hypothesis') | |
86 return extrapolatedTrajectories | 103 return extrapolatedTrajectories |
87 elif extrapolationParameters.name == 'pointset': | |
88 extrapolatedTrajectories = [] | |
89 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
90 positions = [f.getPositionAtInstant(instant) for f in features] | |
91 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
92 for initialPosition,initialVelocity in zip(positions, velocities): | |
93 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, | |
94 maxSpeed = extrapolationParameters.maxSpeed)) | |
95 return extrapolatedTrajectories | |
96 else: | |
97 print('Unknown extrapolation hypothesis') | |
98 return [] | |
99 | 104 |
100 class SafetyPoint(moving.Point): | 105 class SafetyPoint(moving.Point): |
101 '''Can represent a collision point or crossing zone | 106 '''Can represent a collision point or crossing zone |
102 with respective safety indicator, TTC or pPET''' | 107 with respective safety indicator, TTC or pPET''' |
103 def __init__(self, p, probability = 1., indicator = -1): | 108 def __init__(self, p, probability = 1., indicator = -1): |
113 | 118 |
114 def computeExpectedIndicator(points): | 119 def computeExpectedIndicator(points): |
115 from numpy import sum | 120 from numpy import sum |
116 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) | 121 return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) |
117 | 122 |
123 def computeCollisionTime(extrapolatedTrajectory1, extrapolatedTrajectory2, collisionDistanceThreshold, timeHorizon): | |
124 t = 1 | |
125 p1 = extrapolatedTrajectory1.predictPosition(t) | |
126 p2 = extrapolatedTrajectory2.predictPosition(t) | |
127 while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: | |
128 p1 = extrapolatedTrajectory1.predictPosition(t) | |
129 p2 = extrapolatedTrajectory2.predictPosition(t) | |
130 t += 1 | |
131 return t, p1, p2 | |
132 | |
118 def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): | 133 def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): |
119 '''returns the lists of collision points and crossing zones ''' | 134 '''returns the lists of collision points and crossing zones ''' |
120 extrapolatedTrajectories1 = createExtrapolatedTrajectories(extrapolationParameters, obj1, i) | 135 extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) |
121 extrapolatedTrajectories2 = createExtrapolatedTrajectories(extrapolationParameters, obj2, i) | 136 extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) |
122 | 137 |
123 collisionPoints = [] | 138 collisionPoints = [] |
124 crossingZones = [] | 139 crossingZones = [] |
125 for et1 in extrapolatedTrajectories1: | 140 for et1 in extrapolatedTrajectories1: |
126 for et2 in extrapolatedTrajectories2: | 141 for et2 in extrapolatedTrajectories2: |
127 t = 1 | 142 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) |
128 p1 = et1.predictPosition(t) | 143 |
129 p2 = et2.predictPosition(t) | |
130 while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: | |
131 p1 = et1.predictPosition(t) | |
132 p2 = et2.predictPosition(t) | |
133 t += 1 | |
134 | |
135 if t <= timeHorizon: | 144 if t <= timeHorizon: |
136 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) | 145 collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) |
137 else: # check if there is a crossing zone | 146 else: # check if there is a crossing zone |
138 # TODO? zone should be around the points at which the traj are the closest | 147 # TODO? zone should be around the points at which the traj are the closest |
139 # look for CZ at different times, otherwise it would be a collision | 148 # look for CZ at different times, otherwise it would be a collision |
183 for et in extrapolatedTrajectories2: | 192 for et in extrapolatedTrajectories2: |
184 et.predictPosition(timeHorizon) | 193 et.predictPosition(timeHorizon) |
185 et.draw('bx') | 194 et.draw('bx') |
186 title('instant {0}'.format(i)) | 195 title('instant {0}'.format(i)) |
187 axis('equal') | 196 axis('equal') |
197 | |
198 return collisionPoints, crossingZones | |
199 | |
200 def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False): | |
201 collisionProbabilities = {} | |
202 for i in list(obj1.commonTimeInterval(obj2))[:-1]: | |
203 nCollisions = 0 | |
204 print(obj1.num, obj2.num, i) | |
205 extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) | |
206 extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) | |
207 for et1 in extrapolatedTrajectories1: | |
208 for et2 in extrapolatedTrajectories2: | |
209 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | |
210 if t <= timeHorizon: | |
211 nCollisions += 1 | |
212 # take into account probabilities ?? | |
213 collisionProbabilities[i] = float(nCollisions)/extrapolationParameters.nExtrapolatedTrajectories**2 | |
214 out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, extrapolationParameters.nExtrapolatedTrajectories, i, collisionProbabilities[i])) | |
215 | |
216 if debug: | |
217 from matplotlib.pyplot import figure, axis, title | |
218 figure() | |
219 obj1.draw('r') | |
220 obj2.draw('b') | |
221 for et in extrapolatedTrajectories1: | |
222 et.predictPosition(timeHorizon) | |
223 et.draw('rx') | |
188 | 224 |
189 | 225 for et in extrapolatedTrajectories2: |
190 # probability of successful evasive action = 1-P(Collision) using larger control values | 226 et.predictPosition(timeHorizon) |
191 | 227 et.draw('bx') |
192 return collisionPoints, crossingZones | 228 title('instant {0}'.format(i)) |
229 axis('equal') | |
230 | |
231 return collisionProbabilities | |
193 | 232 |
194 ############### | 233 ############### |
195 | 234 |
196 # Default values: to remove because we cannot tweak that from a script where the value may be different | 235 # Default values: to remove because we cannot tweak that from a script where the value may be different |
197 FPS= 25 # No. of frame per second (FPS) | 236 FPS= 25 # No. of frame per second (FPS) |