comparison trafficintelligence/indicators.py @ 1028:cc5cb04b04b0

major update using the trafficintelligence package name and install through pip
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Fri, 15 Jun 2018 11:19:10 -0400
parents python/indicators.py@933670761a57
children c6cf75a2ed08
comparison
equal deleted inserted replaced
1027:6129296848d3 1028:cc5cb04b04b0
1 #! /usr/bin/env python
2 '''Class for indicators, temporal indicators, and safety indicators'''
3
4 from trafficintelligence import moving
5 #import matplotlib.nxutils as nx
6 from matplotlib.pyplot import plot, ylim
7 from matplotlib.pylab import find
8 from numpy import array, arange, mean, floor, mean
9 from scipy import percentile
10
def multivariateName(indicatorNames):
    '''Returns the name of a multivariate indicator:
    the given indicator names joined by underscores'''
    separator = '_'
    return separator.join(indicatorNames)
13
14 # need for a class representing the indicators, their units, how to print them in graphs...
class TemporalIndicator(object):
    '''Class for temporal indicators
    i.e. indicators that take a value at specific instants

    values should be
    * a dict, for the values at specific time instants
    * or a list with a time interval object if continuous measurements

    Attributes set by the constructor
    * name: name of the indicator
    * values: dict of the indicator values, with the instants as keys
    * timeInterval: time interval of the indicator
    * maxValue: if not None, upper limit of the y axis set by plot

    it should have more information like name, unit'''

    def __init__(self, name, values, timeInterval = None, maxValue = None):
        '''If timeInterval is None, values is the dict of values per instant
        (stored as is, not copied) and the time interval goes
        from the smallest to the largest instant
        (default moving.TimeInterval() if values is empty)

        Otherwise, values is a sequence with as many elements
        as the length of timeInterval: its ith element is the value
        at the instant timeInterval[i]'''
        self.name = name
        if timeInterval is None:
            self.values = values
            instants = sorted(self.values.keys())
            if len(instants) > 0:
                self.timeInterval = moving.TimeInterval(instants[0], instants[-1])
            else:
                self.timeInterval = moving.TimeInterval()
        else:
            assert len(values) == timeInterval.length()
            self.timeInterval = timeInterval
            self.values = {}
            for i in range(int(round(self.timeInterval.length()))):
                self.values[self.timeInterval[i]] = values[i]
        self.maxValue = maxValue

    def __len__(self):
        'Returns the number of instants with a value'
        return len(self.values)

    def empty(self):
        'Returns True if the indicator has no value'
        return len(self.values) == 0

    def __getitem__(self, t):
        'Returns the value at time t (None if there is no value at t)'
        return self.values.get(t)

    def getIthValue(self, i):
        '''Returns the value at the ith instant (instants sorted in increasing order)
        or None if i is not in [0, number of instants['''
        sortedKeys = sorted(self.values.keys())
        if 0<=i<len(sortedKeys):
            return self.values[sortedKeys[i]]
        else:
            return None

    def __iter__(self):
        '''Starts the iteration over the values, in increasing order of instants

        The indicator is its own iterator: only one iteration can run at a time'''
        # the values are sorted by instant once here,
        # instead of sorting all the instants again at each step of the iteration
        self.iterValues = [self.values[t] for t in sorted(self.values.keys())]
        self.iterInstantNum = 0 # index in the sorted values
        return self

    def __next__(self):
        if self.iterInstantNum >= len(self.iterValues):
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self.iterValues[self.iterInstantNum-1]

    def getTimeInterval(self):
        return self.timeInterval

    def getName(self):
        return self.name

    def getValues(self):
        '''Returns the list of values at each instant of the time interval
        (None for the instants without a value)'''
        return [self[t] for t in self.timeInterval]

    def plot(self, options = '', xfactor = 1., yfactor = 1., timeShift = 0, **kwargs):
        '''Plots the values as a function of the instants

        The instants are shifted by timeShift, then divided by xfactor,
        the values are divided by yfactor
        options is the matplotlib format string, kwargs are passed to matplotlib plot
        A round marker is added if the time interval has a length of 1
        The upper limit of the y axis is set to maxValue if it is not None'''
        if self.getTimeInterval().length() == 1:
            marker = 'o'
        else:
            marker = ''
        time = sorted(self.values.keys())
        plot([(x+timeShift)/xfactor for x in time], [self.values[i]/yfactor for i in time], options+marker, **kwargs)
        # comparison to None so that a maximum value of 0 is not ignored
        if self.maxValue is not None:
            ylim(ymax = self.maxValue)

    @classmethod
    def createMultivariate(cls, indicators):
        '''Creates a new temporal indicator where the value at each instant is a list
        of the indicator values at the instant, in the same order
        the time interval will be the union of the time intervals of the indicators
        name is concatenation of the indicator names

        Instants at which no indicator has a value are skipped
        Returns None (after printing an error message) if there are less than 2 indicators'''
        if len(indicators) < 2:
            print('Error creating multivariate indicator with only {} indicator'.format(len(indicators)))
            return None

        timeInterval = moving.TimeInterval.unionIntervals([indic.getTimeInterval() for indic in indicators])
        values = {}
        for t in timeInterval:
            tmpValues = [indic[t] for indic in indicators]
            # the instant is kept if at least one indicator has a value
            # (no set of the values is built, so that unhashable values are supported)
            if any(v is not None for v in tmpValues):
                values[t] = tmpValues
        return cls(multivariateName([indic.name for indic in indicators]), values)
108
109 # TODO static method taking the class as a parameter to build aggregated indicators, with a list of values per instant
110
def l1Distance(x, y): # lambda x,y:abs(x-y)
    '''Returns the absolute difference between x and y,
    or infinity if x or y is None'''
    if x is not None and y is not None:
        return abs(x-y)
    return float('inf')
116
def multiL1Matching(x, y, thresholds, proportionMatching=1.):
    '''Returns True if the number of dimensions i
    for which the L1 distance between x[i] and y[i] is at most thresholds[i]
    is at least proportionMatching times the number of dimensions (len(x))'''
    nDimensions = len(x)
    nMatches = sum(1 for i in range(nDimensions) if l1Distance(x[i], y[i]) <= thresholds[i])
    return nMatches >= nDimensions*proportionMatching
124
# utils is a module of the trafficintelligence package (like moving):
# it must be imported through the package name
from trafficintelligence.utils import LCSS as utilsLCSS

class LCSS(utilsLCSS):
    '''Adapted LCSS class for indicators, same pattern

    The indicators are compared through their lists of values (getValues)
    Indicators that are None or have less than minLength values are not compared:
    the compute methods then return a default value'''
    def __init__(self, similarityFunc, delta = float('inf'), minLength = 0, aligned = False, lengthFunc = min):
        '''similarityFunc, delta, aligned and lengthFunc are passed to the utils LCSS constructor
        minLength is the minimum number of values of an indicator to be compared'''
        utilsLCSS.__init__(self, similarityFunc = similarityFunc, delta = delta, aligned = aligned, lengthFunc = lengthFunc)
        self.minLength = minLength

    def checkIndicator(self, indicator):
        'Returns True if the indicator is not None and has at least minLength values'
        return indicator is not None and len(indicator) >= self.minLength

    def compute(self, indicator1, indicator2, computeSubSequence = False):
        '''Returns the result of _compute on the values of the two indicators
        or 0 if one of the indicators does not pass checkIndicator'''
        if self.checkIndicator(indicator1) and self.checkIndicator(indicator2):
            return self._compute(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
        else:
            return 0

    def computeNormalized(self, indicator1, indicator2, computeSubSequence = False):
        '''Returns the result of _computeNormalized on the values of the two indicators
        or 0. if one of the indicators does not pass checkIndicator'''
        if self.checkIndicator(indicator1) and self.checkIndicator(indicator2):
            return self._computeNormalized(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
        else:
            return 0.

    def computeDistance(self, indicator1, indicator2, computeSubSequence = False):
        '''Returns the result of _computeDistance on the values of the two indicators
        or 1. if one of the indicators does not pass checkIndicator'''
        if self.checkIndicator(indicator1) and self.checkIndicator(indicator2):
            return self._computeDistance(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
        else:
            return 1.
153
class SeverityIndicator(TemporalIndicator):
    '''Temporal indicator measuring severity

    The field mostSevereIsMax is True
    if the most severe value that the indicator can take is its maximum'''

    def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, maxValue = None):
        TemporalIndicator.__init__(self, name, values, timeInterval, maxValue)
        self.mostSevereIsMax = mostSevereIsMax

    def getMostSevereValue(self, minNInstants=1, centile=15.):
        '''Returns None if there are less than minNInstants observations

        Otherwise, if centile is not None, returns the centile
        counted from the most severe value
        (the (100-centile)th centile if the most severe value is the maximum),
        else returns the average of the minNInstants most severe values

        eg for TTC, 15 returns the 15th centile (value such that 15% of observations are lower)'''
        if len(self) < minNInstants:
            return None
        observations = list(self.values.values())
        if centile is None:
            # most severe values first
            observations.sort(reverse = self.mostSevereIsMax)
            return mean(observations[:minNInstants])
        rank = 100-centile if self.mostSevereIsMax else centile
        return percentile(observations, rank)

    def getInstantOfMostSevereValue(self):
        '''Returns the instant at which the indicator reaches its most severe value'''
        mostSevere = max if self.mostSevereIsMax else min
        return mostSevere(self.values, key=self.values.get)
189
190 # functions to aggregate discretized maps of indicators
191 # TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap)
192
def indicatorMap(indicatorValues, trajectory, squareSize):
    '''Returns a dictionary
    of the average indicator value per cell (square of side squareSize):
    the keys are the indices (i,j) of the cells
    in which the trajectory positions are located,
    the kth indicator value being attached to the kth position

    ex: speeds and trajectory'''

    assert len(indicatorValues) == trajectory.length()
    valuesPerCell = {}
    for k in range(trajectory.length()):
        position = trajectory[k]
        cell = (floor(position.x/squareSize), floor(position.y/squareSize))
        valuesPerCell.setdefault(cell, []).append(indicatorValues[k])
    return {cell: mean(cellValues) for cell, cellValues in valuesPerCell.items()}
214
215 # def indicatorMapFromPolygon(value, polygon, squareSize):
216 # '''Fills an indicator map with the value within the polygon
217 # (array of Nx2 coordinates of the polygon vertices)'''
218 # points = []
219 # for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize):
220 # for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize):
221 # points.append([x,y])
222 # inside = nx.points_inside_poly(array(points), polygon)
223 # indicatorMap = {}
224 # for i in range(len(inside)):
225 # if inside[i]:
226 # indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0
227 # return indicatorMap
228
def indicatorMapFromAxis(value, limits, squareSize):
    '''Returns a dictionary with the given value for all the cells
    (squares of side squareSize) reached by steps of squareSize within the limits

    limits = [xmin, xmax, ymin, ymax] '''
    return {(floor(x/squareSize), floor(y/squareSize)): value
            for x in arange(limits[0], limits[1], squareSize)
            for y in arange(limits[2], limits[3], squareSize)}
236
def combineIndicatorMaps(maps, squareSize, combinationFunction):
    '''Puts many indicator maps together:
    the values of all the maps in each cell are collected in a list
    which is passed to combinationFunction (eg mean to average them)
    to get the value of the cell in the returned map

    squareSize is not used'''
    valuesPerCell = {}
    for m in maps:
        for cell, v in m.items():
            valuesPerCell.setdefault(cell, []).append(v)
    return {cell: combinationFunction(cellValues) for cell, cellValues in valuesPerCell.items()}
251
if __name__ == "__main__":
    # runs the doctests of the file tests/indicators.txt with the unittest text runner
    import doctest
    import unittest
    unittest.TextTestRunner().run(doctest.DocFileSuite('tests/indicators.txt'))
    # #doctest.testmod()
    # #doctest.testfile("example.txt")