comparison trafficintelligence/events.py @ 1028:cc5cb04b04b0

major update using the trafficintelligence package name and install through pip
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Fri, 15 Jun 2018 11:19:10 -0400
parents python/events.py@933670761a57
children 6a8fe3ed3bc6
comparison
equal deleted inserted replaced
1027:6129296848d3 1028:cc5cb04b04b0
1 #! /usr/bin/env python
2 '''Libraries for events
3 Interactions, pedestrian crossing...'''
4
5 from trafficintelligence import moving, prediction, indicators, utils, cvutils, ml
6 from trafficintelligence.base import VideoFilenameAddable
7
8 import numpy as np
9
10 import multiprocessing
11 import itertools, logging
12
13
14 def findRoute(prototypes,objects,i,j,noiseEntryNums,noiseExitNums,minSimilarity= 0.3, spatialThreshold=1.0, delta=180):
15 if i[0] not in noiseEntryNums:
16 prototypesRoutes= [ x for x in sorted(prototypes.keys()) if i[0]==x[0]]
17 elif i[1] not in noiseExitNums:
18 prototypesRoutes=[ x for x in sorted(prototypes.keys()) if i[1]==x[1]]
19 else:
20 prototypesRoutes=[x for x in sorted(prototypes.keys())]
21 routeSim={}
22 lcss = utils.LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta)
23 for y in prototypesRoutes:
24 if y in prototypes:
25 prototypesIDs=prototypes[y]
26 similarity=[]
27 for x in prototypesIDs:
28 s=lcss.computeNormalized(objects[j].positions, objects[x].positions)
29 similarity.append(s)
30 routeSim[y]=max(similarity)
31 route=max(routeSim, key=routeSim.get)
32 if routeSim[route]>=minSimilarity:
33 return route
34 else:
35 return i
36
37 def getRoute(obj,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination=True):
38 route=(obj.startRouteID,obj.endRouteID)
39 if useDestination:
40 if route not in prototypes:
41 route= findRoute(prototypes,objects,route,obj.getNum(),noiseEntryNums,noiseExitNums)
42 return route
43
44 class Interaction(moving.STObject, VideoFilenameAddable):
45 '''Class for an interaction between two road users
46 or a road user and an obstacle
47
48 link to the moving objects
49 contains the indicators in a dictionary with the names as keys
50 '''
51
52 categories = {'Head On': 0,
53 'rearend': 1,
54 'side': 2,
55 'parallel': 3}
56
57 indicatorNames = ['Collision Course Dot Product',
58 'Collision Course Angle',
59 'Distance',
60 'Minimum Distance',
61 'Velocity Angle',
62 'Speed Differential',
63 'Collision Probability',
64 'Time to Collision', # 7
65 'Probability of Successful Evasive Action',
66 'predicted Post Encroachment Time',
67 'Post Encroachment Time']
68
69 indicatorNameToIndices = utils.inverseEnumeration(indicatorNames)
70
71 indicatorShortNames = ['CCDP',
72 'CCA',
73 'Dist',
74 'MinDist',
75 'VA',
76 'SD',
77 'PoC',
78 'TTC',
79 'P(SEA)',
80 'pPET',
81 'PET']
82
83 indicatorUnits = ['',
84 'rad',
85 'm',
86 'm',
87 'rad',
88 'm/s',
89 '',
90 's',
91 '',
92 's',
93 's']
94
95 timeIndicators = ['Time to Collision', 'predicted Post Encroachment Time']
96
97 def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None):
98 moving.STObject.__init__(self, num, timeInterval)
99 if timeInterval is None and roadUser1 is not None and roadUser2 is not None:
100 self.timeInterval = roadUser1.commonTimeInterval(roadUser2)
101 self.roadUser1 = roadUser1
102 self.roadUser2 = roadUser2
103 if roaduserNum1 is not None and roaduserNum2 is not None:
104 self.roadUserNumbers = set([roaduserNum1, roaduserNum2])
105 elif roadUser1 is not None and roadUser2 is not None:
106 self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()])
107 else:
108 self.roadUserNumbers = None
109 self.categoryNum = categoryNum
110 self.indicators = {}
111 self.interactionInterval = None
112 # list for collison points and crossing zones
113 self.collisionPoints = None
114 self.crossingZones = None
115
116 def getRoadUserNumbers(self):
117 return self.roadUserNumbers
118
119 def setRoadUsers(self, objects):
120 nums = sorted(list(self.getRoadUserNumbers()))
121 if nums[0]<len(objects) and objects[nums[0]].getNum() == nums[0]:
122 self.roadUser1 = objects[nums[0]]
123 if nums[1]<len(objects) and objects[nums[1]].getNum() == nums[1]:
124 self.roadUser2 = objects[nums[1]]
125
126 if self.roadUser1 is None or self.roadUser2 is None:
127 self.roadUser1 = None
128 self.roadUser2 = None
129 i = 0
130 while i < len(objects) and self.roadUser2 is None:
131 if objects[i].getNum() in nums:
132 if self.roadUser1 is None:
133 self.roadUser1 = objects[i]
134 else:
135 self.roadUser2 = objects[i]
136 i += 1
137
138 def getIndicator(self, indicatorName):
139 return self.indicators.get(indicatorName, None)
140
141 def addIndicator(self, indicator):
142 if indicator is not None:
143 self.indicators[indicator.name] = indicator
144
145 def getIndicatorValueAtInstant(self, indicatorName, instant):
146 indicator = self.getIndicator(indicatorName)
147 if indicator is not None:
148 return indicator[instant]
149 else:
150 return None
151
152 def getIndicatorValuesAtInstant(self, instant):
153 '''Returns list of indicator values at instant
154 as dict (with keys from indicators dict)'''
155 values = {}
156 for k, indicator in self.indicators.items():
157 values[k] = indicator[instant]
158 return values
159
160 def plot(self, options = '', withOrigin = False, timeStep = 1, withFeatures = False, **kwargs):
161 self.roadUser1.plot(options, withOrigin, timeStep, withFeatures, **kwargs)
162 self.roadUser2.plot(options, withOrigin, timeStep, withFeatures, **kwargs)
163
164 def plotOnWorldImage(self, nPixelsPerUnitDistance, options = '', withOrigin = False, timeStep = 1, **kwargs):
165 self.roadUser1.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs)
166 self.roadUser2.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs)
167
168 def play(self, videoFilename, homography = None, undistort = False, intrinsicCameraMatrix = None, distortionCoefficients = None, undistortedImageMultiplication = 1., allUserInstants = False):
169 if self.roadUser1 is not None and self.roadUser2 is not None:
170 if allUserInstants:
171 firstFrameNum = min(self.roadUser1.getFirstInstant(), self.roadUser2.getFirstInstant())
172 lastFrameNum = max(self.roadUser1.getLastInstant(), self.roadUser2.getLastInstant())
173 else:
174 firstFrameNum = self.getFirstInstant()
175 lastFrameNum = self.getLastInstant()
176 cvutils.displayTrajectories(videoFilename, [self.roadUser1, self.roadUser2], homography = homography, firstFrameNum = firstFrameNum, lastFrameNumArg = lastFrameNum, undistort = undistort, intrinsicCameraMatrix = intrinsicCameraMatrix, distortionCoefficients = distortionCoefficients, undistortedImageMultiplication = undistortedImageMultiplication)
177 else:
178 print('Please set the interaction road user attributes roadUser1 and roadUser1 through the method setRoadUsers')
179
180 def computeIndicators(self):
181 '''Computes the collision course cosine only if the cosine is positive'''
182 collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length())
183 collisionCourseAngles = {}
184 velocityAngles = {}
185 distances = {}#[0]*int(self.timeInterval.length())
186 speedDifferentials = {}
187 interactionInstants = []
188 for instant in self.timeInterval:
189 deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant)
190 v1 = self.roadUser1.getVelocityAtInstant(instant)
191 v2 = self.roadUser2.getVelocityAtInstant(instant)
192 deltav = v2-v1
193 velocityAngles[instant] = np.arccos(moving.Point.dot(v1, v2)/(v1.norm2()*v2.norm2()))
194 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav)
195 distances[instant] = deltap.norm2()
196 speedDifferentials[instant] = deltav.norm2()
197 if collisionCourseDotProducts[instant] > 0:
198 interactionInstants.append(instant)
199 if distances[instant] != 0 and speedDifferentials[instant] != 0:
200 collisionCourseAngles[instant] = np.arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant]))
201
202 if len(interactionInstants) >= 2:
203 self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1])
204 else:
205 self.interactionInterval = moving.TimeInterval()
206 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts))
207 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles))
208 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False))
209 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles))
210 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials))
211
212 # if we have features, compute other indicators
213 if self.roadUser1.hasFeatures() and self.roadUser2.hasFeatures():
214 minDistances={}
215 for instant in self.timeInterval:
216 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant)
217 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False))
218
219 def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
220 '''Computes all crossing and collision points at each common instant for two road users. '''
221 TTCs = {}
222 collisionProbabilities = {}
223 if timeInterval is not None:
224 commonTimeInterval = timeInterval
225 else:
226 commonTimeInterval = self.timeInterval
227 self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval)
228 for i, cps in self.collisionPoints.items():
229 TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cps)
230 collisionProbabilities[i] = sum([p.probability for p in cps])
231 if len(TTCs) > 0:
232 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs, mostSevereIsMax=False))
233 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[6], collisionProbabilities))
234
235 # crossing zones and pPET
236 if computeCZ:
237 self.crossingZones = crossingZones
238 pPETs = {}
239 for i, cz in self.crossingZones.items():
240 pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(cz)
241 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs, mostSevereIsMax=False))
242 # TODO add probability of collision, and probability of successful evasive action
243
244 def computePET(self, collisionDistanceThreshold):
245 pet, t1, t2= moving.MovingObject.computePET(self.roadUser1, self.roadUser2, collisionDistanceThreshold)
246 if pet is not None:
247 self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[10], {min(t1, t2): pet}, mostSevereIsMax = False))
248
249 def setCollision(self, collision):
250 '''indicates if it is a collision: argument should be boolean'''
251 self.collision = collision
252
253 def isCollision(self):
254 if hasattr(self, 'collision'):
255 return self.collision
256 else:
257 return None
258
259 def getCollisionPoints(self):
260 return self.collisionPoints
261
262 def getCrossingZones(self):
263 return self.crossingZones
264
265 def createInteractions(objects, _others = None):
266 '''Create all interactions of two co-existing road users'''
267 if _others is not None:
268 others = _others
269
270 interactions = []
271 num = 0
272 for i in range(len(objects)):
273 if _others is None:
274 others = objects[:i]
275 for j in range(len(others)):
276 commonTimeInterval = objects[i].commonTimeInterval(others[j])
277 if not commonTimeInterval.empty():
278 interactions.append(Interaction(num, commonTimeInterval, objects[i].num, others[j].num, objects[i], others[j]))
279 num += 1
280 return interactions
281
282 def findInteraction(interactions, roadUserNum1, roadUserNum2):
283 'Returns the right interaction in the set'
284 i=0
285 while i<len(interactions) and set([roadUserNum1, roadUserNum2]) != interactions[i].getRoadUserNumbers():
286 i+=1
287 if i<len(interactions):
288 return interactions[i]
289 else:
290 return None
291
292 def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
293 for inter in interactions:
294 print('processing interaction {}'.format(inter.getNum())) # logging.debug('processing interaction {}'.format(inter.getNum()))
295 inter.computeIndicators()
296 if computeMotionPrediction:
297 inter.computeCrossingsCollisions(predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval)
298 if computePET:
299 inter.computePET(collisionDistanceThreshold)
300 return interactions
301
302 def aggregateSafetyPoints(interactions, pointType = 'collision'):
303 '''Put all collision points or crossing zones in a list for display'''
304 allPoints = []
305 if pointType == 'collision':
306 for i in interactions:
307 for points in i.collisionPoints.values():
308 allPoints += points
309 elif pointType == 'crossing':
310 for i in interactions:
311 for points in i.crossingZones.values():
312 allPoints += points
313 else:
314 print('unknown type of point: '+pointType)
315 return allPoints
316
317 def prototypeCluster(interactions, similarities, indicatorName, minSimilarity, similarityFunc = None, minClusterSize = None, randomInitialization = False):
318 return ml.prototypeCluster([inter.getIndicator(indicatorName) for inter in interactions], similarities, minSimilarity, similarityFunc, minClusterSize, randomInitialization)
319
320 class Crossing(moving.STObject):
321 '''Class for the event of a street crossing
322
323 TODO: detecter passage sur la chaussee
324 identifier origines et destination (ou uniquement chaussee dans FOV)
325 carac traversee
326 detecter proximite veh (retirer si trop similaire simultanement
327 carac interaction'''
328
329 def __init__(self, roaduserNum = None, num = None, timeInterval = None):
330 moving.STObject.__init__(self, num, timeInterval)
331 self.roaduserNum = roaduserNum
332
333
334
335 if __name__ == "__main__":
336 import doctest
337 import unittest
338 suite = doctest.DocFileSuite('tests/events.txt')
339 #suite = doctest.DocTestSuite()
340 unittest.TextTestRunner().run(suite)
341