view python/indicators.py @ 247:8f0ed138d373

moved the tests for indicators
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Wed, 18 Jul 2012 02:54:02 -0400
parents 5027c174ab90
children 7a3bf04cf016
line wrap: on
line source

#! /usr/bin/env python
'''Class for indicators, temporal indicators, and safety indicators'''

# in Python 2, makes every class defined in this module a new-style class
# (this module-level assignment has no effect in Python 3)
__metaclass__ = type

# TODO: need a class representing the indicators, their units, and how to plot them in graphs
class TemporalIndicator:
    '''Class for temporal indicators
    i.e. indicators that take a value at specific instants

    values should be
    * a dict, for the values at specific time instants
      (timeInterval is then left to None)
    * or a list with a time interval object if continuous measurements
      (one value per instant of the time interval)

    An indicator can be indexed by instant (None is returned
    if there is no value at that instant) and iterated over:
    iterating yields the values in the order of the underlying container

    it should have more information like name, unit'''

    def __init__(self, name, values, timeInterval=None):
        '''name is a string, values a dict or a list (see class description)
        timeInterval must provide length(), contains() and first
        if it is given, there must be exactly one value per instant'''
        self.name = name
        # index of 'Cosine' in the name, -1 if absent (tested with >= 0)
        self.isCosine = name.find('Cosine')
        self.values = values
        self.timeInterval = timeInterval
        if timeInterval:
            assert len(values) == timeInterval.length()

    def empty(self):
        '''Indicates whether the indicator has no value at all'''
        return len(self.values) == 0

    def __getitem__(self, i):
        '''Returns the value at instant i
        or None if the indicator has no value at that instant'''
        if self.timeInterval:
            if self.timeInterval.contains(i):
                # the list is indexed from the first instant of the interval
                return self.values[i-self.timeInterval.first]
            return None
        # direct dict lookup (no linear scan of the keys), None by default
        return self.values.get(i)

    def __iter__(self):
        '''Restarts the iteration over the values and returns the indicator itself
        (the position is stored in the instance: 
        only one iteration can be done at a time)'''
        self.iterInstantNum = 0 # index in the interval or keys of the dict
        return self

    def next(self):
        '''Returns the next value of the iteration started by __iter__'''
        values = self.getValues()
        if self.iterInstantNum >= len(values):
            raise StopIteration
        value = values[self.iterInstantNum]
        self.iterInstantNum += 1
        return value

    # Python 3 name of the iterator method (next is kept for Python 2)
    __next__ = next

    def getTimeInterval(self):
        '''Returns the time interval of the indicator

        For a dict of values without time interval, 
        the interval goes from the smallest to the largest instant of the dict
        (an empty interval if the dict is empty)

        The computed interval is deliberately not stored in self.timeInterval:
        the other methods rely on that attribute to know 
        whether the values are stored in a list or a dict

        NOTE: TimeInterval is not imported in the visible part of this module,
        it has to be available in the module namespace 
        for dict-based indicators'''
        if not self.timeInterval and isinstance(self.values, dict):
            if self.values:
                # dict keys are not ordered: look for the extreme instants
                return TimeInterval(min(self.values), max(self.values))
            else:
                return TimeInterval()
        return self.timeInterval

    def getValues(self):
        '''Returns the list of the values of the indicator
        (in dict order if the values are stored in a dict)'''
        if self.timeInterval:
            return self.values
        else:
            # list() so that the result can be indexed in Python 2 and 3
            return list(self.values.values())

    def getAngleValues(self):
        '''if the indicator is a function of an angle, 
        transform it to an angle (eg cos)
        (no transformation otherwise)'''
        values = self.getValues()
        if self.isCosine >= 0:
            # numpy is needed only for the conversion
            from numpy import arccos
            return [arccos(c) for c in values]
        else: 
            return values

class SeverityIndicator(TemporalIndicator):
    '''Class for severity indicators 
    field mostSevereIsMax is True 
    if the most severe value taken by the indicator is the maximum

    field ignoredValue is a value that is discarded 
    when looking for the most severe value (None: nothing is discarded)'''

    def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, ignoredValue = None): 
        TemporalIndicator.__init__(self, name, values, timeInterval)
        self.mostSevereIsMax = mostSevereIsMax
        self.ignoredValue = ignoredValue

    def getMostSevereValue(self, minNInstants=1):
        '''Returns the mean of the minNInstants most severe values 
        (the largest ones if mostSevereIsMax, the smallest ones otherwise), 
        without taking the values equal to ignoredValue into account

        Returns None if fewer than minNInstants values are available'''
        from numpy import array, mean
        # the values are stored either in a dict (by instant) or in a list
        if isinstance(self.values, dict):
            values = array(list(self.values.values()))
        else:
            values = array(list(self.values))
        # compare to None: 0 is a legitimate value to ignore
        if self.ignoredValue is not None:
            values = values[values != self.ignoredValue]
        if len(values) >= minNInstants:
            # inverted if most severe is max -> take the first values
            mostSevereValues = sorted(values, reverse = self.mostSevereIsMax)
            return mean(mostSevereValues[:minNInstants])
        else:
            return None

# functions to aggregate discretized maps of indicators
# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap)

def indicatorMap(indicatorValues, trajectory, squareSize):
    '''Returns a dictionary 
    with keys for the indices of the cells (squares)
    in which the trajectory positions are located
    at which the indicator values are attached

    indicatorValues is a sequence with one value per trajectory position
    trajectory provides length() and is indexable, its elements have x and y
    squareSize is the length of the side of the cells

    The keys are tuples (floor(x/squareSize), floor(y/squareSize)), 
    the value of a cell is the mean of the indicator values 
    of the positions located in the cell

    ex: speeds and trajectory'''

    from numpy import floor, mean
    assert len(indicatorValues) == trajectory.length()
    # gather the indicator values cell by cell
    cellValues = {}
    for k in range(trajectory.length()):
        p = trajectory[k]
        cell = (floor(p.x/squareSize), floor(p.y/squareSize))
        cellValues.setdefault(cell, []).append(indicatorValues[k])
    # replace the list of values of each cell by its mean
    result = {}
    for cell, values in cellValues.items():
        result[cell] = mean(values)
    return result

def indicatorMapFromPolygon(value, polygon, squareSize):
    '''Fills an indicator map with the value within the polygon
    (array of Nx2 coordinates of the polygon vertices)

    Points are sampled every squareSize in the bounding box of the polygon, 
    starting half a square from its minimum coordinates: 
    the cell (floor(x/squareSize), floor(y/squareSize)) of each sampled point 
    located inside the polygon is given the value

    Returns a dictionary with the cell indices as keys
    (empty if no point can be sampled)'''
    from numpy import arange, array, floor

    # float division, also for an integer squareSize in Python 2
    halfSquare = squareSize/2.
    xs = arange(min(polygon[:,0])+halfSquare, max(polygon[:,0]), squareSize)
    ys = arange(min(polygon[:,1])+halfSquare, max(polygon[:,1]), squareSize)
    points = [[x,y] for x in xs for y in ys]
    if not points:
        # nothing to test (polygon too small): matplotlib is not needed
        return {}

    try:
        # old versions of matplotlib
        from matplotlib.nxutils import points_inside_poly
        inside = points_inside_poly(array(points), polygon)
    except ImportError:
        # nxutils was removed from matplotlib: use the Path class instead
        from matplotlib.path import Path
        inside = Path(polygon).contains_points(array(points))

    cells = {}
    for (x, y), isInside in zip(points, inside):
        if isInside:
            cells[(floor(x/squareSize), floor(y/squareSize))] = value
    return cells

def indicatorMapFromAxis(value, limits, squareSize):
    '''Fills an indicator map with the value 
    for all the cells sampled within the limits
    limits = [xmin, xmax, ymin, ymax] 

    The x and y coordinates are sampled every squareSize 
    from xmin (resp. ymin) up to, but excluding, xmax (resp. ymax)

    Returns a dictionary with keys 
    (floor(x/squareSize), floor(y/squareSize))'''
    # public numpy namespace (numpy.core is private)
    from numpy import arange, floor
    xmin, xmax, ymin, ymax = limits[0], limits[1], limits[2], limits[3]
    cells = {}
    for x in arange(xmin, xmax, squareSize):
        i = floor(x/squareSize)
        for y in arange(ymin, ymax, squareSize):
            cells[(i, floor(y/squareSize))] = value
    return cells

def combineIndicatorMaps(maps, squareSize, combinationFunction):
    '''Puts many indicator maps together 

    maps is an iterable of dictionaries (cell -> value)
    combinationFunction is called for each cell 
    with the list of the values of the cell in the maps that contain it 
    (in the order of the maps) and returns the combined value, eg mean
    squareSize is not used (kept for the compatibility of the calls)

    Returns a dictionary (cell -> combined value)'''
    # gather the values of all the maps cell by cell
    cellValues = {}
    for m in maps:
        for cell, v in m.items():
            cellValues.setdefault(cell, []).append(v)
    combinedMap = {}
    for cell, values in cellValues.items():
        combinedMap[cell] = combinationFunction(values)
    return combinedMap

if __name__ == "__main__":
    # executed as a script: run the doctests stored in tests/indicators.txt
    from doctest import DocFileSuite
    from unittest import TextTestRunner
    testRunner = TextTestRunner()
    testRunner.run(DocFileSuite('tests/indicators.txt'))
#     #doctest.testmod()
#     #doctest.testfile("example.txt")