Mercurial Hosting > traffic-intelligence
comparison trafficintelligence/events.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 15 Jun 2018 11:19:10 -0400 |
parents | python/events.py@933670761a57 |
children | 6a8fe3ed3bc6 |
comparison
equal
deleted
inserted
replaced
1027:6129296848d3 | 1028:cc5cb04b04b0 |
---|---|
1 #! /usr/bin/env python | |
2 '''Libraries for events | |
3 Interactions, pedestrian crossing...''' | |
4 | |
5 from trafficintelligence import moving, prediction, indicators, utils, cvutils, ml | |
6 from trafficintelligence.base import VideoFilenameAddable | |
7 | |
8 import numpy as np | |
9 | |
10 import multiprocessing | |
11 import itertools, logging | |
12 | |
13 | |
14 def findRoute(prototypes,objects,i,j,noiseEntryNums,noiseExitNums,minSimilarity= 0.3, spatialThreshold=1.0, delta=180): | |
15 if i[0] not in noiseEntryNums: | |
16 prototypesRoutes= [ x for x in sorted(prototypes.keys()) if i[0]==x[0]] | |
17 elif i[1] not in noiseExitNums: | |
18 prototypesRoutes=[ x for x in sorted(prototypes.keys()) if i[1]==x[1]] | |
19 else: | |
20 prototypesRoutes=[x for x in sorted(prototypes.keys())] | |
21 routeSim={} | |
22 lcss = utils.LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta) | |
23 for y in prototypesRoutes: | |
24 if y in prototypes: | |
25 prototypesIDs=prototypes[y] | |
26 similarity=[] | |
27 for x in prototypesIDs: | |
28 s=lcss.computeNormalized(objects[j].positions, objects[x].positions) | |
29 similarity.append(s) | |
30 routeSim[y]=max(similarity) | |
31 route=max(routeSim, key=routeSim.get) | |
32 if routeSim[route]>=minSimilarity: | |
33 return route | |
34 else: | |
35 return i | |
36 | |
37 def getRoute(obj,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination=True): | |
38 route=(obj.startRouteID,obj.endRouteID) | |
39 if useDestination: | |
40 if route not in prototypes: | |
41 route= findRoute(prototypes,objects,route,obj.getNum(),noiseEntryNums,noiseExitNums) | |
42 return route | |
43 | |
44 class Interaction(moving.STObject, VideoFilenameAddable): | |
45 '''Class for an interaction between two road users | |
46 or a road user and an obstacle | |
47 | |
48 link to the moving objects | |
49 contains the indicators in a dictionary with the names as keys | |
50 ''' | |
51 | |
52 categories = {'Head On': 0, | |
53 'rearend': 1, | |
54 'side': 2, | |
55 'parallel': 3} | |
56 | |
57 indicatorNames = ['Collision Course Dot Product', | |
58 'Collision Course Angle', | |
59 'Distance', | |
60 'Minimum Distance', | |
61 'Velocity Angle', | |
62 'Speed Differential', | |
63 'Collision Probability', | |
64 'Time to Collision', # 7 | |
65 'Probability of Successful Evasive Action', | |
66 'predicted Post Encroachment Time', | |
67 'Post Encroachment Time'] | |
68 | |
69 indicatorNameToIndices = utils.inverseEnumeration(indicatorNames) | |
70 | |
71 indicatorShortNames = ['CCDP', | |
72 'CCA', | |
73 'Dist', | |
74 'MinDist', | |
75 'VA', | |
76 'SD', | |
77 'PoC', | |
78 'TTC', | |
79 'P(SEA)', | |
80 'pPET', | |
81 'PET'] | |
82 | |
83 indicatorUnits = ['', | |
84 'rad', | |
85 'm', | |
86 'm', | |
87 'rad', | |
88 'm/s', | |
89 '', | |
90 's', | |
91 '', | |
92 's', | |
93 's'] | |
94 | |
95 timeIndicators = ['Time to Collision', 'predicted Post Encroachment Time'] | |
96 | |
97 def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None): | |
98 moving.STObject.__init__(self, num, timeInterval) | |
99 if timeInterval is None and roadUser1 is not None and roadUser2 is not None: | |
100 self.timeInterval = roadUser1.commonTimeInterval(roadUser2) | |
101 self.roadUser1 = roadUser1 | |
102 self.roadUser2 = roadUser2 | |
103 if roaduserNum1 is not None and roaduserNum2 is not None: | |
104 self.roadUserNumbers = set([roaduserNum1, roaduserNum2]) | |
105 elif roadUser1 is not None and roadUser2 is not None: | |
106 self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()]) | |
107 else: | |
108 self.roadUserNumbers = None | |
109 self.categoryNum = categoryNum | |
110 self.indicators = {} | |
111 self.interactionInterval = None | |
112 # list for collison points and crossing zones | |
113 self.collisionPoints = None | |
114 self.crossingZones = None | |
115 | |
116 def getRoadUserNumbers(self): | |
117 return self.roadUserNumbers | |
118 | |
119 def setRoadUsers(self, objects): | |
120 nums = sorted(list(self.getRoadUserNumbers())) | |
121 if nums[0]<len(objects) and objects[nums[0]].getNum() == nums[0]: | |
122 self.roadUser1 = objects[nums[0]] | |
123 if nums[1]<len(objects) and objects[nums[1]].getNum() == nums[1]: | |
124 self.roadUser2 = objects[nums[1]] | |
125 | |
126 if self.roadUser1 is None or self.roadUser2 is None: | |
127 self.roadUser1 = None | |
128 self.roadUser2 = None | |
129 i = 0 | |
130 while i < len(objects) and self.roadUser2 is None: | |
131 if objects[i].getNum() in nums: | |
132 if self.roadUser1 is None: | |
133 self.roadUser1 = objects[i] | |
134 else: | |
135 self.roadUser2 = objects[i] | |
136 i += 1 | |
137 | |
138 def getIndicator(self, indicatorName): | |
139 return self.indicators.get(indicatorName, None) | |
140 | |
141 def addIndicator(self, indicator): | |
142 if indicator is not None: | |
143 self.indicators[indicator.name] = indicator | |
144 | |
145 def getIndicatorValueAtInstant(self, indicatorName, instant): | |
146 indicator = self.getIndicator(indicatorName) | |
147 if indicator is not None: | |
148 return indicator[instant] | |
149 else: | |
150 return None | |
151 | |
152 def getIndicatorValuesAtInstant(self, instant): | |
153 '''Returns list of indicator values at instant | |
154 as dict (with keys from indicators dict)''' | |
155 values = {} | |
156 for k, indicator in self.indicators.items(): | |
157 values[k] = indicator[instant] | |
158 return values | |
159 | |
160 def plot(self, options = '', withOrigin = False, timeStep = 1, withFeatures = False, **kwargs): | |
161 self.roadUser1.plot(options, withOrigin, timeStep, withFeatures, **kwargs) | |
162 self.roadUser2.plot(options, withOrigin, timeStep, withFeatures, **kwargs) | |
163 | |
164 def plotOnWorldImage(self, nPixelsPerUnitDistance, options = '', withOrigin = False, timeStep = 1, **kwargs): | |
165 self.roadUser1.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs) | |
166 self.roadUser2.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs) | |
167 | |
168 def play(self, videoFilename, homography = None, undistort = False, intrinsicCameraMatrix = None, distortionCoefficients = None, undistortedImageMultiplication = 1., allUserInstants = False): | |
169 if self.roadUser1 is not None and self.roadUser2 is not None: | |
170 if allUserInstants: | |
171 firstFrameNum = min(self.roadUser1.getFirstInstant(), self.roadUser2.getFirstInstant()) | |
172 lastFrameNum = max(self.roadUser1.getLastInstant(), self.roadUser2.getLastInstant()) | |
173 else: | |
174 firstFrameNum = self.getFirstInstant() | |
175 lastFrameNum = self.getLastInstant() | |
176 cvutils.displayTrajectories(videoFilename, [self.roadUser1, self.roadUser2], homography = homography, firstFrameNum = firstFrameNum, lastFrameNumArg = lastFrameNum, undistort = undistort, intrinsicCameraMatrix = intrinsicCameraMatrix, distortionCoefficients = distortionCoefficients, undistortedImageMultiplication = undistortedImageMultiplication) | |
177 else: | |
178 print('Please set the interaction road user attributes roadUser1 and roadUser1 through the method setRoadUsers') | |
179 | |
180 def computeIndicators(self): | |
181 '''Computes the collision course cosine only if the cosine is positive''' | |
182 collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length()) | |
183 collisionCourseAngles = {} | |
184 velocityAngles = {} | |
185 distances = {}#[0]*int(self.timeInterval.length()) | |
186 speedDifferentials = {} | |
187 interactionInstants = [] | |
188 for instant in self.timeInterval: | |
189 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) | |
190 v1 = self.roadUser1.getVelocityAtInstant(instant) | |
191 v2 = self.roadUser2.getVelocityAtInstant(instant) | |
192 deltav = v2-v1 | |
193 velocityAngles[instant] = np.arccos(moving.Point.dot(v1, v2)/(v1.norm2()*v2.norm2())) | |
194 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) | |
195 distances[instant] = deltap.norm2() | |
196 speedDifferentials[instant] = deltav.norm2() | |
197 if collisionCourseDotProducts[instant] > 0: | |
198 interactionInstants.append(instant) | |
199 if distances[instant] != 0 and speedDifferentials[instant] != 0: | |
200 collisionCourseAngles[instant] = np.arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])) | |
201 | |
202 if len(interactionInstants) >= 2: | |
203 self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1]) | |
204 else: | |
205 self.interactionInterval = moving.TimeInterval() | |
206 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) | |
207 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) | |
208 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False)) | |
209 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles)) | |
210 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials)) | |
211 | |
212 # if we have features, compute other indicators | |
213 if self.roadUser1.hasFeatures() and self.roadUser2.hasFeatures(): | |
214 minDistances={} | |
215 for instant in self.timeInterval: | |
216 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) | |
217 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) | |
218 | |
219 def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | |
220 '''Computes all crossing and collision points at each common instant for two road users. ''' | |
221 TTCs = {} | |
222 collisionProbabilities = {} | |
223 if timeInterval is not None: | |
224 commonTimeInterval = timeInterval | |
225 else: | |
226 commonTimeInterval = self.timeInterval | |
227 self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval) | |
228 for i, cps in self.collisionPoints.items(): | |
229 TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cps) | |
230 collisionProbabilities[i] = sum([p.probability for p in cps]) | |
231 if len(TTCs) > 0: | |
232 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs, mostSevereIsMax=False)) | |
233 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[6], collisionProbabilities)) | |
234 | |
235 # crossing zones and pPET | |
236 if computeCZ: | |
237 self.crossingZones = crossingZones | |
238 pPETs = {} | |
239 for i, cz in self.crossingZones.items(): | |
240 pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(cz) | |
241 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs, mostSevereIsMax=False)) | |
242 # TODO add probability of collision, and probability of successful evasive action | |
243 | |
244 def computePET(self, collisionDistanceThreshold): | |
245 pet, t1, t2= moving.MovingObject.computePET(self.roadUser1, self.roadUser2, collisionDistanceThreshold) | |
246 if pet is not None: | |
247 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[10], {min(t1, t2): pet}, mostSevereIsMax = False)) | |
248 | |
249 def setCollision(self, collision): | |
250 '''indicates if it is a collision: argument should be boolean''' | |
251 self.collision = collision | |
252 | |
253 def isCollision(self): | |
254 if hasattr(self, 'collision'): | |
255 return self.collision | |
256 else: | |
257 return None | |
258 | |
259 def getCollisionPoints(self): | |
260 return self.collisionPoints | |
261 | |
262 def getCrossingZones(self): | |
263 return self.crossingZones | |
264 | |
265 def createInteractions(objects, _others = None): | |
266 '''Create all interactions of two co-existing road users''' | |
267 if _others is not None: | |
268 others = _others | |
269 | |
270 interactions = [] | |
271 num = 0 | |
272 for i in range(len(objects)): | |
273 if _others is None: | |
274 others = objects[:i] | |
275 for j in range(len(others)): | |
276 commonTimeInterval = objects[i].commonTimeInterval(others[j]) | |
277 if not commonTimeInterval.empty(): | |
278 interactions.append(Interaction(num, commonTimeInterval, objects[i].num, others[j].num, objects[i], others[j])) | |
279 num += 1 | |
280 return interactions | |
281 | |
282 def findInteraction(interactions, roadUserNum1, roadUserNum2): | |
283 'Returns the right interaction in the set' | |
284 i=0 | |
285 while i<len(interactions) and set([roadUserNum1, roadUserNum2]) != interactions[i].getRoadUserNumbers(): | |
286 i+=1 | |
287 if i<len(interactions): | |
288 return interactions[i] | |
289 else: | |
290 return None | |
291 | |
292 def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): | |
293 for inter in interactions: | |
294 print('processing interaction {}'.format(inter.getNum())) # logging.debug('processing interaction {}'.format(inter.getNum())) | |
295 inter.computeIndicators() | |
296 if computeMotionPrediction: | |
297 inter.computeCrossingsCollisions(predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval) | |
298 if computePET: | |
299 inter.computePET(collisionDistanceThreshold) | |
300 return interactions | |
301 | |
302 def aggregateSafetyPoints(interactions, pointType = 'collision'): | |
303 '''Put all collision points or crossing zones in a list for display''' | |
304 allPoints = [] | |
305 if pointType == 'collision': | |
306 for i in interactions: | |
307 for points in i.collisionPoints.values(): | |
308 allPoints += points | |
309 elif pointType == 'crossing': | |
310 for i in interactions: | |
311 for points in i.crossingZones.values(): | |
312 allPoints += points | |
313 else: | |
314 print('unknown type of point: '+pointType) | |
315 return allPoints | |
316 | |
317 def prototypeCluster(interactions, similarities, indicatorName, minSimilarity, similarityFunc = None, minClusterSize = None, randomInitialization = False): | |
318 return ml.prototypeCluster([inter.getIndicator(indicatorName) for inter in interactions], similarities, minSimilarity, similarityFunc, minClusterSize, randomInitialization) | |
319 | |
320 class Crossing(moving.STObject): | |
321 '''Class for the event of a street crossing | |
322 | |
323 TODO: detecter passage sur la chaussee | |
324 identifier origines et destination (ou uniquement chaussee dans FOV) | |
325 carac traversee | |
326 detecter proximite veh (retirer si trop similaire simultanement | |
327 carac interaction''' | |
328 | |
329 def __init__(self, roaduserNum = None, num = None, timeInterval = None): | |
330 moving.STObject.__init__(self, num, timeInterval) | |
331 self.roaduserNum = roaduserNum | |
332 | |
333 | |
334 | |
335 if __name__ == "__main__": | |
336 import doctest | |
337 import unittest | |
338 suite = doctest.DocFileSuite('tests/events.txt') | |
339 #suite = doctest.DocTestSuite() | |
340 unittest.TextTestRunner().run(suite) | |
341 |