diff trafficintelligence/indicators.py @ 1028:cc5cb04b04b0

major update using the trafficintelligence package name and install through pip
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Fri, 15 Jun 2018 11:19:10 -0400
parents python/indicators.py@933670761a57
children c6cf75a2ed08
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/trafficintelligence/indicators.py	Fri Jun 15 11:19:10 2018 -0400
@@ -0,0 +1,258 @@
+#! /usr/bin/env python
+'''Class for indicators, temporal indicators, and safety indicators'''
+
+from trafficintelligence import moving
+#import matplotlib.nxutils as nx
+from matplotlib.pyplot import plot, ylim
+from matplotlib.pylab import find
+from numpy import array, arange, mean, floor, mean
+from scipy import percentile
+
+def multivariateName(indicatorNames):
+    return '_'.join(indicatorNames)
+
+# need for a class representing the indicators, their units, how to print them in graphs...
+class TemporalIndicator(object):
+    '''Class for temporal indicators
+    i.e. indicators that take a value at specific instants
+
+    values should be
+    * a dict, for the values at specific time instants
+    * or a list with a time interval object if continuous measurements
+
+    it should have more information like name, unit'''
+    
+    def __init__(self, name, values, timeInterval = None, maxValue = None):
+        self.name = name
+        if timeInterval is None:
+            self.values = values
+            instants = sorted(self.values.keys())
+            if len(instants) > 0:
+                self.timeInterval = moving.TimeInterval(instants[0], instants[-1])
+            else:
+                self.timeInterval = moving.TimeInterval()
+        else:
+            assert len(values) == timeInterval.length()
+            self.timeInterval = timeInterval
+            self.values = {}
+            for i in range(int(round(self.timeInterval.length()))):
+                self.values[self.timeInterval[i]] = values[i]
+        self.maxValue = maxValue
+
+    def __len__(self):
+        return len(self.values)
+
+    def empty(self):
+        return len(self.values) == 0
+
+    def __getitem__(self, t):
+        'Returns the value at time t'
+        return self.values.get(t)
+
+    def getIthValue(self, i):
+        sortedKeys = sorted(self.values.keys())
+        if 0<=i<len(sortedKeys):
+            return self.values[sortedKeys[i]]
+        else:
+            return None
+
+    def __iter__(self):
+        self.iterInstantNum = 0 # index in the interval or keys of the dict
+        return self
+
+    def __next__(self):
+        if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\
+           #     or (self.iterInstantNum >= self.values)
+            raise StopIteration
+        else:
+            self.iterInstantNum += 1
+            return self.getIthValue(self.iterInstantNum-1)
+
+    def getTimeInterval(self):
+        return self.timeInterval
+
+    def getName(self):
+        return self.name
+
+    def getValues(self):
+        return [self.__getitem__(t) for t in self.timeInterval]
+
+    def plot(self, options = '', xfactor = 1., yfactor = 1., timeShift = 0, **kwargs):
+        if self.getTimeInterval().length() == 1:
+            marker = 'o'
+        else:
+            marker = ''
+        time = sorted(self.values.keys())
+        plot([(x+timeShift)/xfactor for x in time], [self.values[i]/yfactor for i in time], options+marker, **kwargs)
+        if self.maxValue:
+            ylim(ymax = self.maxValue)
+
+    @classmethod
+    def createMultivariate(cls, indicators):
+        '''Creates a new temporal indicator where the value at each instant is a list 
+        of the indicator values at the instant, in the same order
+        the time interval will be the union of the time intervals of the indicators
+        name is concatenation of the indicator names'''
+        if len(indicators) < 2:
+            print('Error creating multivariate indicator with only {} indicator'.format(len(indicators)))
+            return None
+
+        timeInterval = moving.TimeInterval.unionIntervals([indic.getTimeInterval() for indic in indicators])
+        values = {}
+        for t in timeInterval:
+            tmpValues = [indic[t] for indic in indicators]
+            uniqueValues = set(tmpValues)
+            if len(uniqueValues) >= 2 or uniqueValues.pop() is not None:
+                values[t] = tmpValues
+        return cls(multivariateName([indic.name for indic in indicators]), values)
+
+# TODO static method avec class en parametre pour faire des indicateurs agrege, list par instant
+
+def l1Distance(x, y): # lambda x,y:abs(x-y)
+    if x is None or y is None:
+        return float('inf')
+    else:
+        return abs(x-y)
+
+def multiL1Matching(x, y, thresholds, proportionMatching=1.):
+    n = 0
+    nDimensions = len(x)
+    for i in range(nDimensions):
+        if l1Distance(x[i], y[i]) <= thresholds[i]:
+            n += 1
+    return n >= nDimensions*proportionMatching
+
+from utils import LCSS as utilsLCSS
+
+class LCSS(utilsLCSS):
+    '''Adapted LCSS class for indicators, same pattern'''
+    def __init__(self, similarityFunc, delta = float('inf'), minLength = 0, aligned = False, lengthFunc = min):
+        utilsLCSS.__init__(self, similarityFunc = similarityFunc, delta = delta, aligned = aligned, lengthFunc = lengthFunc)
+        self.minLength = minLength
+
+    def checkIndicator(self, indicator):
+        return indicator is not None and len(indicator) >= self.minLength
+
+    def compute(self, indicator1, indicator2, computeSubSequence = False):
+        if self.checkIndicator(indicator1) and self.checkIndicator(indicator2):
+            return self._compute(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
+        else:
+            return 0
+
+    def computeNormalized(self, indicator1, indicator2, computeSubSequence = False):
+        if self.checkIndicator(indicator1) and self.checkIndicator(indicator2):
+            return self._computeNormalized(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
+        else:
+            return 0.
+
+    def computeDistance(self, indicator1, indicator2, computeSubSequence = False):
+        if self.checkIndicator(indicator1) and self.checkIndicator(indicator2):
+            return self._computeDistance(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
+        else:
+            return 1.
+        
+class SeverityIndicator(TemporalIndicator):
+    '''Class for severity indicators 
+    field mostSevereIsMax is True 
+    if the most severe value taken by the indicator is the maximum'''
+
+    def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, maxValue = None): 
+        TemporalIndicator.__init__(self, name, values, timeInterval, maxValue)
+        self.mostSevereIsMax = mostSevereIsMax
+
+    def getMostSevereValue(self, minNInstants=1, centile=15.):
+        '''if there are more than minNInstants observations, 
+        returns either the average of these maximum values 
+        or if centile is not None the n% centile from the most severe value
+
+        eg for TTC, 15 returns the 15th centile (value such that 15% of observations are lower)'''
+        if self.__len__() < minNInstants:
+            return None
+        else:
+            values = list(self.values.values())
+            if centile is not None:
+                if self.mostSevereIsMax:
+                    c = 100-centile
+                else:
+                    c = centile
+                return percentile(values, c)
+            else:
+                values = sorted(values, reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values
+                return mean(values[:minNInstants])
+
+    def getInstantOfMostSevereValue(self):
+        '''Returns the instant at which the indicator reaches its most severe value'''
+        if self.mostSevereIsMax:
+            return max(self.values, key=self.values.get)
+        else:
+            return min(self.values, key=self.values.get)
+
+# functions to aggregate discretized maps of indicators
+# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap)
+
+def indicatorMap(indicatorValues, trajectory, squareSize):
+    '''Returns a dictionary 
+    with keys for the indices of the cells (squares)
+    in which the trajectory positions are located
+    at which the indicator values are attached
+
+    ex: speeds and trajectory'''
+
+    assert len(indicatorValues) == trajectory.length()
+    indicatorMap = {}
+    for k in range(trajectory.length()):
+        p = trajectory[k]
+        i = floor(p.x/squareSize)
+        j = floor(p.y/squareSize)
+        if (i,j) in indicatorMap:
+            indicatorMap[(i,j)].append(indicatorValues[k])
+        else:
+            indicatorMap[(i,j)] = [indicatorValues[k]]
+    for k in indicatorMap:
+        indicatorMap[k] = mean(indicatorMap[k])
+    return indicatorMap
+
+# def indicatorMapFromPolygon(value, polygon, squareSize):
+#     '''Fills an indicator map with the value within the polygon
+#     (array of Nx2 coordinates of the polygon vertices)'''
+#     points = []
+#     for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize):
+#         for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize):
+#             points.append([x,y])
+#     inside = nx.points_inside_poly(array(points), polygon)
+#     indicatorMap = {}
+#     for i in range(len(inside)):
+#         if inside[i]:
+#             indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0
+#     return indicatorMap
+
+def indicatorMapFromAxis(value, limits, squareSize):
+    '''axis = [xmin, xmax, ymin, ymax] '''
+    indicatorMap = {}
+    for x in arange(limits[0], limits[1], squareSize):
+        for y in arange(limits[2], limits[3], squareSize):
+            indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value
+    return indicatorMap
+
+def combineIndicatorMaps(maps, squareSize, combinationFunction):
+    '''Puts many indicator maps together 
+    (averaging the values in each cell 
+    if more than one maps has a value)'''
+    indicatorMap = {}
+    for m in maps:
+        for k,v in m.items():
+            if k in indicatorMap:
+                indicatorMap[k].append(v)
+            else:
+                indicatorMap[k] = [v]
+    for k in indicatorMap:
+        indicatorMap[k] = combinationFunction(indicatorMap[k])
+    return indicatorMap
+
+if __name__ == "__main__":
+    import doctest
+    import unittest
+    suite = doctest.DocFileSuite('tests/indicators.txt')
+    unittest.TextTestRunner().run(suite)
+#     #doctest.testmod()
+#     #doctest.testfile("example.txt")