Mercurial Hosting > traffic-intelligence
view python/indicators.py @ 295:ba29bd82bd04
added option to disable computation of crossing zones
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Mon, 11 Feb 2013 15:59:15 -0500 |
parents | 66691c06928c |
children | 93d851d0d21e |
line wrap: on
line source
#! /usr/bin/env python
'''Class for indicators, temporal indicators, and safety indicators'''

__metaclass__ = type

import moving

# need for a class representing the indicators, their units, how to print them in graphs...
class TemporalIndicator:
    '''Class for temporal indicators
    i.e. indicators that take a value at specific instants

    values should be
    * a dict, for the values at specific time instants
    * or a list with a time interval object if continuous measurements

    it should have more information like name, unit'''

    def __init__(self, name, values, timeInterval=None, maxValue = None):
        '''Stores the values in a dict indexed by instant

        If timeInterval is given, values is read as a sequence with one
        element per instant of the interval; otherwise values is used
        directly as the dict and the time interval is built
        from its smallest and largest keys'''
        self.name = name
        # index of 'Cosine' in the name, -1 if absent (tested with >= 0 in getAngleValues)
        self.isCosine = name.find('Cosine')
        if timeInterval:
            assert len(values) == timeInterval.length()
            self.timeInterval = timeInterval
            self.values = {}
            for i in range(int(round(self.timeInterval.length()))):
                self.values[self.timeInterval[i]] = values[i]
        else:
            self.values = values
            instants = sorted(self.values.keys())
            if instants:
                self.timeInterval = moving.TimeInterval(instants[0], instants[-1])
            else:
                self.timeInterval = moving.TimeInterval()
        self.maxValue = maxValue

    def __len__(self):
        'Returns the number of stored values'
        return len(self.values)

    def empty(self):
        'Returns True if no value is stored'
        return len(self.values) == 0

    def __getitem__(self, i):
        'Returns the value stored for instant i, None if there is none'
        # dict.get avoids scanning the list of keys (Python 2) for each access
        return self.values.get(i)

    def getIthValue(self, i):
        '''Returns the value at the ith instant, the instants being sorted
        in increasing order (None if i is out of range)'''
        sortedKeys = sorted(self.values.keys())
        if 0<=i<len(sortedKeys):
            return self.values[sortedKeys[i]]
        else:
            return None

    def __iter__(self):
        '''Resets the iteration and returns the indicator itself as iterator
        (the position is stored in the object: only one iteration at a time)'''
        self.iterInstantNum = 0 # index in the interval or keys of the dict
        return self

    def next(self):
        'Returns the next value, in increasing order of the instants'
        if self.iterInstantNum >= len(self.values):
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self.getIthValue(self.iterInstantNum-1)

    # the iterator protocol method is called __next__ in Python 3
    __next__ = next

    def getTimeInterval(self):
        'Returns the time interval of the indicator'
        return self.timeInterval

    def getValues(self):
        '''Returns the list of values for each instant of the time interval
        (None for the instants without value)'''
        return [self.__getitem__(t) for t in self.timeInterval]

    def getAngleValues(self):
        '''if the indicator is a function of an angle,
        transform it to an angle (eg cos)
        (no transformation otherwise)'''
        from numpy import arccos
        values = self.getValues()
        if self.isCosine >= 0:
            return [arccos(c) for c in values]
        else:
            return values

    def plot(self, options = '', xfactor = 1., **kwargs):
        '''Plots the values as a function of the instants divided by xfactor

        options is the matplotlib format string,
        kwargs are passed to the matplotlib plot function'''
        from matplotlib.pylab import plot,ylim
        # a marker is added if the interval length is 1
        if self.getTimeInterval().length() == 1:
            marker = 'o'
        else:
            marker = ''
        instants = sorted(self.values.keys())
        plot([x/xfactor for x in instants], [self.values[i] for i in instants], options+marker, **kwargs)
        if self.maxValue:
            ylim(ymax = self.maxValue)

def computeDLCSS(indicator1, indicator2, threshold, delta = float('inf'), method= 'min'):
    ''' compute the distance between two indicators using LCSS
    two common methods are used: min or mean of the indicators length

    Returns 1 - lcss/denominator, the denominator being the minimum length
    (method 'min'), the mean length (method 'mean'), or 1 with a printed message
    for any other method name

    NOTE(review): a ZeroDivisionError is raised if the denominator is 0
    (eg an empty indicator with method 'min')'''
    from utils import LCSS

    def distance(x, y): # lambda x,y:abs(x-y)
        # getValues returns None for the instants without value
        if x is None or y is None:
            return float('inf')
        else:
            return abs(x-y)

    lcss = LCSS(indicator1.getValues(), indicator2.getValues(), threshold, distance, delta)
    if method == 'min':
        denominator = min(len(indicator1), len(indicator2))
    elif method == 'mean':
        denominator = float(len(indicator1) + len(indicator2))/2
    else:
        print('Unknown denominator method name')
        denominator = 1.
    return 1-float(lcss)/denominator

class SeverityIndicator(TemporalIndicator):
    '''Class for severity indicators
    field mostSevereIsMax is True
    if the most severe value taken by the indicator is the maximum'''

    def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, maxValue = None):
        TemporalIndicator.__init__(self, name, values, timeInterval, maxValue)
        self.mostSevereIsMax = mostSevereIsMax

    def getMostSevereValue(self, minNInstants=1): # TODO use scoreatpercentile
        '''Returns the mean of the minNInstants most severe values,
        None if there are fewer than minNInstants values'''
        from numpy import mean
        values = list(self.values.values())
        if len(values) >= minNInstants:
            # sorted in decreasing order if most severe is max -> take the first values
            values.sort(reverse = self.mostSevereIsMax)
            return mean(values[:minNInstants])
        else:
            return None

# functions to aggregate discretized maps of indicators
# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap)

def indicatorMap(indicatorValues, trajectory, squareSize):
    '''Returns a dictionary
    with keys for the indices of the cells (squares)
    in which the trajectory positions are located
    at which the indicator values are attached

    The value of each cell is the mean of the indicator values
    at the positions in the cell

    ex: speeds and trajectory'''
    from numpy import floor, mean
    assert len(indicatorValues) == trajectory.length()
    cellValues = {}
    for k in range(trajectory.length()):
        p = trajectory[k]
        i = floor(p.x/squareSize)
        j = floor(p.y/squareSize)
        cellValues.setdefault((i,j), []).append(indicatorValues[k])
    for cell in list(cellValues.keys()):
        cellValues[cell] = mean(cellValues[cell])
    return cellValues

def indicatorMapFromPolygon(value, polygon, squareSize):
    '''Fills an indicator map with the value within the polygon
    (array of Nx2 coordinates of the polygon vertices)

    One point is tested in each square of the bounding box of the polygon;
    the cells of the points inside the polygon are set to value'''
    from numpy import arange, array, floor
    points = []
    for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize):
        for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize):
            points.append([x,y])
    if not points: # bounding box smaller than a square: no point to test
        return {}
    try:
        import matplotlib.nxutils as nx
        inside = nx.points_inside_poly(array(points), polygon)
    except ImportError:
        # matplotlib.nxutils is not available in newer versions of matplotlib
        from matplotlib.path import Path
        inside = Path(polygon).contains_points(array(points))
    cellValues = {}
    for point, isInside in zip(points, inside):
        if isInside:
            cellValues[(floor(point[0]/squareSize), floor(point[1]/squareSize))] = value
    return cellValues

def indicatorMapFromAxis(value, limits, squareSize):
    '''Fills an indicator map with the value for all the cells
    within the limits
    axis = [xmin, xmax, ymin, ymax] '''
    from numpy import arange, floor
    cellValues = {}
    for x in arange(limits[0], limits[1], squareSize):
        for y in arange(limits[2], limits[3], squareSize):
            cellValues[(floor(x/squareSize), floor(y/squareSize))] = value
    return cellValues

def combineIndicatorMaps(maps, squareSize, combinationFunction):
    '''Puts many indicator maps together
    (applying combinationFunction to the list of the values in each cell,
    eg averaging the values if more than one maps has a value)

    squareSize is not used'''
    #from numpy import mean
    combinedMap = {}
    for m in maps:
        for k,v in m.items():
            combinedMap.setdefault(k, []).append(v)
    for k in list(combinedMap.keys()):
        combinedMap[k] = combinationFunction(combinedMap[k])
    return combinedMap

if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/indicators.txt')
    unittest.TextTestRunner().run(suite)
#     #doctest.testmod()
#     #doctest.testfile("example.txt")