view python/indicators.py @ 312:6c068047edbf

merged with Mohamed s work
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Thu, 11 Apr 2013 22:46:33 -0400
parents 93d851d0d21e 8bafd054cda4
children 43e62b9cb652
line wrap: on
line source

#! /usr/bin/env python
'''Class for indicators, temporal indicators, and safety indicators'''

__metaclass__ = type

import moving

# need for a class representing the indicators, their units, how to print them in graphs...
class TemporalIndicator:
    '''Class for temporal indicators
    i.e. indicators that take a value at specific instants

    values should be
    * a dict, for the values at specific time instants
    * or a list with a time interval object if continuous measurements

    it should have more information like name, unit'''
    
    def __init__(self, name, values, timeInterval=None, maxValue = None):
        self.name = name
        self.isCosine = (name.find('Cosine') >= 0)
        if timeInterval:
            assert len(values) == timeInterval.length()
            self.timeInterval = timeInterval
            self.values = {}
            for i in xrange(int(round(self.timeInterval.length()))):
                self.values[self.timeInterval[i]] = values[i]
        else:
            self.values = values
            instants = sorted(self.values.keys())
            if instants:
                self.timeInterval = moving.TimeInterval(instants[0], instants[-1])
            else:
                self.timeInterval = moving.TimeInterval()
        self.maxValue = maxValue

    def __len__(self):
        return len(self.values)

    def empty(self):
        return len(self.values) == 0

    def __getitem__(self, i):
        'Returns ith value in time interval'
        if i in self.values.keys():
            return self.values[i]
        else:
            return None

    def getIthValue(self, i):
        sortedKeys = sorted(self.values.keys())
        if 0<=i<len(sortedKeys):
            return self.values[sortedKeys[i]]
        else:
            return None

    def __iter__(self):
        self.iterInstantNum = 0 # index in the interval or keys of the dict
        return self

    def next(self):
        if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\
           #     or (self.iterInstantNum >= self.values)
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self.getIthValue(self.iterInstantNum-1)

    def getTimeInterval(self):
        return self.timeInterval

    def getValues(self):
        return [self.__getitem__(t) for t in self.timeInterval]

    def getAngleValues(self):
        '''if the indicator is a function of an angle, 
        transform it to an angle (eg cos)
        (no transformation otherwise)'''
        from numpy import arccos
        values = self.getValues()
        if self.isCosine:
            return [arccos(c) for c in values]
        else: 
            return values

    def plot(self, options = '', xfactor = 1., **kwargs):
        from matplotlib.pylab import plot,ylim
        if self.getTimeInterval().length() == 1:
            marker = 'o'
        else:
            marker = ''
        time = sorted(self.values.keys())
        plot([x/xfactor for x in time], [self.values[i] for i in time], options+marker, **kwargs)
        if self.maxValue:
            ylim(ymax = self.maxValue)
	
    def valueSorted(self):
	''' return the values after sort the keys in the indicator
    This should probably not be used: to delete''' 
        values=[]
        keys = self.values.keys()
        keys.sort()
        for key in keys:
            values.append(self.values[key]) 
        return values

    @staticmethod
	def getDLCSS(TemporalIndicator1,TemporalIndicator2, threshold, delta= np.inf , method='min' ):
	''' compute the distance between two indicators using LCSS
	two common methods are used: min or mean of the indicators length'''
        l1= TemporalIndicator1.valueSorted
		l2= TemporalIndicator2.valueSorted
		if method = 'min':
			DLCSS= 1- (LCSS(l1,l2, threshold, delta, distance))/min(len(l1),len(l2)))
		if method = 'mean':
			average= len(l1)+len(l2))/2
			DLCSS= 1- ((LCSS(l1,l2, threshold, delta, distance))/average)
		return DLCSS
		

def computeDLCSS(indicator1, indicator2, threshold, delta = float('inf'), method= 'min'):
    ''' compute the distance between two indicators using LCSS
    two common methods are used: min or mean of the indicators length'''
    from utils import LCSS

    def distance(x, y): # lambda x,y:abs(x-y)
        if x == None or y == None:
            return float('inf')
        else: 
            return abs(x-y)

    lcss = LCSS(indicator1.getValues(), indicator2.getValues(), threshold, distance, delta)
    if method == 'min':
        denominator = min(len(indicator1), len(indicator2))
    elif method == 'mean':
        denominator = float(len(indicator1) + len(indicator2))/2
    else:
        print('Unknown denominator method name')
        denominator = 1.
    return 1-float(lcss)/denominator

class SeverityIndicator(TemporalIndicator):
    '''Class for severity indicators 
    field mostSevereIsMax is True 
    if the most severe value taken by the indicator is the maximum'''

    def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, maxValue = None): 
        TemporalIndicator.__init__(self, name, values, timeInterval, maxValue)
        self.mostSevereIsMax = mostSevereIsMax

    def getMostSevereValue(self, minNInstants=1): # TODO use scoreatpercentile
        from matplotlib.mlab import find
        from numpy.core.multiarray import array
        from numpy.core.fromnumeric import mean
        values = array(self.values.values())
        indices = range(len(values))
        if len(indices) >= minNInstants:
            values = sorted(values[indices], reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values
            return mean(values[:minNInstants])
        else:
            return None

# functions to aggregate discretized maps of indicators
# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap)

def indicatorMap(indicatorValues, trajectory, squareSize):
    '''Returns a dictionary 
    with keys for the indices of the cells (squares)
    in which the trajectory positions are located
    at which the indicator values are attached

    ex: speeds and trajectory'''

    from numpy import floor, mean
    assert len(indicatorValues) == trajectory.length()
    indicatorMap = {}
    for k in xrange(trajectory.length()):
        p = trajectory[k]
        i = floor(p.x/squareSize)
        j = floor(p.y/squareSize)
        if indicatorMap.has_key((i,j)):
            indicatorMap[(i,j)].append(indicatorValues[k])
        else:
            indicatorMap[(i,j)] = [indicatorValues[k]]
    for k in indicatorMap.keys():
        indicatorMap[k] = mean(indicatorMap[k])
    return indicatorMap

def indicatorMapFromPolygon(value, polygon, squareSize):
    '''Fills an indicator map with the value within the polygon
    (array of Nx2 coordinates of the polygon vertices)'''
    import matplotlib.nxutils as nx
    from numpy.core.multiarray import array, arange
    from numpy import floor

    points = []
    for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize):
        for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize):
            points.append([x,y])
    inside = nx.points_inside_poly(array(points), polygon)
    indicatorMap = {}
    for i in xrange(len(inside)):
        if inside[i]:
            indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0
    return indicatorMap

def indicatorMapFromAxis(value, limits, squareSize):
    '''axis = [xmin, xmax, ymin, ymax] '''
    from numpy.core.multiarray import arange
    from numpy import floor
    indicatorMap = {}
    for x in arange(limits[0], limits[1], squareSize):
        for y in arange(limits[2], limits[3], squareSize):
            indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value
    return indicatorMap

def combineIndicatorMaps(maps, squareSize, combinationFunction):
    '''Puts many indicator maps together 
    (averaging the values in each cell 
    if more than one maps has a value)'''
    #from numpy import mean
    indicatorMap = {}
    for m in maps:
        for k,v in m.iteritems():
            if indicatorMap.has_key(k):
                indicatorMap[k].append(v)
            else:
                indicatorMap[k] = [v]
    for k in indicatorMap.keys():
        indicatorMap[k] = combinationFunction(indicatorMap[k])
    return indicatorMap

if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/indicators.txt')
    unittest.TextTestRunner().run(suite)
#     #doctest.testmod()
#     #doctest.testfile("example.txt")