Mercurial Hosting > traffic-intelligence
view python/indicators.py @ 361:9d486af42e49
added files to ignore given the CMakeLists.txt
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Thu, 11 Jul 2013 15:21:13 -0400 |
parents | 2f39c4ed0b62 |
children | 2db4e76599a1 |
line wrap: on
line source
#! /usr/bin/env python '''Class for indicators, temporal indicators, and safety indicators''' __metaclass__ = type import moving # need for a class representing the indicators, their units, how to print them in graphs... class TemporalIndicator: '''Class for temporal indicators i.e. indicators that take a value at specific instants values should be * a dict, for the values at specific time instants * or a list with a time interval object if continuous measurements it should have more information like name, unit''' def __init__(self, name, values, timeInterval=None, maxValue = None): self.name = name if timeInterval: assert len(values) == timeInterval.length() self.timeInterval = timeInterval self.values = {} for i in xrange(int(round(self.timeInterval.length()))): self.values[self.timeInterval[i]] = values[i] else: self.values = values instants = sorted(self.values.keys()) if instants: self.timeInterval = moving.TimeInterval(instants[0], instants[-1]) else: self.timeInterval = moving.TimeInterval() self.maxValue = maxValue def __len__(self): return len(self.values) def empty(self): return len(self.values) == 0 def __getitem__(self, i): 'Returns ith value in time interval' if i in self.values.keys(): return self.values[i] else: return None def getIthValue(self, i): sortedKeys = sorted(self.values.keys()) if 0<=i<len(sortedKeys): return self.values[sortedKeys[i]] else: return None def __iter__(self): self.iterInstantNum = 0 # index in the interval or keys of the dict return self def next(self): if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\ # or (self.iterInstantNum >= self.values) raise StopIteration else: self.iterInstantNum += 1 return self.getIthValue(self.iterInstantNum-1) def getTimeInterval(self): return self.timeInterval def getName(self): return self.name def getValues(self): return [self.__getitem__(t) for t in self.timeInterval] def plot(self, options = '', xfactor = 1., **kwargs): from matplotlib.pylab import plot,ylim if self.getTimeInterval().length() == 1: marker = 'o' else: marker = '' time = sorted(self.values.keys()) plot([x/xfactor for x in time], [self.values[i] for i in time], options+marker, **kwargs) if self.maxValue: ylim(ymax = self.maxValue) def valueSorted(self): ''' return the values after sort the keys in the indicator This should probably not be used: to delete''' print('Deprecated: values should not be accessed in this way') values=[] keys = self.values.keys() keys.sort() for key in keys: values.append(self.values[key]) return values @staticmethod def getDLCSS(indic1, indic2, threshold, delta = float('inf') , method ='min' ): ''' compute the distance between two indicators using LCSS two common methods are used: min or mean of the indicators length''' print('Deprecated: this is not appropriate method for indicator comparison') l1 = indic1.valueSorted l2 = indic2.valueSorted DLCSS = None if method == 'min': DLCSS = 1- (LCSS(l1,l2, threshold, delta, distance))/min(len(l1),len(l2)) if method == 'mean': average = len(l1)+len(l2)/2 DLCSS = 1- ((LCSS(l1,l2, threshold, delta, distance))/average) return DLCSS def distanceForLCSS(x, y): # lambda x,y:abs(x-y) if x == None or y == None: return float('inf') else: return abs(x-y) # non-aligned LCSS computations, ok for delta = inf def computeLCSS(indicator1, indicator2, threshold, delta = float('inf')): ''' compute the LCSS between two indicators using LCSS''' from utils import LCSS if indicator1 and indicator2: return LCSS(indicator1.getValues(), indicator2.getValues(), threshold, distanceForLCSS, delta) else: return 0 def computeNormalizedLCSS(indicator1, indicator2, threshold, delta = float('inf'), method= min): ''' compute the normalized LCSS between two indicators using LCSS ie, the LCSS divided by the min or mean of the indicator lengths''' from utils import normalizedLCSS if indicator1 and indicator2: return normalizedLCSS(indicator1.getValues(), indicator2.getValues(), threshold, distanceForLCSS, delta, method) else: return 0. def computeDLCSS(indicator1, indicator2, threshold, delta = float('inf'), method = min): ''' compute the LCSS distance between two indicators using LCSS''' from utils import DLCSS return DLCSS(indicator1.getValues(), indicator2.getValues(), threshold, distanceForLCSS, delta, method) # aligned LCSS computations def computeAlignedLCSS(indicator1, indicator2, threshold, delta = float('inf')): ''' compute the aligned LCSS between two indicators using LCSS''' from utils import alignedLCSS if indicator1 and indicator2: return alignedLCSS(indicator1.getValues(), indicator2.getValues(), threshold, distanceForLCSS, delta) else: return 0 def computeNormalizedAlignedLCSS(indicator1, indicator2, threshold, delta = float('inf'), method= min): ''' compute the normalized aligned LCSS between two indicators using LCSS ie, the LCSS divided by the min or mean of the indicator lengths''' from utils import normalizedAlignedLCSS if indicator1 and indicator2: return normalizedAlignedLCSS(indicator1.getValues(), indicator2.getValues(), threshold, distanceForLCSS, delta, method) else: return 0. def computeAlignedDLCSS(indicator1, indicator2, threshold, delta = float('inf'), method = min): ''' compute the aligned LCSS distance between two indicators using LCSS''' from utils import alignedDLCSS return alignedDLCSS(indicator1.getValues(), indicator2.getValues(), threshold, distanceForLCSS, delta, method) class SeverityIndicator(TemporalIndicator): '''Class for severity indicators field mostSevereIsMax is True if the most severe value taken by the indicator is the maximum''' def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, maxValue = None): TemporalIndicator.__init__(self, name, values, timeInterval, maxValue) self.mostSevereIsMax = mostSevereIsMax def getMostSevereValue(self, minNInstants=1): # TODO use scoreatpercentile from matplotlib.mlab import find from numpy.core.multiarray import array from numpy.core.fromnumeric import mean values = array(self.values.values()) indices = range(len(values)) if len(indices) >= minNInstants: values = sorted(values[indices], reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values return mean(values[:minNInstants]) else: return None # functions to aggregate discretized maps of indicators # TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap) def indicatorMap(indicatorValues, trajectory, squareSize): '''Returns a dictionary with keys for the indices of the cells (squares) in which the trajectory positions are located at which the indicator values are attached ex: speeds and trajectory''' from numpy import floor, mean assert len(indicatorValues) == trajectory.length() indicatorMap = {} for k in xrange(trajectory.length()): p = trajectory[k] i = floor(p.x/squareSize) j = floor(p.y/squareSize) if indicatorMap.has_key((i,j)): indicatorMap[(i,j)].append(indicatorValues[k]) else: indicatorMap[(i,j)] = [indicatorValues[k]] for k in indicatorMap.keys(): indicatorMap[k] = mean(indicatorMap[k]) return indicatorMap def indicatorMapFromPolygon(value, polygon, squareSize): '''Fills an indicator map with the value within the polygon (array of Nx2 coordinates of the polygon vertices)''' import matplotlib.nxutils as nx from numpy.core.multiarray import array, arange from numpy import floor points = [] for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize): for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize): points.append([x,y]) inside = nx.points_inside_poly(array(points), polygon) indicatorMap = {} for i in xrange(len(inside)): if inside[i]: indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0 return indicatorMap def indicatorMapFromAxis(value, limits, squareSize): '''axis = [xmin, xmax, ymin, ymax] ''' from numpy.core.multiarray import arange from numpy import floor indicatorMap = {} for x in arange(limits[0], limits[1], squareSize): for y in arange(limits[2], limits[3], squareSize): indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value return indicatorMap def combineIndicatorMaps(maps, squareSize, combinationFunction): '''Puts many indicator maps together (averaging the values in each cell if more than one maps has a value)''' #from numpy import mean indicatorMap = {} for m in maps: for k,v in m.iteritems(): if indicatorMap.has_key(k): indicatorMap[k].append(v) else: indicatorMap[k] = [v] for k in indicatorMap.keys(): indicatorMap[k] = combinationFunction(indicatorMap[k]) return indicatorMap if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/indicators.txt') unittest.TextTestRunner().run(suite) # #doctest.testmod() # #doctest.testfile("example.txt")