Mercurial Hosting > traffic-intelligence
view python/indicators.py @ 267:32e88b513f5c
added code to compute probability of collision
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 27 Jul 2012 20:32:14 -0400 |
parents | 7a3bf04cf016 |
children | a9988971aac8 |
line wrap: on
line source
#! /usr/bin/env python
'''Class for indicators, temporal indicators, and safety indicators'''

__metaclass__ = type

# need for a class representing the indicators, their units, how to print them in graphs...

class TemporalIndicator:
    '''Class for temporal indicators
    i.e. indicators that take a value at specific instants

    values should be
    * a dict, for the values at specific time instants
    * or a list with a time interval object if continuous measurements

    it should have more information like name, unit'''

    def __init__(self, name, values, timeInterval=None):
        '''Stores the name, the values and the optional time interval

        if a time interval is given, there must be exactly one value per instant'''
        self.name = name
        # position of 'Cosine' in the name, -1 if absent (tested with >= 0)
        self.isCosine = name.find('Cosine')
        self.values = values
        self.timeInterval = timeInterval
        if timeInterval:
            assert len(values) == timeInterval.length()

    def empty(self):
        '''Returns True if the indicator has no value'''
        return len(self.values) == 0

    def __getitem__(self, i):
        '''Returns the value at instant i, None if there is no value at that instant'''
        # dispatch on the container type and not on self.timeInterval:
        # getTimeInterval caches an interval even for dict-based indicators
        if isinstance(self.values, dict):
            return self.values.get(i)
        if self.timeInterval and self.timeInterval.contains(i):
            return self.values[i-self.timeInterval.first]
        return None # default

    def __iter__(self):
        '''Starts an iteration over the values (the indicator is its own iterator)'''
        self.iterInstantNum = 0 # index in the interval or keys of the dict
        return self

    def next(self):
        '''Returns the next value of the iteration started by __iter__

        values of a dict are returned in the iteration order of the dict'''
        if self.iterInstantNum >= len(self.values):
            raise StopIteration
        self.iterInstantNum += 1
        if isinstance(self.values, dict):
            # list() is needed where dict.values() is a view that cannot be indexed
            return list(self.values.values())[self.iterInstantNum-1]
        return self.values[self.iterInstantNum-1]

    __next__ = next # name of the iterator protocol method in Python 3

    def getTimeInterval(self):
        '''Returns the time interval of the indicator

        for values in a dict without time interval, it is built from the
        smallest and largest instants and stored in self.timeInterval'''
        # NOTE(review): TimeInterval is not imported in the visible part of this file
        # presumably it comes from another module of the project, to be confirmed
        if not self.timeInterval and isinstance(self.values, dict):
            instants = sorted(self.values) # the keys of a dict are not ordered
            if instants:
                self.timeInterval = TimeInterval(instants[0], instants[-1])
            else:
                self.timeInterval = TimeInterval()
        return self.timeInterval

    def getValues(self):
        '''Returns the values as a sequence (without the instants)'''
        if isinstance(self.values, dict):
            return list(self.values.values())
        return self.values

    def getAngleValues(self):
        '''if the indicator is a function of an angle,
        transform it to an angle (eg cos)
        (no transformation otherwise)'''
        from numpy import arccos
        values = self.getValues()
        if self.isCosine >= 0:
            return [arccos(c) for c in values]
        return values

    def plot(self, options = '', **kwargs):
        '''Plots the values as a function of time

        options and kwargs are passed to the matplotlib plot function'''
        from matplotlib.pylab import plot
        if isinstance(self.values, dict):
            time = sorted(self.values.keys())
            plot(time, [self.values[i] for i in time], options, **kwargs)
        else:
            # assumes that the time interval object can be iterated over its instants
            plot(list(self.getTimeInterval()), self.values, options, **kwargs)


class SeverityIndicator(TemporalIndicator):
    '''Class for severity indicators
    field mostSevereIsMax is True
    if the most severe value taken by the indicator is the maximum'''

    def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, ignoredValue = None):
        '''ignoredValue is a value that is discarded when looking for the most severe values
        (None to keep all the values)'''
        TemporalIndicator.__init__(self, name, values, timeInterval)
        self.mostSevereIsMax = mostSevereIsMax
        self.ignoredValue = ignoredValue

    def getMostSevereValue(self, minNInstants=1):
        '''Returns the mean of the minNInstants most severe values

        values equal to ignoredValue are discarded
        returns None if less than minNInstants values remain'''
        from numpy import array, mean
        if isinstance(self.values, dict):
            values = array(list(self.values.values()))
        else:
            values = array(self.values)
        # compare to None so that 0 can be used as ignored value
        if self.ignoredValue is not None:
            values = values[values != self.ignoredValue]
        if len(values) >= minNInstants:
            # inverted if most severe is max -> take the first values
            values = sorted(values, reverse = self.mostSevereIsMax)
            return mean(values[:minNInstants])
        return None


# functions to aggregate discretized maps of indicators
# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap)

def indicatorMap(indicatorValues, trajectory, squareSize):
    '''Returns a dictionary
    with keys for the indices of the cells (squares)
    in which the trajectory positions are located
    at which the indicator values are attached

    the value of a cell is the mean of the indicator values in the cell

    ex: speeds and trajectory'''
    from numpy import floor, mean
    assert len(indicatorValues) == trajectory.length()
    cellValues = {}
    for k in range(trajectory.length()):
        p = trajectory[k]
        cell = (floor(p.x/squareSize), floor(p.y/squareSize))
        cellValues.setdefault(cell, []).append(indicatorValues[k])
    return dict((cell, mean(v)) for cell, v in cellValues.items())


def indicatorMapFromPolygon(value, polygon, squareSize):
    '''Fills an indicator map with the value within the polygon
    (array of Nx2 coordinates of the polygon vertices)

    a cell is filled if its center is inside the polygon'''
    from numpy import array, arange, floor
    try:
        from matplotlib.nxutils import points_inside_poly
    except ImportError:
        # nxutils does not exist in recent versions of matplotlib
        from matplotlib.path import Path
        def points_inside_poly(points, vertices):
            return Path(vertices).contains_points(points)

    halfSize = squareSize/2. # float division, also for integer sizes
    ys = arange(min(polygon[:,1])+halfSize, max(polygon[:,1]), squareSize)
    points = []
    for x in arange(min(polygon[:,0])+halfSize, max(polygon[:,0]), squareSize):
        for y in ys:
            points.append([x,y])
    result = {}
    if not points: # polygon smaller than half a cell
        return result
    inside = points_inside_poly(array(points), polygon)
    for (x, y), isInside in zip(points, inside):
        if isInside:
            result[(floor(x/squareSize), floor(y/squareSize))] = value
    return result


def indicatorMapFromAxis(value, limits, squareSize):
    '''Fills an indicator map with the value within the limits
    limits = [xmin, xmax, ymin, ymax] '''
    from numpy import arange, floor
    result = {}
    ys = arange(limits[2], limits[3], squareSize)
    for x in arange(limits[0], limits[1], squareSize):
        for y in ys:
            result[(floor(x/squareSize), floor(y/squareSize))] = value
    return result


def combineIndicatorMaps(maps, squareSize, combinationFunction):
    '''Puts many indicator maps together
    (the values of each cell in the different maps are combined
    by combinationFunction, which takes the list of the values)

    squareSize is not used'''
    cellValues = {}
    for m in maps:
        for k, v in m.items():
            cellValues.setdefault(k, []).append(v)
    return dict((k, combinationFunction(v)) for k, v in cellValues.items())


if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/indicators.txt')
    unittest.TextTestRunner().run(suite)
#     #doctest.testmod()
#     #doctest.testfile("example.txt")