Mercurial Hosting > traffic-intelligence
view python/moving.py @ 43:6d11d9e7ad4e
methods for trajectories and objects
to add and subtract trajectories
and extract objects from existing objects, for sub-time intervals
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Wed, 14 Jul 2010 14:02:11 -0400 |
parents | eb78c6edc0c8 |
children | 1fb5606506ae |
line wrap: on
line source
#! /usr/bin/env python
'''Libraries for moving objects, trajectories...'''

import utils

from math import sqrt, hypot

from shapely.geometry import Polygon

# Makes every class below new-style under Python 2 (no effect on Python 3).
__metaclass__ = type


class Interval:
    '''Generic interval delimited by first and last (both included by contains).

    The interval is empty when first > last; the default (0, -1) is empty.'''

    def __init__(self, first=0, last=-1, revert=False):
        '''Stores the bounds, swapping them if revert is set and last < first.

        Warning, do not revert if last<first, it contradicts the definition of empty'''
        if revert and last < first:
            self.first = last
            self.last = first
        else:
            self.first = first
            self.last = last

    def __str__(self):
        return '%d %d' % (self.first, self.last)

    def empty(self):
        '''Indicates whether the interval is empty (first > last)'''
        return self.first > self.last

    def length(self):
        '''Returns the length of the interval (last-first, 0 if empty)'''
        return max(0, self.last - self.first)

    def getList(self):
        '''Returns the bounds as the list [first, last]'''
        return [self.first, self.last]

    def contains(self, instant):
        '''Indicates if instant lies between first and last (bounds included)'''
        return self.first <= instant <= self.last

    def inside(self, interval2):
        'indicates if the temporal interval of self is comprised in interval2'
        return (self.first >= interval2.first) and (self.last <= interval2.last)

    def union(self, interval2):
        '''Smallest interval comprising self and interval2

        The result is always a TimeInterval; empty operands are not
        special-cased (the bounds are simply combined with min/max).'''
        return TimeInterval(min(self.first, interval2.first),
                            max(self.last, interval2.last))

    def intersection(self, interval2):
        '''Largest interval comprised in both self and interval2

        The result is always a TimeInterval; it is empty (first > last)
        when the two intervals do not overlap.'''
        return TimeInterval(max(self.first, interval2.first),
                            min(self.last, interval2.last))


class TimeInterval(Interval):
    '''Temporal interval: the instants first, first+1, ..., last

    The class is iterable over its instants. The iteration state is kept
    on the instance, so nested loops over the same object interfere.'''

    def __init__(self, first=0, last=-1):
        Interval.__init__(self, first, last, False)

    def __getitem__(self, i):
        '''Returns the i-th instant (first+i), or None if the interval is empty

        No bound check is done on i.'''
        if not self.empty():
            return self.first + i
        return None

    def __iter__(self):
        self.iterInstantNum = 0
        return self

    def next(self):
        '''Returns the next instant, from first to last included'''
        if self.iterInstantNum >= self.length():
            raise StopIteration
        else:
            self.iterInstantNum += 1
            # -1 because the counter was just incremented: the first value
            # returned has to be self[0], i.e. first (same as Trajectory.next)
            return self[self.iterInstantNum - 1]

    # Python 3 name of the iterator method; next is kept for Python 2
    __next__ = next

    def length(self):
        '''Returns the number of instants in the interval (last-first+1, 0 if empty)'''
        return max(0, self.last - self.first + 1)


# class BoundingPolygon:
#     '''Class for a polygon bounding a set of points
#     with methods to create intersection, unions...
#     '''
# We will use the polygon class of Shapely

class STObject:
    '''Class for spatio-temporal object
    i.e. with temporal and spatial existence
    (time interval and bounding polygon for positions (e.g. rectangle)).
    It does not mean that the object is defined
    for all time instants within the time interval'''

    def __init__(self, num=None, timeInterval=None, boundingPolygon=None):
        self.num = num
        self.timeInterval = timeInterval
        self.boundingPolygon = boundingPolygon

    def empty(self):
        '''Indicates if the time interval is empty or there is no bounding polygon'''
        return self.timeInterval.empty() or not self.boundingPolygon

    def getFirstInstant(self):
        return self.timeInterval.first

    def getLastInstant(self):
        return self.timeInterval.last

    def getTimeInterval(self):
        return self.timeInterval

    def commonTimeInterval(self, obj2):
        '''Returns the intersection of the time intervals of self and obj2'''
        return self.getTimeInterval().intersection(obj2.getTimeInterval())


class Point:
    '''2D point (also used as a 2D vector, see norm2)'''

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __str__(self):
        return '(%f,%f)' % (self.x, self.y)

    def __repr__(self):
        return str(self)

    def __sub__(self, other):
        return Point(self.x - other.x, self.y - other.y)

    def draw(self, options=''):
        '''Plots the point with matplotlib as a cross, options being appended
        to the 'x' format string'''
        # imported here so that matplotlib is needed only for drawing
        from matplotlib.pylab import plot
        plot([self.x], [self.y], 'x' + options)

    def norm2Squared(self):
        '''Squared 2-norm (Euclidean norm) of the point seen as a vector'''
        return self.x * self.x + self.y * self.y

    def norm2(self):
        '2-norm distance (Euclidean distance)'
        return sqrt(self.norm2Squared())

    def distanceNorm2(p1, p2):
        '''Euclidean distance between p1 and p2

        p1 takes the place of self, i.e. call p1.distanceNorm2(p2).'''
        return (p1 - p2).norm2()

    def aslist(self):
        '''Returns the coordinates as the list [x, y]'''
        return [self.x, self.y]


class Trajectory:
    '''Class for trajectories
    i.e. a temporal sequence of positions

    positions is a list of two lists, the x coordinates and the y coordinates.
    The class is iterable (over Point objects); the iteration state is kept
    on the instance, so nested loops over the same object interfere.'''

    def __init__(self, positions):
        self.positions = positions

    @staticmethod
    def load(line1, line2):
        '''Builds a trajectory from two strings of whitespace-separated numbers
        (x coordinates in line1, y coordinates in line2)'''
        # split() without argument tolerates repeated and trailing whitespace,
        # where split(' ') produced empty tokens that float() rejects
        return Trajectory([[float(n) for n in line1.split()],
                           [float(n) for n in line2.split()]])

    def __str__(self):
        return ' '.join([str(self[i]) for i in range(self.length())])

    def __getitem__(self, i):
        '''Returns the i-th position as a Point'''
        return Point(self.positions[0][i], self.positions[1][i])

    def __iter__(self):
        self.iterInstantNum = 0
        return self

    def next(self):
        '''Returns the next position as a Point'''
        if self.iterInstantNum >= self.length():
            raise StopIteration
        else:
            self.iterInstantNum += 1
            return self[self.iterInstantNum - 1]

    # Python 3 name of the iterator method; next is kept for Python 2
    __next__ = next

    def addPosition(self, p):
        '''Appends the point p, creating the coordinate lists if needed'''
        if not self.positions:
            self.positions = [[p.x], [p.y]]
        else:
            self.positions[0].append(p.x)
            self.positions[1].append(p.y)

    def draw(self, options=''):
        '''Plots the trajectory with matplotlib, options being the format string'''
        # imported here so that matplotlib is needed only for drawing
        from matplotlib.pylab import plot
        plot(self.positions[0], self.positions[1], options)

    def length(self):
        '''Returns the number of positions (0 if there are no positions yet)'''
        # addPosition accepts empty/None positions, so length must too
        if not self.positions:
            return 0
        return len(self.positions[0])

    def getXCoordinates(self):
        return self.positions[0]

    def getYCoordinates(self):
        return self.positions[1]

    def xBounds(self):
        '''Returns [min, max] of the x coordinates'''
        # look for function that does min and max in one pass
        xs = self.getXCoordinates()
        return [min(xs), max(xs)]

    def yBounds(self):
        '''Returns [min, max] of the y coordinates'''
        # look for function that does min and max in one pass
        ys = self.getYCoordinates()
        return [min(ys), max(ys)]

    def add(self, traj2):
        '''Returns a new trajectory of the same length (coordinate-wise sum)

        Prints a message and returns None if the lengths differ.'''
        if self.length() != traj2.length():
            print('Trajectories of different lengths')
            return None
        else:
            return Trajectory([[a + b for a, b in zip(self.getXCoordinates(), traj2.getXCoordinates())],
                               [a + b for a, b in zip(self.getYCoordinates(), traj2.getYCoordinates())]])

    def subtract(self, traj2):
        '''Returns a new trajectory of the same length (coordinate-wise difference)

        Prints a message and returns None if the lengths differ.'''
        if self.length() != traj2.length():
            print('Trajectories of different lengths')
            return None
        else:
            return Trajectory([[a - b for a, b in zip(self.getXCoordinates(), traj2.getXCoordinates())],
                               [a - b for a, b in zip(self.getYCoordinates(), traj2.getYCoordinates())]])

    def norm(self):
        '''Returns the list of the norms at each instant'''
        return [hypot(x, y) for x, y in zip(self.positions[0], self.positions[1])]

    def getTrajectoryInInterval(self, inter):
        '''Returns the positions of indices inter.first (included)
        to inter.last (excluded) as a new trajectory

        Returns None unless 0 <= inter.first and inter.last <= length.'''
        if inter.first >= 0 and inter.last <= self.length():
            return Trajectory([self.positions[0][inter.first:inter.last],
                               self.positions[1][inter.first:inter.last]])
        else:
            return None

    def getTrajectoryInPolygon(self, polygon):
        'Returns the set of points inside the polygon'
        # not implemented yet (returns None): use shapely polygon contains
        pass


class MovingObject(STObject):
    '''Class for moving objects
    i.e. with a trajectory and a geometry (volume)
    and a type (e.g. road user)

    positions and velocities are indexed from the first instant of the
    time interval: index i corresponds to instant timeInterval[i].'''

    def __init__(self, num=None, timeInterval=None, positions=None, geometry=None, type=None, velocities=None):
        STObject.__init__(self, num, timeInterval)
        self.positions = positions
        # always defined (None if unknown) so that the accessors and
        # getObjectInTimeInterval do not raise AttributeError
        self.velocities = velocities
        self.geometry = geometry
        self.type = type
        # compute bounding polygon from trajectory

    def getObjectInTimeInterval(self, inter):
        '''Returns a new object extracted from self,
        existing at time interval inter

        Prints a message and returns None if inter is not inside
        the time interval of self.'''
        if inter.inside(self.timeInterval):
            offset = self.getFirstInstant()
            # getTrajectoryInInterval excludes its last index whereas the last
            # instant of inter is part of the object, hence the +1
            indices = Interval(inter.first - offset, inter.last - offset + 1)
            # the new object keeps the requested (absolute) instants so that
            # timeInterval[i] is still the instant of positions[i]
            obj = MovingObject(self.num,
                               TimeInterval(inter.first, inter.last),
                               self.positions.getTrajectoryInInterval(indices),
                               self.geometry,
                               self.type)
            if self.velocities is not None:
                obj.velocities = self.velocities.getTrajectoryInInterval(indices)
            return obj
        else:
            print('The object does not exist at ' + str(inter))
            return None

    def length(self):
        '''Returns the number of instants of the time interval'''
        return self.timeInterval.length()

    def getPositions(self):
        return self.positions

    def getVelocities(self):
        return self.velocities

    def getPositionAt(self, i):
        '''Returns the position of index i (not of instant i)'''
        return self.positions[i]

    def getVelocityAt(self, i):
        '''Returns the velocity of index i (not of instant i)'''
        return self.velocities[i]

    def getXCoordinates(self):
        return self.positions.getXCoordinates()

    def getYCoordinates(self):
        return self.positions.getYCoordinates()

    def draw(self, options=''):
        self.positions.draw(options)

    def getInstantPassingLane(self, p1, p2):
        '''Returns the instant(s) the object passes from one side of the segment to the other
        empty list if not

        The instants are interpolated linearly between the two successive
        instants whose positions are on each side of segment [p1, p2].'''
        instants = []
        for i in range(self.length() - 1):
            start = self.positions[i]
            end = self.positions[i + 1]
            p = utils.segmentIntersection(start, end, p1, p2)
            if p:
                # fraction of the step [start, end] covered when reaching p;
                # float() avoids integer division with integer coordinates
                if start.x != end.x:
                    ratio = float(p.x - start.x) / (end.x - start.x)
                elif start.y != end.y:
                    ratio = float(p.y - start.y) / (end.y - start.y)
                else:
                    ratio = 0
                instants.append(self.timeInterval[i] * (1 - ratio) + ratio * self.timeInterval[i + 1])
        return instants

    # def computeVelocities(self):


# need for a class representing the indicators, their units, how to print them in graphs...
class TemporalIndicator:
    '''Class for temporal indicators
    i.e. indicators that take a value at specific instants

    it should have more information like name, unit'''

    def __init__(self, name, values=None):
        self.name = name
        # a fresh dictionary per instance: a {} default value would be
        # shared by all the indicators created without values
        self.values = {} if values is None else values


if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/moving.txt')
    unittest.TextTestRunner().run(suite)