Mercurial Hosting > traffic-intelligence
view trafficintelligence/indicators.py @ 1266:ebb18043616e
work in progress on categorization
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 28 May 2024 17:16:41 -0400 |
parents | 0f5bebd62a55 |
children | ad60e5adf084 |
line wrap: on
line source
#! /usr/bin/env python '''Class for indicators, temporal indicators, and safety indicators''' from matplotlib.pyplot import plot, ylim from numpy import array, arange, mean, floor, mean, percentile from trafficintelligence import moving from trafficintelligence.utils import LCSS as utilsLCSS def multivariateName(indicatorNames): return '_'.join(indicatorNames) # need for a class representing the indicators, their units, how to print them in graphs... class TemporalIndicator(object): '''Class for temporal indicators i.e. indicators that take a value at specific instants values is a dict, for the values at specific time instants it should have more information like name, unit''' def __init__(self, name, values, maxValue = None): self.name = name self.values = values instants = sorted(self.values.keys()) if len(instants) > 0: self.timeInterval = moving.TimeInterval(instants[0], instants[-1]) else: self.timeInterval = moving.TimeInterval() # else: # assert len(values) == timeInterval.length() # self.timeInterval = timeInterval # self.values = {} # for i in range(int(round(self.timeInterval.length()))): # self.values[self.timeInterval[i]] = values[i] self.maxValue = maxValue def __len__(self): return len(self.values) def empty(self): return len(self.values) == 0 def __getitem__(self, t): 'Returns the value at time t' return self.values.get(t) def getIthValue(self, i): sortedKeys = sorted(self.values.keys()) if 0<=i<len(sortedKeys): return self.values[sortedKeys[i]] else: return None def __iter__(self): self.iterInstantNum = 0 # index in the interval or keys of the dict return self def __next__(self): if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\ # or (self.iterInstantNum >= self.values) raise StopIteration else: self.iterInstantNum += 1 return self.getIthValue(self.iterInstantNum-1) def min(self): return min(self.values.values()) def max(self): return max(self.values.values()) def getTimeInterval(self): return self.timeInterval def getName(self): return self.name def getValues(self, withNone = True): result = [self.__getitem__(t) for t in self.timeInterval] if withNone: return result else: return [x for x in result if x is not None] def getInstants(self): return list(self.values.keys()) def plot(self, options = '', xfactor = 1., yfactor = 1., timeShift = 0, **kwargs): if self.getTimeInterval().length() == 1: marker = 'o' else: marker = '' time = sorted(self.values.keys()) plot([(x+timeShift)/xfactor for x in time], [self.values[i]/yfactor for i in time], options+marker, **kwargs) if self.maxValue: ylim(ymax = self.maxValue) @classmethod def createMultivariate(cls, indicators): '''Creates a new temporal indicator where the value at each instant is a list of the indicator values at the instant, in the same order the time interval will be the union of the time intervals of the indicators name is concatenation of the indicator names''' if len(indicators) < 2: print('Error creating multivariate indicator with only {} indicator'.format(len(indicators))) return None timeInterval = moving.TimeInterval.unionIntervals([indic.getTimeInterval() for indic in indicators]) values = {} for t in timeInterval: tmpValues = [indic[t] for indic in indicators] uniqueValues = set(tmpValues) if len(uniqueValues) >= 2 or uniqueValues.pop() is not None: values[t] = tmpValues return cls(multivariateName([indic.name for indic in indicators]), values) # TODO static method avec class en parametre pour faire des indicateurs agrege, list par instant def l1Distance(x, y): # lambda x,y:abs(x-y) if x is None or y is None: return float('inf') else: return abs(x-y) def multiL1Matching(x, y, thresholds, proportionMatching=1.): n = 0 nDimensions = len(x) for i in range(nDimensions): if l1Distance(x[i], y[i]) <= thresholds[i]: n += 1 return n >= nDimensions*proportionMatching class LCSS(utilsLCSS): '''Adapted LCSS class for indicators, same pattern''' def __init__(self, similarityFunc, delta = float('inf'), minLength = 0, aligned = False, lengthFunc = min): utilsLCSS.__init__(self, similarityFunc = similarityFunc, delta = delta, aligned = aligned, lengthFunc = lengthFunc) self.minLength = minLength def checkIndicator(self, indicator): return indicator is not None and len(indicator) >= self.minLength def compute(self, indicator1, indicator2, computeSubSequence = False): if self.checkIndicator(indicator1) and self.checkIndicator(indicator2): return self._compute(indicator1.getValues(), indicator2.getValues(), computeSubSequence) else: return 0 def computeNormalized(self, indicator1, indicator2, computeSubSequence = False): if self.checkIndicator(indicator1) and self.checkIndicator(indicator2): return self._computeNormalized(indicator1.getValues(), indicator2.getValues(), computeSubSequence) else: return 0. def computeDistance(self, indicator1, indicator2, computeSubSequence = False): if self.checkIndicator(indicator1) and self.checkIndicator(indicator2): return self._computeDistance(indicator1.getValues(), indicator2.getValues(), computeSubSequence) else: return 1. class SeverityIndicator(TemporalIndicator): '''Class for severity indicators field mostSevereIsMax is True if the most severe value taken by the indicator is the maximum''' def __init__(self, name, values, mostSevereIsMax=True, maxValue = None): TemporalIndicator.__init__(self, name, values, maxValue) self.mostSevereIsMax = mostSevereIsMax def getMostSevereValue(self, minNInstants=None, centile=None): '''if there are more than minNInstants observations, returns either the average of these maximum values or if centile is not None the n% centile from the most severe value eg for TTC, centile = 15 returns the 15th centile (value such that 15% of observations are lower)''' values = list(self.values.values()) if centile is not None: if self.mostSevereIsMax: c = 100-centile else: c = centile return percentile(values, c) elif minNInstants is not None and minNInstants <= self.__len__(): values = sorted(values, reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values return mean(values[:minNInstants]) else: return None def getInstantOfMostSevereValue(self, minSevereValue = None): '''Returns the instant at which the indicator reaches its most severe value or the instants when value is above minSevereValue (it not None)''' if minSevereValue is None: if self.mostSevereIsMax: return max(self.values, key=self.values.get) else: return min(self.values, key=self.values.get) else: if self.mostSevereIsMax: return [t for t in self.values if self.values[t] >= minSevereValue] else: return [t for t in self.values if self.values[t] <= minSevereValue] # functions to aggregate discretized maps of indicators # TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap) def indicatorMap(indicatorValues, trajectory, squareSize): '''Returns a dictionary with keys for the indices of the cells (squares) in which the trajectory positions are located at which the indicator values are attached ex: speeds and trajectory''' assert len(indicatorValues) == trajectory.length() indicatorMap = {} for k in range(trajectory.length()): p = trajectory[k] i = floor(p.x/squareSize) j = floor(p.y/squareSize) if (i,j) in indicatorMap: indicatorMap[(i,j)].append(indicatorValues[k]) else: indicatorMap[(i,j)] = [indicatorValues[k]] for k in indicatorMap: indicatorMap[k] = mean(indicatorMap[k]) return indicatorMap # def indicatorMapFromPolygon(value, polygon, squareSize): # '''Fills an indicator map with the value within the polygon # (array of Nx2 coordinates of the polygon vertices)''' # points = [] # for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize): # for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize): # points.append([x,y]) # inside = nx.points_inside_poly(array(points), polygon) # indicatorMap = {} # for i in range(len(inside)): # if inside[i]: # indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0 # return indicatorMap def indicatorMapFromAxis(value, limits, squareSize): '''axis = [xmin, xmax, ymin, ymax] ''' indicatorMap = {} for x in arange(limits[0], limits[1], squareSize): for y in arange(limits[2], limits[3], squareSize): indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value return indicatorMap def combineIndicatorMaps(maps, squareSize, combinationFunction): '''Puts many indicator maps together (averaging the values in each cell if more than one maps has a value)''' indicatorMap = {} for m in maps: for k,v in m.items(): if k in indicatorMap: indicatorMap[k].append(v) else: indicatorMap[k] = [v] for k in indicatorMap: indicatorMap[k] = combinationFunction(indicatorMap[k]) return indicatorMap if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/indicators.txt') unittest.TextTestRunner().run(suite) # #doctest.testmod() # #doctest.testfile("example.txt")