Mercurial Hosting > traffic-intelligence
diff python/indicators.py @ 244:5027c174ab90
moved indicators to new file, added ExtrapolatedTrajectory class to extrapolation file
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 17 Jul 2012 00:15:42 -0400 |
parents | |
children | 8f0ed138d373 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python/indicators.py Tue Jul 17 00:15:42 2012 -0400 @@ -0,0 +1,171 @@ +#! /usr/bin/env python +'''Class for indicators, temporal indicators, and safety indicators''' + +__metaclass__ = type + +# need for a class representing the indicators, their units, how to print them in graphs... +class TemporalIndicator: + '''Class for temporal indicators + i.e. indicators that take a value at specific instants + + values should be + * a dict, for the values at specific time instants + * or a list with a time interval object if continuous measurements + + it should have more information like name, unit''' + + def __init__(self, name, values, timeInterval=None): + self.name = name + self.isCosine = name.find('Cosine') + self.values = values + self.timeInterval = timeInterval + if timeInterval: + assert len(values) == timeInterval.length() + + def empty(self): + return len(self.values) == 0 + + def __getitem__(self, i): + if self.timeInterval: + if self.timeInterval.contains(i): + return self.values[i-self.timeInterval.first] + else: + if i in self.values.keys(): + return self.values[i] + return None # default + + def __iter__(self): + self.iterInstantNum = 0 # index in the interval or keys of the dict + return self + + def next(self): + if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\ + # or (self.iterInstantNum >= self.values) + raise StopIteration + else: + self.iterInstantNum += 1 + if self.timeInterval: + return self.values[self.iterInstantNum-1] + else: + return self.values.values()[self.iterInstantNum-1] + + def getTimeInterval(self): + if not self.timeInterval and type(self.values)==dict: + instants = self.values.keys() + if instants: + self.timeInterval = TimeInterval(instants[0], instants[-1]) + else: + self.timeInterval = TimeInterval() + return self.timeInterval + + def getValues(self): + if self.timeInterval: + return self.values + else: + return self.values.values() + + def getAngleValues(self): + '''if the indicator is a function of an angle, + transform it to an angle (eg cos) + (no transformation otherwise)''' + from numpy import arccos + values = self.getValues() + if self.isCosine >= 0: + return [arccos(c) for c in values] + else: + return values + +class SeverityIndicator(TemporalIndicator): + '''Class for severity indicators + field mostSevereIsMax is True + if the most severe value taken by the indicator is the maximum''' + + def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, ignoredValue = None): + TemporalIndicator.__init__(self, name, values, timeInterval) + self.mostSevereIsMax = mostSevereIsMax + self.ignoredValue = ignoredValue + + def getMostSevereValue(self, minNInstants=1): + from matplotlib.mlab import find + from numpy.core.multiarray import array + from numpy.core.fromnumeric import mean + values = array(self.values.values()) + if self.ignoredValue: + indices = find(values != self.ignoredValue) + else: + indices = range(len(values)) + if len(indices) >= minNInstants: + values = sorted(values[indices], reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values + return mean(values[:minNInstants]) + else: + return None + +# functions to aggregate discretized maps of indicators +# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap) + +def indicatorMap(indicatorValues, trajectory, squareSize): + '''Returns a dictionary + with keys for the indices of the cells (squares) + in which the trajectory positions are located + at which the indicator values are attached + + ex: speeds and trajectory''' + + from numpy import floor, mean + assert len(indicatorValues) == trajectory.length() + indicatorMap = {} + for k in xrange(trajectory.length()): + p = trajectory[k] + i = floor(p.x/squareSize) + j = floor(p.y/squareSize) + if indicatorMap.has_key((i,j)): + indicatorMap[(i,j)].append(indicatorValues[k]) + else: + indicatorMap[(i,j)] = [indicatorValues[k]] + for k in indicatorMap.keys(): + indicatorMap[k] = mean(indicatorMap[k]) + return indicatorMap + +def indicatorMapFromPolygon(value, polygon, squareSize): + '''Fills an indicator map with the value within the polygon + (array of Nx2 coordinates of the polygon vertices)''' + import matplotlib.nxutils as nx + from numpy.core.multiarray import array, arange + from numpy import floor + + points = [] + for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize): + for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize): + points.append([x,y]) + inside = nx.points_inside_poly(array(points), polygon) + indicatorMap = {} + for i in xrange(len(inside)): + if inside[i]: + indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0 + return indicatorMap + +def indicatorMapFromAxis(value, limits, squareSize): + '''axis = [xmin, xmax, ymin, ymax] ''' + from numpy.core.multiarray import arange + from numpy import floor + indicatorMap = {} + for x in arange(limits[0], limits[1], squareSize): + for y in arange(limits[2], limits[3], squareSize): + indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value + return indicatorMap + +def combineIndicatorMaps(maps, squareSize, combinationFunction): + '''Puts many indicator maps together + (averaging the values in each cell + if more than one maps has a value)''' + #from numpy import mean + indicatorMap = {} + for m in maps: + for k,v in m.iteritems(): + if indicatorMap.has_key(k): + indicatorMap[k].append(v) + else: + indicatorMap[k] = [v] + for k in indicatorMap.keys(): + indicatorMap[k] = combinationFunction(indicatorMap[k]) + return indicatorMap