view trafficintelligence/indicators.py @ 1267:ad60e5adf084

cleaned interaction categorization and added stationary category
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Wed, 29 May 2024 09:52:42 -0400
parents 0f5bebd62a55
children
line wrap: on
line source

#! /usr/bin/env python
'''Class for indicators, temporal indicators, and safety indicators'''

from matplotlib.pyplot import plot, ylim
from numpy import array, arange, mean, floor, mean, percentile

from trafficintelligence import moving
from trafficintelligence.utils import LCSS as utilsLCSS

def multivariateName(indicatorNames):
    '''Returns the name of a multivariate indicator,
    i.e. the indicator names joined by underscores'''
    separator = '_'
    return separator.join(indicatorNames)

# need for a class representing the indicators, their units, how to print them in graphs...
class TemporalIndicator(object):
    '''Class for temporal indicators
    i.e. indicators that take a value at specific instants

    values is a dict, for the values at specific time instants
    the time interval goes from the smallest to the largest instant in values
    (moving.TimeInterval() without argument if values is empty)

    maxValue, if set, is used as upper limit of the y axis when plotting

    it should have more information like name, unit'''

    def __init__(self, name, values, maxValue = None):
        self.name = name
        self.values = values
        instants = sorted(self.values.keys())
        if len(instants) > 0:
            self.timeInterval = moving.TimeInterval(instants[0], instants[-1])
        else:
            self.timeInterval = moving.TimeInterval()
        self.maxValue = maxValue

    def __len__(self):
        'Returns the number of instants at which the indicator has a value'
        return len(self.values)

    def empty(self):
        'Returns True if the indicator has no value'
        return len(self.values) == 0

    def __getitem__(self, t):
        'Returns the value at time t (None if there is no value at t)'
        return self.values.get(t)

    def getIthValue(self, i):
        '''Returns the value at the ith instant, in increasing order of instants
        (None if i is negative or not smaller than the number of values)'''
        sortedKeys = sorted(self.values.keys())
        if 0<=i<len(sortedKeys):
            return self.values[sortedKeys[i]]
        else:
            return None

    def __iter__(self):
        '''Returns a new iterator over the values, in increasing order of instants

        Each call returns an independent iterator
        so that nested or interleaved loops over the same indicator do not interfere
        (the instants are sorted only once per loop)'''
        # the counter is still reset for code calling next() directly on the indicator
        self.iterInstantNum = 0
        return iter([self.values[t] for t in sorted(self.values)])

    def __next__(self):
        '''Returns the next value according to the counter reset by __iter__

        Kept for code calling next() directly on the indicator,
        for loops use the iterator returned by __iter__'''
        if self.iterInstantNum >= len(self.values):
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self.getIthValue(self.iterInstantNum-1)

    def min(self):
        'Returns the smallest value (raises ValueError if there is no value)'
        return min(self.values.values())

    def max(self):
        'Returns the largest value (raises ValueError if there is no value)'
        return max(self.values.values())

    def getTimeInterval(self):
        return self.timeInterval

    def getName(self):
        return self.name

    def getValues(self, withNone = True):
        '''Returns the list of values for each instant
        obtained by iterating over the time interval

        Instants without value yield None, 
        which are removed from the list if withNone is False'''
        result = [self.__getitem__(t) for t in self.timeInterval]
        if withNone:
            return result
        else:
            return [x for x in result if x is not None]

    def getInstants(self):
        'Returns the list of instants with a value (in the order of the dict, not sorted)'
        return list(self.values.keys())

    def existsAtInstant(self, instant):
        'Returns True if the indicator has a value at instant'
        return instant in self.values

    def plot(self, options = '', xfactor = 1., yfactor = 1., timeShift = 0, **kwargs):
        '''Plots the values as a function of the sorted instants

        instants are shifted by timeShift then divided by xfactor
        values are divided by yfactor
        options is the matplotlib format string, kwargs are passed to plot'''
        if self.getTimeInterval().length() == 1:
            marker = 'o' # a marker is added if the time interval has length 1
        else:
            marker = ''
        time = sorted(self.values.keys())
        plot([(x+timeShift)/xfactor for x in time], [self.values[i]/yfactor for i in time], options+marker, **kwargs)
        if self.maxValue:
            ylim(ymax = self.maxValue)

    @classmethod
    def createMultivariate(cls, indicators):
        '''Creates a new temporal indicator where the value at each instant is a list 
        of the indicator values at the instant, in the same order
        (None for the indicators without value at the instant)
        the time interval will be the union of the time intervals of the indicators
        name is concatenation of the indicator names

        Returns None (and prints an error message) if there are less than 2 indicators'''
        if len(indicators) < 2:
            print('Error creating multivariate indicator with only {} indicator'.format(len(indicators)))
            return None

        timeInterval = moving.TimeInterval.unionIntervals([indic.getTimeInterval() for indic in indicators])
        values = {}
        for t in timeInterval:
            tmpValues = [indic[t] for indic in indicators]
            uniqueValues = set(tmpValues)
            # the instant is skipped if all indicators are None at that instant
            if len(uniqueValues) >= 2 or uniqueValues.pop() is not None:
                values[t] = tmpValues
        return cls(multivariateName([indic.name for indic in indicators]), values)

# TODO static method taking the class as a parameter to build aggregated indicators (list of values per instant)

def l1Distance(x, y): # lambda x,y:abs(x-y)
    '''Returns the absolute difference between x and y,
    infinity if either of them is None'''
    if x is not None and y is not None:
        return abs(x-y)
    return float('inf')

def multiL1Matching(x, y, thresholds, proportionMatching=1.):
    '''Returns True if x and y match over enough dimensions

    Dimension i matches if the L1 distance between x[i] and y[i] 
    is at most thresholds[i]
    the number of matching dimensions must be at least 
    the proportion proportionMatching of the number of dimensions of x'''
    nDimensions = len(x)
    nMatches = sum(1 for d in range(nDimensions) if l1Distance(x[d], y[d]) <= thresholds[d])
    return nMatches >= nDimensions*proportionMatching

class LCSS(utilsLCSS):
    '''LCSS class adapted to indicators, following the same pattern as utils.LCSS

    the indicators that are None or have less than minLength values are not compared
    and each method then returns a fixed default value'''
    def __init__(self, similarityFunc, delta = float('inf'), minLength = 0, aligned = False, lengthFunc = min):
        utilsLCSS.__init__(self, similarityFunc = similarityFunc, delta = delta, aligned = aligned, lengthFunc = lengthFunc)
        self.minLength = minLength

    def checkIndicator(self, indicator):
        'Returns True if the indicator is not None and has at least minLength values'
        if indicator is None:
            return False
        return len(indicator) >= self.minLength

    def compute(self, indicator1, indicator2, computeSubSequence = False):
        '''Returns the result of _compute on the values of the two indicators
        0 if one of the indicators does not pass checkIndicator'''
        comparable = self.checkIndicator(indicator1) and self.checkIndicator(indicator2)
        if not comparable:
            return 0
        return self._compute(indicator1.getValues(), indicator2.getValues(), computeSubSequence)

    def computeNormalized(self, indicator1, indicator2, computeSubSequence = False):
        '''Returns the result of _computeNormalized on the values of the two indicators
        0. if one of the indicators does not pass checkIndicator'''
        comparable = self.checkIndicator(indicator1) and self.checkIndicator(indicator2)
        if not comparable:
            return 0.
        return self._computeNormalized(indicator1.getValues(), indicator2.getValues(), computeSubSequence)

    def computeDistance(self, indicator1, indicator2, computeSubSequence = False):
        '''Returns the result of _computeDistance on the values of the two indicators
        1. if one of the indicators does not pass checkIndicator'''
        comparable = self.checkIndicator(indicator1) and self.checkIndicator(indicator2)
        if not comparable:
            return 1.
        return self._computeDistance(indicator1.getValues(), indicator2.getValues(), computeSubSequence)
        
class SeverityIndicator(TemporalIndicator):
    '''Class for severity indicators 

    the field mostSevereIsMax is True 
    if the most severe value taken by the indicator is the maximum
    (False if it is the minimum)'''

    def __init__(self, name, values, mostSevereIsMax=True, maxValue = None): 
        TemporalIndicator.__init__(self, name, values, maxValue)
        self.mostSevereIsMax = mostSevereIsMax

    def getMostSevereValue(self, minNInstants=None, centile=None):
        '''Returns an aggregated most severe value of the indicator

        if centile is not None, returns the centile of the values
        counted from the most severe value
        (the (100-centile)th percentile if the most severe value is the maximum)
        otherwise, if minNInstants is not None 
        and there are at least minNInstants observations,
        returns the average of the minNInstants most severe values
        otherwise returns None

        eg for TTC, centile = 15 returns the 15th centile (value such that 15% of observations are lower)'''
        observations = list(self.values.values())
        if centile is not None:
            rank = 100-centile if self.mostSevereIsMax else centile
            return percentile(observations, rank)
        if minNInstants is not None and minNInstants <= len(self):
            # decreasing order if most severe is max -> the most severe values come first
            observations.sort(reverse = self.mostSevereIsMax)
            return mean(observations[:minNInstants])
        return None

    def getInstantOfMostSevereValue(self, minSevereValue = None):
        '''Returns the instant at which the indicator reaches its most severe value
        or, if minSevereValue is not None, the list of instants 
        when the value is at least as severe as minSevereValue
        (>= if the most severe value is the maximum, <= otherwise)'''
        if minSevereValue is None:
            mostSevere = max if self.mostSevereIsMax else min
            return mostSevere(self.values, key=self.values.get)
        if self.mostSevereIsMax:
            return [t for t, v in self.values.items() if v >= minSevereValue]
        return [t for t, v in self.values.items() if v <= minSevereValue]

# functions to aggregate discretized maps of indicators
# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap)

def indicatorMap(indicatorValues, trajectory, squareSize):
    '''Returns a dictionary of the average indicator value per cell

    the keys are the indices (i,j) of the cells (squares of side squareSize)
    in which the trajectory positions are located
    the values are the mean of the indicator values
    attached to the positions in each cell

    there must be as many indicator values as trajectory positions

    ex: speeds and trajectory'''

    assert len(indicatorValues) == trajectory.length()
    valuesPerCell = {}
    for k in range(trajectory.length()):
        position = trajectory[k]
        cell = (floor(position.x/squareSize), floor(position.y/squareSize))
        valuesPerCell.setdefault(cell, []).append(indicatorValues[k])
    return {cell: mean(cellValues) for cell, cellValues in valuesPerCell.items()}

# def indicatorMapFromPolygon(value, polygon, squareSize):
#     '''Fills an indicator map with the value within the polygon
#     (array of Nx2 coordinates of the polygon vertices)'''
#     points = []
#     for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize):
#         for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize):
#             points.append([x,y])
#     inside = nx.points_inside_poly(array(points), polygon)
#     indicatorMap = {}
#     for i in range(len(inside)):
#         if inside[i]:
#             indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0
#     return indicatorMap

def indicatorMapFromAxis(value, limits, squareSize):
    '''Returns an indicator map with the same value in all the cells
    (squares of side squareSize) covering the limits

    limits = [xmin, xmax, ymin, ymax] '''
    return {(floor(x/squareSize), floor(y/squareSize)): value
            for x in arange(limits[0], limits[1], squareSize)
            for y in arange(limits[2], limits[3], squareSize)}

def combineIndicatorMaps(maps, squareSize, combinationFunction):
    '''Puts many indicator maps together 

    the values of all the maps in each cell are collected in a list 
    (in the order of the maps) 
    and combined into one value by combinationFunction (eg mean)

    squareSize is not used'''
    valuesPerCell = {}
    for singleMap in maps:
        for cell, cellValue in singleMap.items():
            valuesPerCell.setdefault(cell, []).append(cellValue)
    return {cell: combinationFunction(cellValues) for cell, cellValues in valuesPerCell.items()}

if __name__ == "__main__":
    import doctest
    import unittest
    # runs the doctests stored in the external text file
    indicatorTests = doctest.DocFileSuite('tests/indicators.txt')
    runner = unittest.TextTestRunner()
    runner.run(indicatorTests)
#     #doctest.testmod()
#     #doctest.testfile("example.txt")