Mercurial Hosting > traffic-intelligence
view python/moving.py @ 208:d9855499fc88
Added functions to write trajectories through sqlite
author | Francois Belisle <belisle.francois@gmail.com> |
---|---|
date | Tue, 05 Jun 2012 13:12:19 -0400 |
parents | e2f31813ade6 |
children | ada6e8fbe4c6 |
line wrap: on
line source
#! /usr/bin/env python '''Libraries for moving objects, trajectories...''' import utils; import cvutils; from math import sqrt, hypot; #from shapely.geometry import Polygon __metaclass__ = type class Interval: '''Generic Interval''' def __init__(self, first=0, last=-1, revert = False): 'Warning, do not revert if last<first, it contradicts the definition of empty' if revert and last<first: self.first=last self.last=first else: self.first=first self.last=last def __str__(self): return '[{0}, {1}]'.format(self.first, self.last) def __repr__(self): return self.__str__() def empty(self): return self.first > self.last def length(self): '''Returns the length of the interval''' return float(max(0,self.last-self.first)) def equal(self, i2): return self.first==i2.first and self.last == i2.last def getList(self): return [self.first, self.last] def contains(self, instant): return (self.first<=instant and self.last>=instant) def inside(self, interval2): 'indicates if the temporal interval of self is comprised in interval2' return (self.first >= interval2.first) and (self.last <= interval2.last) def union(self, interval2): '''Largest interval comprising self and interval2''' return TimeInterval(min(self.first, interval2.first), max(self.last, interval2.last)) def intersection(self, interval2): '''Largest interval comprised in both self and interval2''' return TimeInterval(max(self.first, interval2.first), min(self.last, interval2.last)) class TimeInterval(Interval): '''Temporal interval based on frame numbers (hence the modified length method) may be modified directly by setting first and last''' def __init__(self, first=0, last=-1): Interval.__init__(self, first, last, False) def __getitem__(self, i): if not self.empty(): return self.first+i def __iter__(self): self.iterInstantNum = -1 return self def next(self): if self.iterInstantNum >= self.length()-1: raise StopIteration else: self.iterInstantNum += 1 return self[self.iterInstantNum] def length(self): '''Returns the length of the interval''' return float(max(0,self.last-self.first+1)) # class BoundingPolygon: # '''Class for a polygon bounding a set of points # with methods to create intersection, unions... # ''' # We will use the polygon class of Shapely class STObject: '''Class for spatio-temporal object i.e. with temporal and spatial existence (time interval and bounding polygon for positions (e.g. rectangle)). It does not mean that the object is defined for all time instants within the time interval''' def __init__(self, num = None, timeInterval = None, boundingPolygon = None): self.num = num self.timeInterval = timeInterval self.boundingPolygon = boundingPolygon def empty(self): return self.timeInterval.empty() or not self.boudingPolygon def getFirstInstant(self): return self.timeInterval.first def getLastInstant(self): return self.timeInterval.last def getTimeInterval(self): return self.timeInterval def existsAtInstant(self, t): return self.timeInterval.contains(t) def commonTimeInterval(self, obj2): return self.getTimeInterval().intersection(obj2.getTimeInterval()) class Point: def __init__(self, x, y): self.x = x self.y = y def __str__(self): return '(%f,%f)'%(self.x,self.y) def __repr__(self): return self.__str__() def __add__(self, other): return Point(self.x+other.x, self.y+other.y) def __sub__(self, other): return Point(self.x-other.x, self.y-other.y) def multiply(self, alpha): return Point(self.x*alpha, self.y*alpha) def draw(self, options = 'o'): from matplotlib.pylab import plot plot([self.x], [self.y], options) def norm2Squared(self): '''2-norm distance (Euclidean distance)''' return self.x*self.x+self.y*self.y def norm2(self): '2-norm distance (Euclidean distance)' return sqrt(self.norm2Squared()) def aslist(self): return [self.x, self.y] def astuple(self): return (self.x, self.y) def project(self, homography): from numpy.core.multiarray import array projected = cvutils.projectArray(homography, array([[self.x], [self.y]])) return Point(projected[0], projected[1]) def inPolygon(self, poly): '''Returns if the point x, y is inside the polygon. The polygon is defined by the ordered list of points in poly taken from http://www.ariel.com.au/a/python-point-int-poly.html Use points_inside_poly from matplotlib.nxutils''' n = len(poly); counter = 0; p1 = poly[0]; for i in range(n+1): p2 = poly[i % n]; if self.y > min(p1.y,p2.y): if self.y <= max(p1.y,p2.y): if self.x <= max(p1.x,p2.x): if p1.y != p2.y: xinters = (self.y-p1.y)*(p2.x-p1.x)/(p2.y-p1.y)+p1.x; if p1.x == p2.x or self.x <= xinters: counter+=1; p1=p2 return (counter%2 == 1); @staticmethod def dot(p1, p2): 'Scalar product' return p1.x*p2.x+p1.y*p2.y @staticmethod def cross(p1, p2): 'Cross product' return p1.x*p2.y-p1.y*p2.x @staticmethod def distanceNorm2(p1, p2): return (p1-p2).norm2() @staticmethod def plotAll(points, color='r'): from matplotlib.pyplot import scatter scatter([p.x for p in points],[p.y for p in points], c=color) class FlowVector: '''Class to represent 4-D flow vectors, ie a position and a velocity''' def __init__(self, position, velocity): 'position and velocity should be Point instances' self.position = position self.velocity = velocity def __add__(self, other): return FlowVector(self.position+other.position, self.velocity+other.velocity) def multiply(self, alpha): return FlowVector(self.position.multiply(alpha), self.velocity.multiply(alpha)) def draw(self, options = ''): from matplotlib.pylab import plot plot([self.position.x, self.position.x+self.velocity.x], [self.position.y, self.position.y+self.velocity.y], options) self.position.draw(options+'x') @staticmethod def similar(f1, f2, maxDistance2, maxDeltavelocity2): return (f1.position-f2.position).norm2Squared()<maxDistance2 and (f1.velocity-f2.velocity).norm2Squared()<maxDeltavelocity2 def segmentIntersection(p1, p2, p3, p4): '''Returns the intersecting point of the segments [p1, p2] and [p3, p4], None otherwise''' from numpy import matrix from numpy.linalg import linalg, det dp1 = p2-p1#[s1[0][1]-s1[0][0], s1[1][1]-s1[1][0]] dp2 = p4-p3#[s2[0][1]-s2[0][0], s2[1][1]-s2[1][0]] A = matrix([[dp1.y, -dp1.x], [dp2.y, -dp2.x]]) B = matrix([[dp1.y*p1.x-dp1.x*p1.y], [dp2.y*p3.x-dp2.x*p3.y]]) if linalg.det(A) == 0:#crossProduct(ds1, ds2) == 0: return None else: intersection = linalg.solve(A,B) if (utils.inBetween(p1.x, p2.x, intersection[0,0]) and utils.inBetween(p3.x, p4.x, intersection[0,0]) and utils.inBetween(p1.y, p2.y, intersection[1,0]) and utils.inBetween(p3.y, p4.y, intersection[1,0])): return Point(intersection[0,0], intersection[1,0]) else: return None class Trajectory: '''Class for trajectories i.e. a temporal sequence of positions the class is iterable.''' def __init__(self, positions=None): if positions != None: self.positions = positions else: self.positions = [[],[]] @staticmethod def load(line1, line2): return Trajectory([[float(n) for n in line1.split(' ')], [float(n) for n in line2.split(' ')]]) def __str__(self): return ' '.join([self.__getitem__(i).__str__() for i in xrange(self.length())]) def __repr__(self): return str(self) def __getitem__(self, i): return Point(self.positions[0][i], self.positions[1][i]) def __iter__(self): self.iterInstantNum = 0 return self def next(self): if self.iterInstantNum >= self.length(): raise StopIteration else: self.iterInstantNum += 1 return self[self.iterInstantNum-1] def length(self): return len(self.positions[0]) def addPositionXY(self, x, y): self.positions[0].append(x) self.positions[1].append(y) def addPosition(self, p): self.addPositionXY(p.x, p.y) @staticmethod def _draw(positions, options = '', withOrigin = False, lastCoordinate = None): from matplotlib.pylab import plot if lastCoordinate == None: plot(positions[0], positions[1], options) elif 0 <= lastCoordinate <= len(positions[0]): plot(positions[0][:lastCoordinate], positions[1][:lastCoordinate], options) if withOrigin: plot([positions[0][0]], [positions[1][0]], 'ro') def project(self, homography): from numpy.core.multiarray import array projected = cvutils.projectArray(homography, array(self.positions)) return Trajectory(projected) def draw(self, options = '', withOrigin = False): Trajectory._draw(self.positions, options, withOrigin) def drawAt(self, lastCoordinate, options = '', withOrigin = False): Trajectory._draw(self.positions, options, withOrigin, lastCoordinate) def drawOnWorldImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False): from matplotlib.pylab import plot imgPositions = [[x*nPixelsPerUnitDistance for x in self.positions[0]], [-x*nPixelsPerUnitDistance+imageHeight for x in self.positions[1]]] Trajectory._draw(imgPositions, options, withOrigin) def getXCoordinates(self): return self.positions[0] def getYCoordinates(self): return self.positions[1] def asArray(self): from numpy.core.multiarray import array return array(self.positions) def xBounds(self): # look for function that does min and max in one pass return [min(self.getXCoordinates()), max(self.getXCoordinates())] def yBounds(self): # look for function that does min and max in one pass return [min(self.getYCoordinates()), max(self.getYCoordinates())] def add(self, traj2): '''Returns a new trajectory of the same length''' if self.length() != traj2.length(): print 'Trajectories of different lengths' return None else: return Trajectory([[a+b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())], [a+b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]]) def subtract(self, traj2): '''Returns a new trajectory of the same length''' if self.length() != traj2.length(): print 'Trajectories of different lengths' return None else: return Trajectory([[a-b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())], [a-b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]]) def norm(self): '''Returns the list of the norms at each instant''' # def add(x, y): return x+y # sq = map(add, [x*x for x in self.positions[0]], [y*y for y in self.positions[1]]) # return sqrt(sq) return [hypot(x,y) for x,y in zip(self.positions[0], self.positions[1])] def cumulatedDisplacement(self): 'Returns the sum of the distances between each successive point' displacement = 0 for i in xrange(self.length()-1): displacement += Point.distanceNorm2(self.__getitem__(i),self.__getitem__(i+1)) return displacement def wiggliness(self): return self.cumulatedDisplacement()/float(Point.distanceNorm2(self.__getitem__(0),self.__getitem__(self.length()-1))) def getIntersections(self, p1, p2): '''Returns a list of the indices at which the trajectory intersects with the segment of extremities p1 and p2 the list is empty if there is no crossing''' indices = [] for i in xrange(self.length()-1): q1=self.__getitem__(i) q2=self.__getitem__(i+1) p = utils.segmentIntersection(q1, q2, p1, p2) if p: if q1.x != q2.x: ratio = (p.x-q1.x)/(q2.x-q1.x) elif q1.y != q2.y: ratio = (p.y-q1.y)/(q2.y-q1.y) else: ratio = 0 indices.append(i+ratio) return indices def getTrajectoryInInterval(self, inter): if inter.first >=0 and inter.last<= self.length(): return Trajectory([self.positions[0][inter.first:inter.last], self.positions[1][inter.first:inter.last]]) else: return None def getTrajectoryInPolygon(self, polygon): '''Returns the set of points inside the polygon (array of Nx2 coordinates of the polygon vertices)''' import matplotlib.nxutils as nx traj = Trajectory() result = nx.points_inside_poly(self.asArray().T, polygon) for i in xrange(self.length()): if result[i]: traj.addPositionXY(self.positions[0][i], self.positions[1][i]) return traj # version 2: use shapely polygon contains ################## # Moving Objects ################## userTypeNames = ['car', 'pedestrian', 'twowheels', 'bus', 'truck'] userType2Num = utils.inverseEnumeration(userTypeNames) class MovingObject(STObject): '''Class for moving objects i.e. with a trajectory and a geometry (volume) (constant) and a usertype (e.g. road user) ''' def __init__(self, num = None, timeInterval = None, positions = None, geometry = None, userType = None): STObject.__init__(self, num, timeInterval) self.positions = positions self.geometry = geometry self.userType = userType # compute bounding polygon from trajectory def getObjectInTimeInterval(self, inter): '''Returns a new object extracted from self, restricted to time interval inter''' intersection = inter.intersection(self.getTimeInterval()) if not intersection.empty(): trajectoryInterval = TimeInterval(intersection.first-self.getFirstInstant(), intersection.last-self.getFirstInstant()) obj = MovingObject(self.num, intersection, self.positions.getTrajectoryInInterval(trajectoryInterval), self.geometry, self.userType) if self.velocities: obj.velocities = self.velocities.getTrajectoryInInterval(trajectoryInterval) return obj else: print 'The object does not exist at '+str(inter) return None def length(self): return self.timeInterval.length() def getPositions(self): return self.positions def getVelocities(self): return self.velocities def getSpeeds(self): return self.getVelocities().norm() def getPositionAt(self, i): return self.positions[i] def getVelocityAt(self, i): return self.velocities[i] def getPositionAtInstant(self, i): return self.positions[i-self.getFirstInstant()] def getVelocityAtInstant(self, i): return self.velocities[i-self.getFirstInstant()] def getXCoordinates(self): return self.positions.getXCoordinates() def getYCoordinates(self): return self.positions.getYCoordinates() def draw(self, options = '', withOrigin = False): self.positions.draw(options, withOrigin) def drawWorldOnImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False): self.positions.drawWorldOnImage(nPixelsPerUnitDistance, imageHeight, options, withOrigin) def getInstantsCrossingLane(self, p1, p2): '''Returns the instant(s) at which the object passes from one side of the segment to the other empty list if there is no crossing''' indices = self.positions.getIntersections(p1, p2) return [t+self.getFirstInstant() for t in indices] @staticmethod def collisionCourseDotProduct(movingObject1, movingObject2, instant): 'A positive result indicates that the road users are getting closer' deltap = movingObject1.getPositionAtInstant(instant)-movingObject2.getPositionAtInstant(instant) deltav = movingObject2.getVelocityAtInstant(instant)-movingObject1.getVelocityAtInstant(instant) return Point.dot(deltap, deltav) @staticmethod def collisionCourseCosine(movingObject1, movingObject2, instant): 'A positive result indicates that the road users are getting closer' deltap = movingObject1.getPositionAtInstant(instant)-movingObject2.getPositionAtInstant(instant) deltav = movingObject2.getVelocityAtInstant(instant)-movingObject1.getVelocityAtInstant(instant) return Point.dot(deltap, deltav)/(deltap.norm2()*deltav.norm2()) def plotRoadUsers(objects, colors): '''Colors is a PlottingPropertyValues instance''' from matplotlib.pyplot import figure, axis figure() for obj in objects: obj.draw(colors.get(obj.userType)) axis('equal') # need for a class representing the indicators, their units, how to print them in graphs... class TemporalIndicator: '''Class for temporal indicators i.e. indicators that take a value at specific instants values should be * a dict, for the values at specific time instants * or a list with a time interval object if continuous measurements it should have more information like name, unit''' def __init__(self, name, values, timeInterval=None): self.name = name self.isCosine = name.find('Cosine') self.values = values self.timeInterval = timeInterval if timeInterval: assert len(values) == timeInterval.length() def empty(self): return len(self.values) == 0 def __getitem__(self, i): if self.timeInterval: if self.timeInterval.contains(i): return self.values[i-self.timeInterval.first] else: if i in self.values.keys(): return self.values[i] return None # default def __iter__(self): self.iterInstantNum = 0 # index in the interval or keys of the dict return self def next(self): if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\ # or (self.iterInstantNum >= self.values) raise StopIteration else: self.iterInstantNum += 1 if self.timeInterval: return self.values[self.iterInstantNum-1] else: return self.values.values()[self.iterInstantNum-1] def getTimeInterval(self): if not self.timeInterval and type(self.values)==dict: instants = self.values.keys() if instants: self.timeInterval = TimeInterval(instants[0], instants[-1]) else: self.timeInterval = TimeInterval() return self.timeInterval def getValues(self): if self.timeInterval: return self.values else: return self.values.values() def getAngleValues(self): '''if the indicator is a function of an angle, transform it to an angle (eg cos) (no transformation otherwise)''' from numpy import arccos values = self.getValues() if self.isCosine >= 0: return [arccos(c) for c in values] else: return values class SeverityIndicator(TemporalIndicator): '''Class for severity indicators field mostSevereIsMax is True if the most severe value taken by the indicator is the maximum''' def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, ignoredValue = None): TemporalIndicator.__init__(self, name, values, timeInterval) self.mostSevereIsMax = mostSevereIsMax self.ignoredValue = ignoredValue def getMostSevereValue(self, minNInstants=1): from matplotlib.mlab import find from numpy.core.multiarray import array from numpy.core.fromnumeric import mean values = array(self.values.values()) if self.ignoredValue: indices = find(values != self.ignoredValue) else: indices = range(len(values)) if len(indices) >= minNInstants: values = sorted(values[indices], reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values return mean(values[:minNInstants]) else: return None def indicatorMap(indicatorValues, trajectory, squareSize): '''Returns a dictionary with keys for the indices of the cells (squares) in which the trajectory positions are located at which the indicator values are attached ex: speeds and trajectory''' from numpy import floor, mean assert len(indicatorValues) == trajectory.length() indicatorMap = {} for k in xrange(trajectory.length()): p = trajectory[k] i = floor(p.x/squareSize) j = floor(p.y/squareSize) if indicatorMap.has_key((i,j)): indicatorMap[(i,j)].append(indicatorValues[k]) else: indicatorMap[(i,j)] = [indicatorValues[k]] for k in indicatorMap.keys(): indicatorMap[k] = mean(indicatorMap[k]) return indicatorMap def indicatorMapFromPolygon(value, polygon, squareSize): '''Fills an indicator map with the value within the polygon (array of Nx2 coordinates of the polygon vertices)''' import matplotlib.nxutils as nx from numpy.core.multiarray import array, arange from numpy import floor points = [] for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize): for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize): points.append([x,y]) inside = nx.points_inside_poly(array(points), polygon) indicatorMap = {} for i in xrange(len(inside)): if inside[i]: indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0 return indicatorMap def indicatorMapFromAxis(value, limits, squareSize): '''axis = [xmin, xmax, ymin, ymax] ''' from numpy.core.multiarray import arange from numpy import floor indicatorMap = {} for x in arange(limits[0], limits[1], squareSize): for y in arange(limits[2], limits[3], squareSize): indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value return indicatorMap def combineIndicatorMaps(maps, squareSize, combinationFunction): '''Puts many indicator maps together (averaging the values in each cell if more than one maps has a value)''' #from numpy import mean indicatorMap = {} for m in maps: for k,v in m.iteritems(): if indicatorMap.has_key(k): indicatorMap[k].append(v) else: indicatorMap[k] = [v] for k in indicatorMap.keys(): indicatorMap[k] = combinationFunction(indicatorMap[k]) return indicatorMap if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/moving.txt') #suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite) #doctest.testmod() #doctest.testfile("example.txt")