Mercurial Hosting > traffic-intelligence
comparison python/events.py @ 299:7e5fb4abd070
renaming event to events and correcting errors in indicator computation
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 12 Feb 2013 18:08:00 -0500 |
parents | python/event.py@1f253f218b9f |
children | 93d851d0d21e |
comparison
equal
deleted
inserted
replaced
298:4a8b6a2de82f | 299:7e5fb4abd070 |
---|---|
1 #! /usr/bin/env python | |
2 '''Libraries for events | |
3 Interactions, pedestrian crossing...''' | |
4 | |
5 import numpy as np | |
6 | |
7 import multiprocessing | |
8 import itertools | |
9 | |
10 import moving | |
11 import prediction | |
12 import indicators | |
13 | |
14 __metaclass__ = type | |
15 | |
16 class Interaction(moving.STObject): | |
17 '''Class for an interaction between two road users | |
18 or a road user and an obstacle | |
19 | |
20 link to the moving objects | |
21 contains the indicators in a dictionary with the names as keys | |
22 ''' | |
23 | |
24 categories = {'headon': 0, | |
25 'rearend': 1, | |
26 'side': 2, | |
27 'parallel': 3} | |
28 | |
29 def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None): | |
30 moving.STObject.__init__(self, num, timeInterval) | |
31 self.roaduserNumbers = set([roaduserNum1, roaduserNum2]) | |
32 self.movingObject1 = movingObject1 | |
33 self.movingObject2 = movingObject2 | |
34 self.categoryNum = categoryNum | |
35 self.indicators = {} | |
36 | |
37 def getIndicator(self, indicatorName): | |
38 return self.indicators[indicatorName] | |
39 | |
40 def addIndicator(self, indicator): | |
41 self.indicators[indicator.name] = indicator | |
42 | |
43 def computeIndicators(self): | |
44 '''Computes the collision course cosine only if the cosine is positive''' | |
45 collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length()) | |
46 collisionCourseCosines = {} | |
47 distances = {}#[0]*int(self.timeInterval.length()) | |
48 speedDifferentials = {} | |
49 for instant in self.timeInterval: | |
50 deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant) | |
51 deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant) | |
52 collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) | |
53 distances[instant] = deltap.norm2() | |
54 speedDifferentials[instant] = deltav.norm2() | |
55 if collisionCourseDotProducts[instant] > 0: | |
56 collisionCourseCosines[instant] = collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant]) | |
57 # todo shorten the time intervals based on the interaction definition | |
58 # todos change cosines to angles | |
59 self.addIndicator(indicators.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProducts)) | |
60 self.addIndicator(indicators.SeverityIndicator('Distance', distances)) | |
61 self.addIndicator(indicators.SeverityIndicator('Speed Differential', speedDifferentials)) | |
62 self.addIndicator(indicators.SeverityIndicator('Collision Course Cosine', collisionCourseCosines)) | |
63 | |
64 | |
65 def createInteractions(objects): | |
66 '''Create all interactions of two co-existing road users | |
67 | |
68 todo add test to compute categories?''' | |
69 interactions = [] | |
70 num = 0 | |
71 for i in xrange(len(objects)): | |
72 for j in xrange(i): | |
73 commonTimeInterval = objects[i].commonTimeInterval(objects[j]) | |
74 if not commonTimeInterval.empty(): | |
75 interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) | |
76 num += 1 | |
77 return interactions | |
78 | |
79 | |
80 # TODO: | |
81 #http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class | |
82 #http://www.rueckstiess.net/research/snippets/show/ca1d7d90 | |
83 def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8): | |
84 collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon) | |
85 #print pairs.num | |
86 # Ignore empty collision points | |
87 empty = 1 | |
88 for i in collisionPoints: | |
89 if(collisionPoints[i] != []): | |
90 empty = 0 | |
91 if(empty == 1): | |
92 pairs.hasCP = 0 | |
93 else: | |
94 pairs.hasCP = 1 | |
95 pairs.CP = collisionPoints | |
96 | |
97 # Ignore empty crossing zones | |
98 empty = 1 | |
99 for i in crossingZones: | |
100 if(crossingZones[i] != []): | |
101 empty = 0 | |
102 if(empty == 1): | |
103 pairs.hasCZ = 0 | |
104 else: | |
105 pairs.hasCZ = 1 | |
106 pairs.CZ = crossingZones | |
107 return pairs | |
108 | |
109 def calculateIndicatorPipe_star(a_b): | |
110 """Convert `f([1,2])` to `f(1,2)` call.""" | |
111 return calculateIndicatorPipe(*a_b) | |
112 | |
113 class VehPairs(): | |
114 '''Create a veh-pairs object from objects list''' | |
115 def __init__(self,objects): | |
116 self.pairs = createInteractions(objects) | |
117 self.interactionCount = 0 | |
118 self.CPcount = 0 | |
119 self.CZcount = 0 | |
120 | |
121 # Process indicator calculation with support for multi-threading | |
122 def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8): | |
123 if(threads > 1): | |
124 pool = multiprocessing.Pool(threads) | |
125 self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam))) | |
126 pool.close() | |
127 else: | |
128 #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port | |
129 for j in xrange(len(self.pairs)): | |
130 #prog.updateAmount(j) #Removed in traffic-intelligenc port | |
131 collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon) | |
132 | |
133 # Ignore empty collision points | |
134 empty = 1 | |
135 for i in collisionPoints: | |
136 if(collisionPoints[i] != []): | |
137 empty = 0 | |
138 if(empty == 1): | |
139 self.pairs[j].hasCP = 0 | |
140 else: | |
141 self.pairs[j].hasCP = 1 | |
142 self.pairs[j].CP = collisionPoints | |
143 | |
144 # Ignore empty crossing zones | |
145 empty = 1 | |
146 for i in crossingZones: | |
147 if(crossingZones[i] != []): | |
148 empty = 0 | |
149 if(empty == 1): | |
150 self.pairs[j].hasCZ = 0 | |
151 else: | |
152 self.pairs[j].hasCZ = 1 | |
153 self.pairs[j].CZ = crossingZones | |
154 | |
155 for j in self.pairs: | |
156 self.interactionCount = self.interactionCount + len(j.CP) | |
157 self.CPcount = len(self.getCPlist()) | |
158 self.Czcount = len(self.getCZlist()) | |
159 | |
160 | |
161 def getPairsWCP(self): | |
162 lists = [] | |
163 for j in self.pairs: | |
164 if(j.hasCP): | |
165 lists.append(j.num) | |
166 return lists | |
167 | |
168 def getPairsWCZ(self): | |
169 lists = [] | |
170 for j in self.pairs: | |
171 if(j.hasCZ): | |
172 lists.append(j.num) | |
173 return lists | |
174 | |
175 def getCPlist(self,indicatorThreshold=99999): | |
176 lists = [] | |
177 for j in self.pairs: | |
178 if(j.hasCP): | |
179 for k in j.CP: | |
180 if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold): | |
181 lists.append([k,j.CP[k][0]]) | |
182 return lists | |
183 | |
184 def getCZlist(self,indicatorThreshold=99999): | |
185 lists = [] | |
186 for j in self.pairs: | |
187 if(j.hasCZ): | |
188 for k in j.CZ: | |
189 if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold): | |
190 lists.append([k,j.CZ[k][0]]) | |
191 return lists | |
192 | |
193 def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)): | |
194 if(not CPlist): | |
195 CPlist = self.getCPlist() | |
196 if(not CPlist): | |
197 return False | |
198 TTC_list = [] | |
199 for i in CPlist: | |
200 TTC_list.append(i[1].indicator) | |
201 histo = np.histogram(TTC_list,bins=bins) | |
202 histo += (histo[0].astype(float)/np.sum(histo[0]),) | |
203 return histo | |
204 | |
205 class Crossing(moving.STObject): | |
206 '''Class for the event of a street crossing | |
207 | |
208 TODO: detecter passage sur la chaussee | |
209 identifier origines et destination (ou uniquement chaussee dans FOV) | |
210 carac traversee | |
211 detecter proximite veh (retirer si trop similaire simultanement | |
212 carac interaction''' | |
213 | |
214 def __init__(self, roaduserNum = None, num = None, timeInterval = None): | |
215 moving.STObject.__init__(self, num, timeInterval) | |
216 self.roaduserNum = roaduserNum | |
217 | |
218 | |
219 | |
220 if __name__ == "__main__": | |
221 import doctest | |
222 import unittest | |
223 #suite = doctest.DocFileSuite('tests/moving.txt') | |
224 suite = doctest.DocTestSuite() | |
225 unittest.TextTestRunner().run(suite) | |
226 |