Mercurial Hosting > traffic-intelligence
comparison trafficintelligence/indicators.py @ 1028:cc5cb04b04b0
major update using the trafficintelligence package name and install through pip
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 15 Jun 2018 11:19:10 -0400 |
parents | python/indicators.py@933670761a57 |
children | c6cf75a2ed08 |
comparison
equal
deleted
inserted
replaced
1027:6129296848d3 | 1028:cc5cb04b04b0 |
---|---|
1 #! /usr/bin/env python | |
2 '''Class for indicators, temporal indicators, and safety indicators''' | |
3 | |
4 from trafficintelligence import moving | |
5 #import matplotlib.nxutils as nx | |
6 from matplotlib.pyplot import plot, ylim | |
7 from matplotlib.pylab import find | |
8 from numpy import array, arange, mean, floor, mean | |
9 from scipy import percentile | |
10 | |
11 def multivariateName(indicatorNames): | |
12 return '_'.join(indicatorNames) | |
13 | |
14 # need for a class representing the indicators, their units, how to print them in graphs... | |
15 class TemporalIndicator(object): | |
16 '''Class for temporal indicators | |
17 i.e. indicators that take a value at specific instants | |
18 | |
19 values should be | |
20 * a dict, for the values at specific time instants | |
21 * or a list with a time interval object if continuous measurements | |
22 | |
23 it should have more information like name, unit''' | |
24 | |
25 def __init__(self, name, values, timeInterval = None, maxValue = None): | |
26 self.name = name | |
27 if timeInterval is None: | |
28 self.values = values | |
29 instants = sorted(self.values.keys()) | |
30 if len(instants) > 0: | |
31 self.timeInterval = moving.TimeInterval(instants[0], instants[-1]) | |
32 else: | |
33 self.timeInterval = moving.TimeInterval() | |
34 else: | |
35 assert len(values) == timeInterval.length() | |
36 self.timeInterval = timeInterval | |
37 self.values = {} | |
38 for i in range(int(round(self.timeInterval.length()))): | |
39 self.values[self.timeInterval[i]] = values[i] | |
40 self.maxValue = maxValue | |
41 | |
42 def __len__(self): | |
43 return len(self.values) | |
44 | |
45 def empty(self): | |
46 return len(self.values) == 0 | |
47 | |
48 def __getitem__(self, t): | |
49 'Returns the value at time t' | |
50 return self.values.get(t) | |
51 | |
52 def getIthValue(self, i): | |
53 sortedKeys = sorted(self.values.keys()) | |
54 if 0<=i<len(sortedKeys): | |
55 return self.values[sortedKeys[i]] | |
56 else: | |
57 return None | |
58 | |
59 def __iter__(self): | |
60 self.iterInstantNum = 0 # index in the interval or keys of the dict | |
61 return self | |
62 | |
63 def __next__(self): | |
64 if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\ | |
65 # or (self.iterInstantNum >= self.values) | |
66 raise StopIteration | |
67 else: | |
68 self.iterInstantNum += 1 | |
69 return self.getIthValue(self.iterInstantNum-1) | |
70 | |
71 def getTimeInterval(self): | |
72 return self.timeInterval | |
73 | |
74 def getName(self): | |
75 return self.name | |
76 | |
77 def getValues(self): | |
78 return [self.__getitem__(t) for t in self.timeInterval] | |
79 | |
80 def plot(self, options = '', xfactor = 1., yfactor = 1., timeShift = 0, **kwargs): | |
81 if self.getTimeInterval().length() == 1: | |
82 marker = 'o' | |
83 else: | |
84 marker = '' | |
85 time = sorted(self.values.keys()) | |
86 plot([(x+timeShift)/xfactor for x in time], [self.values[i]/yfactor for i in time], options+marker, **kwargs) | |
87 if self.maxValue: | |
88 ylim(ymax = self.maxValue) | |
89 | |
90 @classmethod | |
91 def createMultivariate(cls, indicators): | |
92 '''Creates a new temporal indicator where the value at each instant is a list | |
93 of the indicator values at the instant, in the same order | |
94 the time interval will be the union of the time intervals of the indicators | |
95 name is concatenation of the indicator names''' | |
96 if len(indicators) < 2: | |
97 print('Error creating multivariate indicator with only {} indicator'.format(len(indicators))) | |
98 return None | |
99 | |
100 timeInterval = moving.TimeInterval.unionIntervals([indic.getTimeInterval() for indic in indicators]) | |
101 values = {} | |
102 for t in timeInterval: | |
103 tmpValues = [indic[t] for indic in indicators] | |
104 uniqueValues = set(tmpValues) | |
105 if len(uniqueValues) >= 2 or uniqueValues.pop() is not None: | |
106 values[t] = tmpValues | |
107 return cls(multivariateName([indic.name for indic in indicators]), values) | |
108 | |
109 # TODO static method avec class en parametre pour faire des indicateurs agrege, list par instant | |
110 | |
111 def l1Distance(x, y): # lambda x,y:abs(x-y) | |
112 if x is None or y is None: | |
113 return float('inf') | |
114 else: | |
115 return abs(x-y) | |
116 | |
117 def multiL1Matching(x, y, thresholds, proportionMatching=1.): | |
118 n = 0 | |
119 nDimensions = len(x) | |
120 for i in range(nDimensions): | |
121 if l1Distance(x[i], y[i]) <= thresholds[i]: | |
122 n += 1 | |
123 return n >= nDimensions*proportionMatching | |
124 | |
125 from utils import LCSS as utilsLCSS | |
126 | |
127 class LCSS(utilsLCSS): | |
128 '''Adapted LCSS class for indicators, same pattern''' | |
129 def __init__(self, similarityFunc, delta = float('inf'), minLength = 0, aligned = False, lengthFunc = min): | |
130 utilsLCSS.__init__(self, similarityFunc = similarityFunc, delta = delta, aligned = aligned, lengthFunc = lengthFunc) | |
131 self.minLength = minLength | |
132 | |
133 def checkIndicator(self, indicator): | |
134 return indicator is not None and len(indicator) >= self.minLength | |
135 | |
136 def compute(self, indicator1, indicator2, computeSubSequence = False): | |
137 if self.checkIndicator(indicator1) and self.checkIndicator(indicator2): | |
138 return self._compute(indicator1.getValues(), indicator2.getValues(), computeSubSequence) | |
139 else: | |
140 return 0 | |
141 | |
142 def computeNormalized(self, indicator1, indicator2, computeSubSequence = False): | |
143 if self.checkIndicator(indicator1) and self.checkIndicator(indicator2): | |
144 return self._computeNormalized(indicator1.getValues(), indicator2.getValues(), computeSubSequence) | |
145 else: | |
146 return 0. | |
147 | |
148 def computeDistance(self, indicator1, indicator2, computeSubSequence = False): | |
149 if self.checkIndicator(indicator1) and self.checkIndicator(indicator2): | |
150 return self._computeDistance(indicator1.getValues(), indicator2.getValues(), computeSubSequence) | |
151 else: | |
152 return 1. | |
153 | |
154 class SeverityIndicator(TemporalIndicator): | |
155 '''Class for severity indicators | |
156 field mostSevereIsMax is True | |
157 if the most severe value taken by the indicator is the maximum''' | |
158 | |
159 def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, maxValue = None): | |
160 TemporalIndicator.__init__(self, name, values, timeInterval, maxValue) | |
161 self.mostSevereIsMax = mostSevereIsMax | |
162 | |
163 def getMostSevereValue(self, minNInstants=1, centile=15.): | |
164 '''if there are more than minNInstants observations, | |
165 returns either the average of these maximum values | |
166 or if centile is not None the n% centile from the most severe value | |
167 | |
168 eg for TTC, 15 returns the 15th centile (value such that 15% of observations are lower)''' | |
169 if self.__len__() < minNInstants: | |
170 return None | |
171 else: | |
172 values = list(self.values.values()) | |
173 if centile is not None: | |
174 if self.mostSevereIsMax: | |
175 c = 100-centile | |
176 else: | |
177 c = centile | |
178 return percentile(values, c) | |
179 else: | |
180 values = sorted(values, reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values | |
181 return mean(values[:minNInstants]) | |
182 | |
183 def getInstantOfMostSevereValue(self): | |
184 '''Returns the instant at which the indicator reaches its most severe value''' | |
185 if self.mostSevereIsMax: | |
186 return max(self.values, key=self.values.get) | |
187 else: | |
188 return min(self.values, key=self.values.get) | |
189 | |
190 # functions to aggregate discretized maps of indicators | |
191 # TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap) | |
192 | |
193 def indicatorMap(indicatorValues, trajectory, squareSize): | |
194 '''Returns a dictionary | |
195 with keys for the indices of the cells (squares) | |
196 in which the trajectory positions are located | |
197 at which the indicator values are attached | |
198 | |
199 ex: speeds and trajectory''' | |
200 | |
201 assert len(indicatorValues) == trajectory.length() | |
202 indicatorMap = {} | |
203 for k in range(trajectory.length()): | |
204 p = trajectory[k] | |
205 i = floor(p.x/squareSize) | |
206 j = floor(p.y/squareSize) | |
207 if (i,j) in indicatorMap: | |
208 indicatorMap[(i,j)].append(indicatorValues[k]) | |
209 else: | |
210 indicatorMap[(i,j)] = [indicatorValues[k]] | |
211 for k in indicatorMap: | |
212 indicatorMap[k] = mean(indicatorMap[k]) | |
213 return indicatorMap | |
214 | |
215 # def indicatorMapFromPolygon(value, polygon, squareSize): | |
216 # '''Fills an indicator map with the value within the polygon | |
217 # (array of Nx2 coordinates of the polygon vertices)''' | |
218 # points = [] | |
219 # for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize): | |
220 # for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize): | |
221 # points.append([x,y]) | |
222 # inside = nx.points_inside_poly(array(points), polygon) | |
223 # indicatorMap = {} | |
224 # for i in range(len(inside)): | |
225 # if inside[i]: | |
226 # indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0 | |
227 # return indicatorMap | |
228 | |
229 def indicatorMapFromAxis(value, limits, squareSize): | |
230 '''axis = [xmin, xmax, ymin, ymax] ''' | |
231 indicatorMap = {} | |
232 for x in arange(limits[0], limits[1], squareSize): | |
233 for y in arange(limits[2], limits[3], squareSize): | |
234 indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value | |
235 return indicatorMap | |
236 | |
237 def combineIndicatorMaps(maps, squareSize, combinationFunction): | |
238 '''Puts many indicator maps together | |
239 (averaging the values in each cell | |
240 if more than one maps has a value)''' | |
241 indicatorMap = {} | |
242 for m in maps: | |
243 for k,v in m.items(): | |
244 if k in indicatorMap: | |
245 indicatorMap[k].append(v) | |
246 else: | |
247 indicatorMap[k] = [v] | |
248 for k in indicatorMap: | |
249 indicatorMap[k] = combinationFunction(indicatorMap[k]) | |
250 return indicatorMap | |
251 | |
252 if __name__ == "__main__": | |
253 import doctest | |
254 import unittest | |
255 suite = doctest.DocFileSuite('tests/indicators.txt') | |
256 unittest.TextTestRunner().run(suite) | |
257 # #doctest.testmod() | |
258 # #doctest.testfile("example.txt") |