comparison python/events.py @ 299:7e5fb4abd070

renaming event to events and correcting errors in indicator computation
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Tue, 12 Feb 2013 18:08:00 -0500
parents python/event.py@1f253f218b9f
children 93d851d0d21e
comparison
equal deleted inserted replaced
298:4a8b6a2de82f 299:7e5fb4abd070
1 #! /usr/bin/env python
2 '''Libraries for events
3 Interactions, pedestrian crossing...'''
4
5 import numpy as np
6
7 import multiprocessing
8 import itertools
9
10 import moving
11 import prediction
12 import indicators
13
14 __metaclass__ = type
15
16 class Interaction(moving.STObject):
17 '''Class for an interaction between two road users
18 or a road user and an obstacle
19
20 link to the moving objects
21 contains the indicators in a dictionary with the names as keys
22 '''
23
24 categories = {'headon': 0,
25 'rearend': 1,
26 'side': 2,
27 'parallel': 3}
28
29 def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None):
30 moving.STObject.__init__(self, num, timeInterval)
31 self.roaduserNumbers = set([roaduserNum1, roaduserNum2])
32 self.movingObject1 = movingObject1
33 self.movingObject2 = movingObject2
34 self.categoryNum = categoryNum
35 self.indicators = {}
36
37 def getIndicator(self, indicatorName):
38 return self.indicators[indicatorName]
39
40 def addIndicator(self, indicator):
41 self.indicators[indicator.name] = indicator
42
43 def computeIndicators(self):
44 '''Computes the collision course cosine only if the cosine is positive'''
45 collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length())
46 collisionCourseCosines = {}
47 distances = {}#[0]*int(self.timeInterval.length())
48 speedDifferentials = {}
49 for instant in self.timeInterval:
50 deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant)
51 deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant)
52 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav)
53 distances[instant] = deltap.norm2()
54 speedDifferentials[instant] = deltav.norm2()
55 if collisionCourseDotProducts[instant] > 0:
56 collisionCourseCosines[instant] = collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])
57 # todo shorten the time intervals based on the interaction definition
58 # todos change cosines to angles
59 self.addIndicator(indicators.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProducts))
60 self.addIndicator(indicators.SeverityIndicator('Distance', distances))
61 self.addIndicator(indicators.SeverityIndicator('Speed Differential', speedDifferentials))
62 self.addIndicator(indicators.SeverityIndicator('Collision Course Cosine', collisionCourseCosines))
63
64
65 def createInteractions(objects):
66 '''Create all interactions of two co-existing road users
67
68 todo add test to compute categories?'''
69 interactions = []
70 num = 0
71 for i in xrange(len(objects)):
72 for j in xrange(i):
73 commonTimeInterval = objects[i].commonTimeInterval(objects[j])
74 if not commonTimeInterval.empty():
75 interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j]))
76 num += 1
77 return interactions
78
79
80 # TODO:
81 #http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class
82 #http://www.rueckstiess.net/research/snippets/show/ca1d7d90
83 def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8):
84 collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon)
85 #print pairs.num
86 # Ignore empty collision points
87 empty = 1
88 for i in collisionPoints:
89 if(collisionPoints[i] != []):
90 empty = 0
91 if(empty == 1):
92 pairs.hasCP = 0
93 else:
94 pairs.hasCP = 1
95 pairs.CP = collisionPoints
96
97 # Ignore empty crossing zones
98 empty = 1
99 for i in crossingZones:
100 if(crossingZones[i] != []):
101 empty = 0
102 if(empty == 1):
103 pairs.hasCZ = 0
104 else:
105 pairs.hasCZ = 1
106 pairs.CZ = crossingZones
107 return pairs
108
109 def calculateIndicatorPipe_star(a_b):
110 """Convert `f([1,2])` to `f(1,2)` call."""
111 return calculateIndicatorPipe(*a_b)
112
113 class VehPairs():
114 '''Create a veh-pairs object from objects list'''
115 def __init__(self,objects):
116 self.pairs = createInteractions(objects)
117 self.interactionCount = 0
118 self.CPcount = 0
119 self.CZcount = 0
120
121 # Process indicator calculation with support for multi-threading
122 def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8):
123 if(threads > 1):
124 pool = multiprocessing.Pool(threads)
125 self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam)))
126 pool.close()
127 else:
128 #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port
129 for j in xrange(len(self.pairs)):
130 #prog.updateAmount(j) #Removed in traffic-intelligenc port
131 collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon)
132
133 # Ignore empty collision points
134 empty = 1
135 for i in collisionPoints:
136 if(collisionPoints[i] != []):
137 empty = 0
138 if(empty == 1):
139 self.pairs[j].hasCP = 0
140 else:
141 self.pairs[j].hasCP = 1
142 self.pairs[j].CP = collisionPoints
143
144 # Ignore empty crossing zones
145 empty = 1
146 for i in crossingZones:
147 if(crossingZones[i] != []):
148 empty = 0
149 if(empty == 1):
150 self.pairs[j].hasCZ = 0
151 else:
152 self.pairs[j].hasCZ = 1
153 self.pairs[j].CZ = crossingZones
154
155 for j in self.pairs:
156 self.interactionCount = self.interactionCount + len(j.CP)
157 self.CPcount = len(self.getCPlist())
158 self.Czcount = len(self.getCZlist())
159
160
161 def getPairsWCP(self):
162 lists = []
163 for j in self.pairs:
164 if(j.hasCP):
165 lists.append(j.num)
166 return lists
167
168 def getPairsWCZ(self):
169 lists = []
170 for j in self.pairs:
171 if(j.hasCZ):
172 lists.append(j.num)
173 return lists
174
175 def getCPlist(self,indicatorThreshold=99999):
176 lists = []
177 for j in self.pairs:
178 if(j.hasCP):
179 for k in j.CP:
180 if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold):
181 lists.append([k,j.CP[k][0]])
182 return lists
183
184 def getCZlist(self,indicatorThreshold=99999):
185 lists = []
186 for j in self.pairs:
187 if(j.hasCZ):
188 for k in j.CZ:
189 if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold):
190 lists.append([k,j.CZ[k][0]])
191 return lists
192
193 def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)):
194 if(not CPlist):
195 CPlist = self.getCPlist()
196 if(not CPlist):
197 return False
198 TTC_list = []
199 for i in CPlist:
200 TTC_list.append(i[1].indicator)
201 histo = np.histogram(TTC_list,bins=bins)
202 histo += (histo[0].astype(float)/np.sum(histo[0]),)
203 return histo
204
205 class Crossing(moving.STObject):
206 '''Class for the event of a street crossing
207
208 TODO: detecter passage sur la chaussee
209 identifier origines et destination (ou uniquement chaussee dans FOV)
210 carac traversee
211 detecter proximite veh (retirer si trop similaire simultanement
212 carac interaction'''
213
214 def __init__(self, roaduserNum = None, num = None, timeInterval = None):
215 moving.STObject.__init__(self, num, timeInterval)
216 self.roaduserNum = roaduserNum
217
218
219
220 if __name__ == "__main__":
221 import doctest
222 import unittest
223 #suite = doctest.DocFileSuite('tests/moving.txt')
224 suite = doctest.DocTestSuite()
225 unittest.TextTestRunner().run(suite)
226