view python/moving.py @ 66:56fe4ef1377e

generalized map combination to different functions
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Sun, 07 Nov 2010 01:34:43 -0500
parents 75cf537b8d88
children ded58c424783
line wrap: on
line source

#! /usr/bin/env python
'''Libraries for moving objects, trajectories...'''

import utils;

from math import sqrt, hypot;

from shapely.geometry import Polygon

__metaclass__ = type

#class MovingObject:

class Interval:
    '''Generic Interval'''
    def __init__(self, first=0, last=-1, revert = False):
        'Warning, do not revert if last<first, it contradicts the definition of empty'
        if revert and last<first:
            self.first=last
            self.last=first
        else:
            self.first=first
            self.last=last

    def __str__(self):
        return '%d %d'%(self.first, self.last)

    def empty(self):
        return self.first > self.last

    def length(self):
        '''Returns the length of the interval'''
        return max(0,self.last-self.first)

    def getList(self):
        return [self.first, self.last]

    def contains(self, instant):
        return (self.first<=instant and self.last>=instant)

    def inside(self, interval2):
        'indicates if the temporal interval of self is comprised in interval2'
        return (self.first >= interval2.first) and (self.last <= interval2.last)

    def union(self, interval2):
        '''Largest interval comprising self and interval2'''
        return TimeInterval(min(self.first, interval2.first), max(self.last, interval2.last))
        
    def intersection(self, interval2):
        '''Largest interval comprising self and interval2'''
        return TimeInterval(max(self.first, interval2.first), min(self.last, interval2.last))


class TimeInterval(Interval):
    '''Temporal interval'''

    def __init__(self, first=0, last=-1):
        Interval.__init__(self, first, last, False)

    def __getitem__(self, i):
        if not self.empty():
            return self.first+i

    def __iter__(self):
        self.iterInstantNum = 0
        return self

    def next(self):
        if self.iterInstantNum >= self.length():
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self[self.iterInstantNum]

    def length(self):
        '''Returns the length of the interval'''
        return max(0,self.last-self.first+1)

# class BoundingPolygon:
#     '''Class for a polygon bounding a set of points
#     with methods to create intersection, unions...
#     '''
# We will use the polygon class of Shapely

class STObject:
    '''Class for spatio-temporal object
    i.e. with temporal and spatial existence 
    (time interval and bounding polygon for positions (e.g. rectangle)).
    It does not mean that the object is defined 
    for all time instants within the time interval'''

    def __init__(self, num = None, timeInterval = None, boundingPolygon = None):
        self.num = num
        self.timeInterval = timeInterval
        self.boundingPolygon = boundingPolygon

    def empty(self):
        return self.timeInterval.empty() or not self.boudingPolygon

    def getFirstInstant(self):
        return self.timeInterval.first

    def getLastInstant(self):
        return self.timeInterval.last

    def getTimeInterval(self):
        return self.timeInterval

    def commonTimeInterval(self, obj2):
        return self.getTimeInterval().intersection(obj2.getTimeInterval())

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __str__(self):
        return '(%f,%f)'%(self.x,self.y)

    def __repr__(self):
        return str(self)

    def __sub__(self, other):
        return Point(self.x-other.x, self.y-other.y)

    def draw(self, options = ''):
        from matplotlib.pylab import plot
        plot([self.x], [self.y], 'x'+options)

    def norm2Squared(self):
        '''2-norm distance (Euclidean distance)'''
        return self.x*self.x+self.y*self.y

    def norm2(self):
        '2-norm distance (Euclidean distance)'
        return sqrt(self.norm2Squared())

    def aslist(self):
        return [self.x, self.y]

    @staticmethod
    def distanceNorm2(p1, p2):
        return (p1-p2).norm2()

    @staticmethod
    def plotAll(points, color='r'):
        from matplotlib.pyplot import scatter
        scatter([p.x for p in points],[p.y for p in points], c=color)


class Trajectory:
    '''Class for trajectories
    i.e. a temporal sequence of positions

    the class is iterable.'''

    def __init__(self, positions):
        self.positions = positions

    @staticmethod
    def load(line1, line2):
        return Trajectory([[float(n) for n in line1.split(' ')],
                           [float(n) for n in line2.split(' ')]])

    def __str__(self):
        return ' '.join([self.__getitem__(i).__str__() for i in xrange(self.length())])

    def __getitem__(self, i):
        return Point(self.positions[0][i], self.positions[1][i])

    def __iter__(self):
        self.iterInstantNum = 0
        return self

    def next(self):
        if self.iterInstantNum >= self.length():
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self[self.iterInstantNum-1]

    def addPosition(self, p):
        if not self.positions:
            self.positions = [[p.x],[p.y]]
        else:
            self.positions[0].append(p.x)
            self.positions[1].append(p.y)

    def draw(self, options = ''):
        from matplotlib.pylab import plot
        plot(self.positions[0], self.positions[1], options)

    def length(self):
        return len(self.positions[0])

    def getXCoordinates(self):
        return self.positions[0]

    def getYCoordinates(self):
        return self.positions[1]
    
    def xBounds(self):
        # look for function that does min and max in one pass
        return [min(self.getXCoordinates()), max(self.getXCoordinates())]
    
    def yBounds(self):
        # look for function that does min and max in one pass
        return [min(self.getYCoordinates()), max(self.getYCoordinates())]
    
    def add(self, traj2):
        '''Returns a new trajectory of the same length'''
        if self.length() != traj2.length():
            print 'Trajectories of different lengths'
            return None
        else:
            return Trajectory([[a+b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())],
                               [a+b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]])

    def subtract(self, traj2):
        '''Returns a new trajectory of the same length'''
        if self.length() != traj2.length():
            print 'Trajectories of different lengths'
            return None
        else:
            return Trajectory([[a-b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())],
                               [a-b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]])

    def norm(self):
        '''Returns the list of the norms at each instant'''
#        def add(x, y): return x+y
#        sq = map(add, [x*x for x in self.positions[0]], [y*y for y in self.positions[1]])
#        return sqrt(sq)
        return [hypot(x,y) for x,y in zip(self.positions[0], self.positions[1])]

    def cumulatedDisplacement(self):
        displacement = 0
        for i in xrange(self.length()-1):
            displacement += Point.distanceNorm2(self.__getitem__(i),self.__getitem__(i+1))
        return displacement

    def wiggliness(self):
        return self.cumulatedDisplacement()/float(Point.distanceNorm2(self.__getitem__(0),self.__getitem__(self.length()-1)))

    def getTrajectoryInInterval(self, inter):
        if inter.first >=0 and inter.last<= self.length():
            return Trajectory([self.positions[0][inter.first:inter.last],
                               self.positions[1][inter.first:inter.last]])
        else:
            return None
    
    def getTrajectoryInPolygon(self, polygon):
        'Returns the set of points inside the polygon'
        # use shapely polygon contains
        pass

##################
# Moving Objects
##################

userTypeNames = ['car',
                 'pedestrian',
                 'twowheels',
                 'bus'
                 'truck']

class MovingObject(STObject):
    '''Class for moving objects
    i.e. with a trajectory and a geometry (volume) (constant)
    and a usertype (e.g. road user)
    '''

    def __init__(self, num = None, timeInterval = None, positions = None, geometry = None, userType = None):
        STObject.__init__(self, num, timeInterval)
        self.positions = positions
        self.geometry = geometry
        self.userType = userType
        # compute bounding polygon from trajectory

    def getObjectInTimeInterval(self, inter):
        '''Returns a new object extracted from self,
        restricted to time interval inter'''
        if inter.inside(self.timeInterval):
            inter = TimeInterval(inter.first-self.getFirstInstant(), inter.last-self.getFirstInstant())
            obj = MovingObject(self.num, inter, self.positions.getTrajectoryInInterval(inter), self.geometry, self.userType)
            if self.velocities:
                obj.velocities = self.velocities.getTrajectoryInInterval(inter)
            return obj
        else:
            print 'The object does not exist at '+str(inter)
            return None

    def length(self):
        return self.timeInterval.length()

    def getPositions(self):
        return self.positions

    def getVelocities(self):
        return self.velocities

    def getSpeeds(self):
        return self.getVelocities().norm()

    def getPositionAt(self, i):
        return self.positions[i]

    def getVelocityAt(self, i):
        return self.velocities[i]

    def getXCoordinates(self):
        return self.positions.getXCoordinates()
    
    def getYCoordinates(self):
        return self.positions.getYCoordinates()
    
    def draw(self, options = ''):
        self.positions.draw(options)

    def getInstantPassingLane(self, p1, p2):
        '''Returns the instant(s)
        at which the object passes from one side of the segment to the other
        empty list if there is no crossing'''
        instants = []

        for i in xrange(self.length()-1):
            p = utils.segmentIntersection(self.positions[i], self.positions[i+1], p1, p2)
            if p:
                if self.positions[i].x != self.positions[i+1].x:
                    ratio = (p.x-self.positions[i].x)/(self.positions[i+1].x-self.positions[i].x)
                elif self.positions[i].y != self.positions[i+1].y:
                    ratio = (p.y-self.positions[i].y)/(self.positions[i+1].y-self.positions[i].y)
                else:
                    ratio = 0
                instants.append(self.timeInterval[i]*(1-ratio)+ratio*self.timeInterval[i+1])
        return instants

    # def computeVelocities(self):

def plotRoadUsers(objects, colors):
    '''Colors is a PlottingPropertyValues instance'''
    from matplotlib.pyplot import figure, axis
    figure()
    for obj in objects:
        obj.draw(colors.get(obj.userType))
    axis('equal')


# need for a class representing the indicators, their units, how to print them in graphs...
class TemporalIndicator:
    '''Class for temporal indicators
    i.e. indicators that take a value at specific instants

    values should be
    * a dict, for the values at specific time instants
    * or a list with a time interval object if continuous measurements

    it should have more information like name, unit'''
    
    def __init__(self, name, values, timeInterval=None):
        self.name = name
        self.values = values
        self.timeInterval = timeInterval

class SeverityIndicator(TemporalIndicator):
    '''Class for severity indicators 
    field mostSevereIsMax is True 
    if the most severe value taken by the indicator is the maximum'''

    def __init__(self, name, values, mostSevereIsMax=True, ignoredValue = None): 
        # , timeInterval=None # implement later
        TemporalIndicator.__init__(self, name, values, timeInterval=None)
        self.mostSevereIsMax = mostSevereIsMax
        self.ignoredValue = ignoredValue

    def getMostSevereValue(self, minNInstants=1):
        from matplotlib.mlab import find
        from numpy.core.multiarray import array
        from numpy.core.fromnumeric import mean
        values = array(self.values.values())
        if self.ignoredValue:
            indices = find(values != self.ignoredValue)
        else:
            indices = range(len(values))
        if len(indices) >= minNInstants:
            values = sorted(values[indices], reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values
            return mean(values[:minNInstants])
        else:
            return None

def indicatorMap(indicatorValues, trajectory, squareSize):
    '''Returns a dictionary 
    with keys for the indices of the cells (squares)
    in which the trajectory positions are located
    at which the indicator values are attached'''
    from numpy import floor, mean
    assert len(indicatorValues) == trajectory.length()
    indicatorMap = {}
    for k in xrange(trajectory.length()):
        p = trajectory[k]
        i = floor(p.x/squareSize)
        j = floor(p.y/squareSize)
        if indicatorMap.has_key((i,j)):
            indicatorMap[(i,j)].append(indicatorValues[k])
        else:
            indicatorMap[(i,j)] = [indicatorValues[k]]
    for k in indicatorMap.keys():
        indicatorMap[k] = mean(indicatorMap[k])
    return indicatorMap

def combineIndicatorMaps(maps, squareSize, combinationFunction):
    '''Puts many indicator maps together 
    (averaging the values in each cell 
    if more than one maps has a value)'''
    #from numpy import mean
    indicatorMap = {}
    for m in maps:
        for k,v in m.iteritems():
            if indicatorMap.has_key(k):
                indicatorMap[k].append(v)
            else:
                indicatorMap[k] = [v]
    for k in indicatorMap.keys():
        indicatorMap[k] = combinationFunction(indicatorMap[k])
    return indicatorMap

if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/moving.txt')
    #suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)
    #doctest.testmod()
    #doctest.testfile("example.txt")