Mercurial Hosting > traffic-intelligence
comparison python/extrapolation.py @ 268:0c0b92f621f6
reorganized to compute evasive action for multiple positions
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Sat, 28 Jul 2012 02:58:47 -0400 |
parents | 32e88b513f5c |
children | a9988971aac8 |
comparison
equal
deleted
inserted
replaced
267:32e88b513f5c | 268:0c0b92f621f6 |
---|---|
1 #! /usr/bin/env python | 1 #! /usr/bin/env python |
2 '''Library for moving object extrapolation hypotheses''' | 2 '''Library for moving object extrapolation hypotheses''' |
3 | 3 |
4 import moving | 4 import moving |
5 import math | 5 import math |
6 import random | |
6 | 7 |
7 class ExtrapolatedTrajectory: | 8 class ExtrapolatedTrajectory: |
8 '''Class for extrapolated trajectories with lazy evaluation | 9 '''Class for extrapolated trajectories with lazy evaluation |
9 if the predicted position has not been already computed, compute it | 10 if the predicted position has not been already computed, compute it |
10 | 11 |
68 self.maxSpeed = maxSpeed | 69 self.maxSpeed = maxSpeed |
69 | 70 |
70 def __str__(self): | 71 def __str__(self): |
71 return '{0} {1}'.format(self.name, self.maxSpeed) | 72 return '{0} {1}'.format(self.name, self.maxSpeed) |
72 | 73 |
73 # refactor with classes and subclasses | 74 class ConstantExtrapolationParameters(ExtrapolationParameters): |
74 def createExtrapolatedTrajectories(self, obj, instant): | 75 def __init__(self, maxSpeed): |
75 '''extrapolationParameters specific to each method (in name field) ''' | 76 ExtrapolationParameters.__init__(self, 'constant velocity', maxSpeed) |
77 | |
78 def generateExtrapolatedTrajectories(self, obj, instant): | |
79 return [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), | |
80 maxSpeed = self.maxSpeed)] | |
81 | |
82 class NormalAdaptationExtrapolationParameters(ExtrapolationParameters): | |
83 def __init__(self, maxSpeed, nExtrapolatedTrajectories, maxAcceleration, maxSteering): | |
84 ExtrapolationParameters.__init__(self, 'normal adaptation', maxSpeed) | |
85 self.nExtrapolatedTrajectories = nExtrapolatedTrajectories | |
86 self.maxAcceleration = maxAcceleration | |
87 self.maxSteering = maxSteering | |
88 self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, | |
89 self.maxAcceleration, 0.) | |
90 self.steeringDistribution = lambda: random.triangular(-self.maxSteering, | |
91 self.maxSteering, 0.) | |
92 | |
93 def __str__(self): | |
94 return ExtrapolationParameters.__str__(self)+' {0} {1} {2}'.format(self.nExtrapolatedTrajectories, | |
95 self.maxAcceleration, | |
96 self.maxSteering) | |
97 | |
98 def generateExtrapolatedTrajectories(self, obj, instant): | |
76 extrapolatedTrajectories = [] | 99 extrapolatedTrajectories = [] |
77 if self.name == 'constant velocity': | 100 for i in xrange(self.nExtrapolatedTrajectories): |
78 extrapolatedTrajectories = [ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), | 101 extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), |
79 maxSpeed = self.maxSpeed)] | 102 obj.getVelocityAtInstant(instant), |
80 elif self.name == 'normal adaptation': | 103 self.accelerationDistribution, |
81 for i in xrange(self.nExtrapolatedTrajectories): | 104 self.steeringDistribution, |
82 extrapolatedTrajectories.append(ExtrapolatedTrajectoryNormalAdaptation(obj.getPositionAtInstant(instant), | 105 maxSpeed = self.maxSpeed)) |
83 obj.getVelocityAtInstant(instant), | 106 return extrapolatedTrajectories |
84 self.accelerationDistribution, | 107 |
85 self.steeringDistribution, | 108 class PointSetExtrapolationParameters(ExtrapolationParameters): |
86 maxSpeed = self.maxSpeed)) | 109 def __init__(self, maxSpeed): |
87 elif self.name == 'pointset': | 110 ExtrapolationParameters.__init__(self, 'point set', maxSpeed) |
111 | |
112 def generateExtrapolatedTrajectories(self, obj, instant): | |
113 extrapolatedTrajectories = [] | |
114 features = [f for f in obj.features if f.existsAtInstant(instant)] | |
115 positions = [f.getPositionAtInstant(instant) for f in features] | |
116 velocities = [f.getVelocityAtInstant(instant) for f in features] | |
117 for initialPosition,initialVelocity in zip(positions, velocities): | |
118 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, | |
119 maxSpeed = self.maxSpeed)) | |
120 return extrapolatedTrajectories | |
121 | |
122 class EvasiveActionExtrapolationParameters(ExtrapolationParameters): | |
123 def __init__(self, maxSpeed, nExtrapolatedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): | |
124 if useFeatures: | |
125 name = 'point set evasive action' | |
126 else: | |
127 name = 'evasive action' | |
128 ExtrapolationParameters.__init__(self, name, maxSpeed) | |
129 self.nExtrapolatedTrajectories = nExtrapolatedTrajectories | |
130 self.minAcceleration = minAcceleration | |
131 self.maxAcceleration = maxAcceleration | |
132 self.maxSteering = maxSteering | |
133 self.useFeatures = useFeatures | |
134 self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, | |
135 self.maxAcceleration, 0.) | |
136 self.steeringDistribution = lambda: random.triangular(-self.maxSteering, | |
137 self.maxSteering, 0.) | |
138 | |
139 def __str__(self): | |
140 return ExtrapolationParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nExtrapolatedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) | |
141 | |
142 def generateExtrapolatedTrajectories(self, obj, instant): | |
143 extrapolatedTrajectories = [] | |
144 if self.useFeatures: | |
88 features = [f for f in obj.features if f.existsAtInstant(instant)] | 145 features = [f for f in obj.features if f.existsAtInstant(instant)] |
89 positions = [f.getPositionAtInstant(instant) for f in features] | 146 positions = [f.getPositionAtInstant(instant) for f in features] |
90 velocities = [f.getVelocityAtInstant(instant) for f in features] | 147 velocities = [f.getVelocityAtInstant(instant) for f in features] |
148 else: | |
149 positions = [obj.getPositionAtInstant(instant)] | |
150 velocities = [obj.getVelocityAtInstant(instant)] | |
151 for i in xrange(self.nExtrapolatedTrajectories): | |
91 for initialPosition,initialVelocity in zip(positions, velocities): | 152 for initialPosition,initialVelocity in zip(positions, velocities): |
92 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, initialVelocity, | 153 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(initialPosition, |
154 initialVelocity, | |
155 moving.NormAngle(self.accelerationDistribution(), | |
156 self.steeringDistribution()), | |
93 maxSpeed = self.maxSpeed)) | 157 maxSpeed = self.maxSpeed)) |
94 elif self.name == 'evasive action': | |
95 for i in xrange(self.nExtrapolatedTrajectories): | |
96 | |
97 extrapolatedTrajectories.append(ExtrapolatedTrajectoryConstant(obj.getPositionAtInstant(instant), | |
98 obj.getVelocityAtInstant(instant), | |
99 moving.NormAngle(self.accelerationDistribution(), self.steeringDistribution()), | |
100 maxSpeed = self.maxSpeed)) | |
101 else: | |
102 print('Unknown extrapolation hypothesis') | |
103 return extrapolatedTrajectories | 158 return extrapolatedTrajectories |
104 | 159 |
105 class SafetyPoint(moving.Point): | 160 class SafetyPoint(moving.Point): |
106 '''Can represent a collision point or crossing zone | 161 '''Can represent a collision point or crossing zone |
107 with respective safety indicator, TTC or pPET''' | 162 with respective safety indicator, TTC or pPET''' |
130 t += 1 | 185 t += 1 |
131 return t, p1, p2 | 186 return t, p1, p2 |
132 | 187 |
133 def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): | 188 def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): |
134 '''returns the lists of collision points and crossing zones ''' | 189 '''returns the lists of collision points and crossing zones ''' |
135 extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) | 190 extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i) |
136 extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) | 191 extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i) |
137 | 192 |
138 collisionPoints = [] | 193 collisionPoints = [] |
139 crossingZones = [] | 194 crossingZones = [] |
140 for et1 in extrapolatedTrajectories1: | 195 for et1 in extrapolatedTrajectories1: |
141 for et2 in extrapolatedTrajectories2: | 196 for et2 in extrapolatedTrajectories2: |
162 return collisionPoints, crossingZones | 217 return collisionPoints, crossingZones |
163 | 218 |
164 def computeCrossingsCollisions(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): | 219 def computeCrossingsCollisions(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): |
165 collisionPoints={} | 220 collisionPoints={} |
166 crossingZones={} | 221 crossingZones={} |
167 #TTCs = {} | |
168 #pPETs = {} | |
169 if timeInterval: | 222 if timeInterval: |
170 commonTimeInterval = timeInterval | 223 commonTimeInterval = timeInterval |
171 else: | 224 else: |
172 commonTimeInterval = obj1.commonTimeInterval(obj2) | 225 commonTimeInterval = obj1.commonTimeInterval(obj2) |
173 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors | 226 for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors |
174 print(obj1.num, obj2.num, i) | 227 print(obj1.num, obj2.num, i) |
175 collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon) | 228 collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon) |
176 # if len(collisionPoints[i]) > 0: | |
177 # TTCs[i] = extrapolation.computeExpectedIndicator(collisionPoints[i]) | |
178 # if len(crossingZones[i]) > 0: | |
179 # pPETs[i] = extrapolation.computeExpectedIndicator(crossingZones[i]) | |
180 SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) | 229 SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) |
181 SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) | 230 SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) |
182 | 231 |
183 if debug: | 232 if debug: |
184 from matplotlib.pyplot import figure, axis, title | 233 from matplotlib.pyplot import figure, axis, title |
195 title('instant {0}'.format(i)) | 244 title('instant {0}'.format(i)) |
196 axis('equal') | 245 axis('equal') |
197 | 246 |
198 return collisionPoints, crossingZones | 247 return collisionPoints, crossingZones |
199 | 248 |
200 def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False): | 249 def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False, timeInterval = None): |
201 collisionProbabilities = {} | 250 collisionProbabilities = {} |
202 for i in list(obj1.commonTimeInterval(obj2))[:-1]: | 251 if timeInterval: |
252 commonTimeInterval = timeInterval | |
253 else: | |
254 commonTimeInterval = obj1.commonTimeInterval(obj2) | |
255 for i in list(commonTimeInterval)[:-1]: | |
203 nCollisions = 0 | 256 nCollisions = 0 |
204 print(obj1.num, obj2.num, i) | 257 print(obj1.num, obj2.num, i) |
205 extrapolatedTrajectories1 = extrapolationParameters.createExtrapolatedTrajectories(obj1, i) | 258 extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i) |
206 extrapolatedTrajectories2 = extrapolationParameters.createExtrapolatedTrajectories(obj2, i) | 259 extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i) |
207 for et1 in extrapolatedTrajectories1: | 260 for et1 in extrapolatedTrajectories1: |
208 for et2 in extrapolatedTrajectories2: | 261 for et2 in extrapolatedTrajectories2: |
209 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) | 262 t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) |
210 if t <= timeHorizon: | 263 if t <= timeHorizon: |
211 nCollisions += 1 | 264 nCollisions += 1 |
212 # take into account probabilities ?? | 265 # take into account probabilities ?? |
213 collisionProbabilities[i] = float(nCollisions)/extrapolationParameters.nExtrapolatedTrajectories**2 | 266 nSamples = float(len(extrapolatedTrajectories1)*len(extrapolatedTrajectories2)) |
214 out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, extrapolationParameters.nExtrapolatedTrajectories, i, collisionProbabilities[i])) | 267 collisionProbabilities[i] = float(nCollisions)/nSamples |
268 out.write('{0} {1} {2} {3} {4}\n'.format(obj1.num, obj2.num, nSamples, i, collisionProbabilities[i])) | |
215 | 269 |
216 if debug: | 270 if debug: |
217 from matplotlib.pyplot import figure, axis, title | 271 from matplotlib.pyplot import figure, axis, title |
218 figure() | 272 figure() |
219 obj1.draw('r') | 273 obj1.draw('r') |