view python/moving.py @ 248:571ba5ed22e2

added utils for bus processing
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Wed, 18 Jul 2012 02:54:22 -0400
parents bd8ab323c198
children 99173da7afae
line wrap: on
line source

#! /usr/bin/env python
'''Libraries for moving objects, trajectories...'''

import utils;
import cvutils;

from math import sqrt;

#from shapely.geometry import Polygon

__metaclass__ = type

class Interval:
    '''Generic Interval'''
    def __init__(self, first=0, last=-1, revert = False):
        'Warning, do not revert if last<first, it contradicts the definition of empty'
        if revert and last<first:
            self.first=last
            self.last=first
        else:
            self.first=first
            self.last=last

    def __str__(self):
        return '[{0}, {1}]'.format(self.first, self.last)

    def __repr__(self):
        return self.__str__()

    def empty(self):
        return self.first > self.last

    def length(self):
        '''Returns the length of the interval'''
        return float(max(0,self.last-self.first))

    def equal(self, i2):
        return self.first==i2.first and self.last == i2.last

    def getList(self):
        return [self.first, self.last]

    def contains(self, instant):
        return (self.first<=instant and self.last>=instant)

    def inside(self, interval2):
        'indicates if the temporal interval of self is comprised in interval2'
        return (self.first >= interval2.first) and (self.last <= interval2.last)

    def union(self, interval2):
        '''Smallest interval comprising self and interval2'''
        return TimeInterval(min(self.first, interval2.first), max(self.last, interval2.last))
        
    def intersection(self, interval2):
        '''Largest interval comprised in both self and interval2'''
        return TimeInterval(max(self.first, interval2.first), min(self.last, interval2.last))

    def distance(self, interval2):
        if not self.intersection(interval2).empty():
            return 0
        elif self.first > interval2.last:
            return self.first - interval2.last
        elif self.last < interval2.first:
            return interval2.first - self.last
        else:
            return None


def unionIntervals(intervals):
    'returns the smallest interval containing all intervals'
    inter = intervals[0]
    for i in intervals[1:]:
        inter = inter.union(i)
    return inter


class TimeInterval(Interval):
    '''Temporal interval based on frame numbers (hence the modified length method)
    may be modified directly by setting first and last'''

    def __init__(self, first=0, last=-1):
        Interval.__init__(self, first, last, False)

    def __getitem__(self, i):
        if not self.empty():
            return self.first+i

    def __iter__(self):
        self.iterInstantNum = -1
        return self

    def next(self):
        if self.iterInstantNum >= self.length()-1:
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self[self.iterInstantNum]

    def length(self):
        '''Returns the length of the interval'''
        return float(max(0,self.last-self.first+1))

# class BoundingPolygon:
#     '''Class for a polygon bounding a set of points
#     with methods to create intersection, unions...
#     '''
# We will use the polygon class of Shapely

class STObject:
    '''Class for spatio-temporal object
    i.e. with temporal and spatial existence 
    (time interval and bounding polygon for positions (e.g. rectangle)).
    It does not mean that the object is defined 
    for all time instants within the time interval'''

    def __init__(self, num = None, timeInterval = None, boundingPolygon = None):
        self.num = num
        self.timeInterval = timeInterval
        self.boundingPolygon = boundingPolygon

    def empty(self):
        return self.timeInterval.empty() or not self.boudingPolygon

    def getId(self):
        return self.num

    def getFirstInstant(self):
        return self.timeInterval.first

    def getLastInstant(self):
        return self.timeInterval.last

    def getTimeInterval(self):
        return self.timeInterval

    def existsAtInstant(self, t):
        return self.timeInterval.contains(t)

    def commonTimeInterval(self, obj2):
        return self.getTimeInterval().intersection(obj2.getTimeInterval())

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __str__(self):
        return '(%f,%f)'%(self.x,self.y)

    def __repr__(self):
        return self.__str__()

    def __add__(self, other):
        return Point(self.x+other.x, self.y+other.y)

    def __sub__(self, other):
        return Point(self.x-other.x, self.y-other.y)

    def multiply(self, alpha):
        return Point(self.x*alpha, self.y*alpha)

    def draw(self, options = 'o', **kwargs):
        from matplotlib.pylab import plot
        plot([self.x], [self.y], options, **kwargs)

    def norm2Squared(self):
        '''2-norm distance (Euclidean distance)'''
        return self.x*self.x+self.y*self.y

    def norm2(self):
        '2-norm distance (Euclidean distance)'
        return sqrt(self.norm2Squared())

    def aslist(self):
        return [self.x, self.y]

    def astuple(self):
        return (self.x, self.y)

    def asint(self):
        return Point(int(self.x), int(self.y))

    def project(self, homography):
        from numpy.core.multiarray import array
        projected = cvutils.projectArray(homography, array([[self.x], [self.y]]))
        return Point(projected[0], projected[1])

    def inPolygon(self, poly):
        '''Returns if the point x, y is inside the polygon.
        The polygon is defined by the ordered list of points in poly
        
        taken from http://www.ariel.com.au/a/python-point-int-poly.html

        Use points_inside_poly from matplotlib.nxutils'''

        n = len(poly);
        counter = 0;

        p1 = poly[0];
        for i in range(n+1):
            p2 = poly[i % n];
            if self.y > min(p1.y,p2.y):
                if self.y <= max(p1.y,p2.y):
                    if self.x <= max(p1.x,p2.x):
                        if p1.y != p2.y:
                            xinters = (self.y-p1.y)*(p2.x-p1.x)/(p2.y-p1.y)+p1.x;
                        if p1.x == p2.x or self.x <= xinters:
                            counter+=1;
            p1=p2
        return (counter%2 == 1);

    @staticmethod
    def dot(p1, p2):
        'Scalar product'
        return p1.x*p2.x+p1.y*p2.y

    @staticmethod
    def cross(p1, p2):
        'Cross product'
        return p1.x*p2.y-p1.y*p2.x

    @staticmethod
    def distanceNorm2(p1, p2):
        return (p1-p2).norm2()

    @staticmethod
    def plotAll(points, color='r'):
        from matplotlib.pyplot import scatter
        scatter([p.x for p in points],[p.y for p in points], c=color)


def predictPosition(nTimeSteps, initialPosition, initialVelocity, initialAcceleration = Point(0,0)):
    '''Predicts the position in nTimeSteps at constant speed/acceleration'''
    return initalPosition+velocity.multiply(nTimeSteps) + initialAcceleration.multiply(nTimeSteps**2)


class FlowVector:
    '''Class to represent 4-D flow vectors,
    ie a position and a velocity'''
    def __init__(self, position, velocity):
        'position and velocity should be Point instances'
        self.position = position
        self.velocity = velocity

    def __add__(self, other):
        return FlowVector(self.position+other.position, self.velocity+other.velocity)

    def multiply(self, alpha):
        return FlowVector(self.position.multiply(alpha), self.velocity.multiply(alpha))

    def draw(self, options = '', **kwargs):
        from matplotlib.pylab import plot
        plot([self.position.x, self.position.x+self.velocity.x], [self.position.y, self.position.y+self.velocity.y], options, **kwargs)
        self.position.draw(options+'x', **kwargs)
    
    @staticmethod
    def similar(f1, f2, maxDistance2, maxDeltavelocity2):
        return (f1.position-f2.position).norm2Squared()<maxDistance2 and (f1.velocity-f2.velocity).norm2Squared()<maxDeltavelocity2

def segmentIntersection(p1, p2, p3, p4):
    '''Returns the intersecting point of the segments [p1, p2] and [p3, p4], None otherwise'''
    from numpy import matrix
    from numpy.linalg import linalg, det

    dp1 = p2-p1#[s1[0][1]-s1[0][0], s1[1][1]-s1[1][0]]
    dp2 = p4-p3#[s2[0][1]-s2[0][0], s2[1][1]-s2[1][0]]

    A = matrix([[dp1.y, -dp1.x],
                [dp2.y, -dp2.x]])
    B = matrix([[dp1.y*p1.x-dp1.x*p1.y],
                [dp2.y*p3.x-dp2.x*p3.y]])

    if linalg.det(A) == 0:#crossProduct(ds1, ds2) == 0:
        return None
    else:
        intersection = linalg.solve(A,B)
        if (utils.inBetween(p1.x, p2.x, intersection[0,0])
            and utils.inBetween(p3.x, p4.x, intersection[0,0])
            and utils.inBetween(p1.y, p2.y, intersection[1,0])
            and utils.inBetween(p3.y, p4.y, intersection[1,0])):
            return Point(intersection[0,0], intersection[1,0])
        else:
            return None

class Trajectory:
    '''Class for trajectories
    i.e. a temporal sequence of positions

    the class is iterable.'''

    def __init__(self, positions=None):
        if positions != None:
            self.positions = positions
        else:
            self.positions = [[],[]]

    @staticmethod
    def load(line1, line2):
        return Trajectory([[float(n) for n in line1.split(' ')],
                           [float(n) for n in line2.split(' ')]])

    def __str__(self):
        return ' '.join([self.__getitem__(i).__str__() for i in xrange(self.length())])

    def __repr__(self):
        return str(self)

    def __getitem__(self, i):
        return Point(self.positions[0][i], self.positions[1][i])

    def __iter__(self):
        self.iterInstantNum = 0
        return self

    def next(self):
        if self.iterInstantNum >= self.length():
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self[self.iterInstantNum-1]

    def length(self):
        return len(self.positions[0])

    def addPositionXY(self, x, y):
        self.positions[0].append(x)
        self.positions[1].append(y)

    def addPosition(self, p):
        self.addPositionXY(p.x, p.y)

    @staticmethod
    def _draw(positions, options = '', withOrigin = False, lastCoordinate = None, **kwargs):
        from matplotlib.pylab import plot
        if lastCoordinate == None:
            plot(positions[0], positions[1], options, **kwargs)
        elif 0 <= lastCoordinate <= len(positions[0]):
            plot(positions[0][:lastCoordinate], positions[1][:lastCoordinate], options, **kwargs)
        if withOrigin:
            plot([positions[0][0]], [positions[1][0]], 'ro', **kwargs)

    def project(self, homography):
        from numpy.core.multiarray import array
        projected = cvutils.projectArray(homography, array(self.positions))
        return Trajectory(projected)

    def draw(self, options = '', withOrigin = False, **kwargs):
        Trajectory._draw(self.positions, options, withOrigin,**kwargs)

    def drawAt(self, lastCoordinate, options = '', withOrigin = False, **kwargs):
        Trajectory._draw(self.positions, options, withOrigin, lastCoordinate, **kwargs)

    def drawOnWorldImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False, **kwargs):
        from matplotlib.pylab import plot
        imgPositions = [[x*nPixelsPerUnitDistance for x in self.positions[0]],
                        [-x*nPixelsPerUnitDistance+imageHeight for x in self.positions[1]]]
        Trajectory._draw(imgPositions, options, withOrigin, **kwargs)

    def getXCoordinates(self):
        return self.positions[0]

    def getYCoordinates(self):
        return self.positions[1]

    def asArray(self):
        from numpy.core.multiarray import array
        return array(self.positions)
    
    def xBounds(self):
        # look for function that does min and max in one pass
        return [min(self.getXCoordinates()), max(self.getXCoordinates())]
    
    def yBounds(self):
        # look for function that does min and max in one pass
        return [min(self.getYCoordinates()), max(self.getYCoordinates())]
    
    def add(self, traj2):
        '''Returns a new trajectory of the same length'''
        if self.length() != traj2.length():
            print 'Trajectories of different lengths'
            return None
        else:
            return Trajectory([[a+b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())],
                               [a+b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]])

    def subtract(self, traj2):
        '''Returns a new trajectory of the same length'''
        if self.length() != traj2.length():
            print 'Trajectories of different lengths'
            return None
        else:
            return Trajectory([[a-b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())],
                               [a-b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]])

    def norm(self):
        '''Returns the list of the norms at each instant'''
#        def add(x, y): return x+y
#        sq = map(add, [x*x for x in self.positions[0]], [y*y for y in self.positions[1]])
#        return sqrt(sq)
        from numpy import hypot
        return hypot(self.positions[0], self.positions[1])

    def cumulatedDisplacement(self):
        'Returns the sum of the distances between each successive point'
        displacement = 0
        for i in xrange(self.length()-1):
            displacement += Point.distanceNorm2(self.__getitem__(i),self.__getitem__(i+1))
        return displacement

    def wiggliness(self):
        return self.cumulatedDisplacement()/float(Point.distanceNorm2(self.__getitem__(0),self.__getitem__(self.length()-1)))

    def getIntersections(self, p1, p2):
        '''Returns a list of the indices at which the trajectory 
        intersects with the segment of extremities p1 and p2 
        the list is empty if there is no crossing'''
        indices = []

        for i in xrange(self.length()-1):
            q1=self.__getitem__(i)
            q2=self.__getitem__(i+1)
            p = utils.segmentIntersection(q1, q2, p1, p2)
            if p:
                if q1.x != q2.x:
                    ratio = (p.x-q1.x)/(q2.x-q1.x)
                elif q1.y != q2.y:
                    ratio = (p.y-q1.y)/(q2.y-q1.y)
                else:
                    ratio = 0
                indices.append(i+ratio)
        return indices

    def getTrajectoryInInterval(self, inter):
        if inter.first >=0 and inter.last<= self.length():
            return Trajectory([self.positions[0][inter.first:inter.last],
                               self.positions[1][inter.first:inter.last]])
        else:
            return None
    
    def getTrajectoryInPolygon(self, polygon):
        '''Returns the set of points inside the polygon
        (array of Nx2 coordinates of the polygon vertices)'''
        import matplotlib.nxutils as nx
        traj = Trajectory()
        result = nx.points_inside_poly(self.asArray().T, polygon)
        for i in xrange(self.length()):
            if result[i]:
                traj.addPositionXY(self.positions[0][i], self.positions[1][i])
        return traj

    # version 2: use shapely polygon contains

##################
# Moving Objects
##################

userTypeNames = ['car',
                 'pedestrian',
                 'twowheels',
                 'bus',
                 'truck']

userType2Num = utils.inverseEnumeration(userTypeNames)

class MovingObject(STObject):
    '''Class for moving objects
    i.e. with a trajectory and a geometry (volume) (constant)
    and a usertype (e.g. road user)
    '''

    def __init__(self, num = None, timeInterval = None, positions = None, geometry = None, userType = None):
        STObject.__init__(self, num, timeInterval)
        self.positions = positions
        self.geometry = geometry
        self.userType = userType
        # compute bounding polygon from trajectory

    def getObjectInTimeInterval(self, inter):
        '''Returns a new object extracted from self,
        restricted to time interval inter'''
        intersection = inter.intersection(self.getTimeInterval())
        if not intersection.empty():
            trajectoryInterval = TimeInterval(intersection.first-self.getFirstInstant(), intersection.last-self.getFirstInstant())
            obj = MovingObject(self.num, intersection, self.positions.getTrajectoryInInterval(trajectoryInterval), self.geometry, self.userType)
            if self.velocities:
                obj.velocities = self.velocities.getTrajectoryInInterval(trajectoryInterval)
            return obj
        else:
            print 'The object does not exist at '+str(inter)
            return None

    def length(self):
        return self.timeInterval.length()

    def getPositions(self):
        return self.positions

    def getVelocities(self):
        return self.velocities

    def getSpeeds(self):
        return self.getVelocities().norm()

    def getPositionAt(self, i):
        return self.positions[i]

    def getVelocityAt(self, i):
        return self.velocities[i]

    def getPositionAtInstant(self, i):
        return self.positions[i-self.getFirstInstant()]

    def getVelocityAtInstant(self, i):
        return self.velocities[i-self.getFirstInstant()]

    def getXCoordinates(self):
        return self.positions.getXCoordinates()
    
    def getYCoordinates(self):
        return self.positions.getYCoordinates()
    
    def draw(self, options = '', withOrigin = False, **kwargs):
        self.positions.draw(options, withOrigin, **kwargs)

    def drawOnWorldImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False, **kwargs):
        self.positions.drawOnWorldImage(nPixelsPerUnitDistance, imageHeight, options, withOrigin, **kwargs)

    def play(self, videoFilename, homography = None):
        cvutils.displayTrajectories(videoFilename, [self], homography, self.getFirstInstant(), self.getLastInstant())

    def getInstantsCrossingLane(self, p1, p2):
        '''Returns the instant(s)
        at which the object passes from one side of the segment to the other
        empty list if there is no crossing'''
        indices = self.positions.getIntersections(p1, p2)
        return [t+self.getFirstInstant() for t in indices]

    def predictPosition(self, instant, nTimeSteps, externalAcceleration = Point(0,0)):
        '''Predicts the position of object at instant+deltaT, 
        at constant speed'''
        return predictPosition(nTimeSteps, self.getPositionAtInstant(instant), self.getVelocityAtInstant(instant), externalAcceleration)

    @staticmethod
    def collisionCourseDotProduct(movingObject1, movingObject2, instant):
        'A positive result indicates that the road users are getting closer'
        deltap = movingObject1.getPositionAtInstant(instant)-movingObject2.getPositionAtInstant(instant)
        deltav = movingObject2.getVelocityAtInstant(instant)-movingObject1.getVelocityAtInstant(instant)
        return Point.dot(deltap, deltav)

    @staticmethod
    def collisionCourseCosine(movingObject1, movingObject2, instant):
        'A positive result indicates that the road users are getting closer'
        deltap = movingObject1.getPositionAtInstant(instant)-movingObject2.getPositionAtInstant(instant)
        deltav = movingObject2.getVelocityAtInstant(instant)-movingObject1.getVelocityAtInstant(instant)
        return Point.dot(deltap, deltav)/(deltap.norm2()*deltav.norm2())

def plotRoadUsers(objects, colors):
    '''Colors is a PlottingPropertyValues instance'''
    from matplotlib.pyplot import figure, axis
    figure()
    for obj in objects:
        obj.draw(colors.get(obj.userType))
    axis('equal')


if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/moving.txt')
    #suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)
    #doctest.testmod()
    #doctest.testfile("example.txt")