Mercurial Hosting > traffic-intelligence
view python/moving.py @ 399:c389fae9689a
Added a class to read list of image instead of video. This is controlled by the use of the database-filename and folder-data parameters in the config file.
author | Jean-Philippe Jodoin <jpjodoin@gmail.com> |
---|---|
date | Mon, 29 Jul 2013 17:12:45 -0400 |
parents | 0ce2210790b1 |
children | 31604ef1cad4 |
line wrap: on
line source
#! /usr/bin/env python '''Libraries for moving objects, trajectories...''' import utils import cvutils from math import sqrt from numpy import median try: from shapely.geometry import Polygon, Point as shapelyPoint from shapely.prepared import prep shapelyAvailable = True except ImportError: print('Shapely library could not be loaded') shapelyAvailable = False __metaclass__ = type class Interval(object): '''Generic interval: a subset of real numbers (not iterable)''' def __init__(self, first=0, last=-1, revert = False): if revert and last<first: self.first=last self.last=first else: self.first=first self.last=last def __str__(self): return '[{0}, {1}]'.format(self.first, self.last) def __repr__(self): return self.__str__() def empty(self): return self.first > self.last def center(self): return (self.first+self.last)/2. def length(self): '''Returns the length of the interval''' return float(max(0,self.last-self.first)) def equal(self, i2): return self.first==i2.first and self.last == i2.last def getList(self): return [self.first, self.last] def contains(self, instant): return (self.first<=instant and self.last>=instant) def inside(self, interval2): '''Indicates if the temporal interval of self is comprised in interval2''' return (self.first >= interval2.first) and (self.last <= interval2.last) @classmethod def union(cls, interval1, interval2): '''Smallest interval comprising self and interval2''' return cls(min(interval1.first, interval2.first), max(interval2.last, interval2.last)) @classmethod def intersection(cls, interval1, interval2): '''Largest interval comprised in both self and interval2''' return cls(max(interval1.first, interval2.first), min(interval1.last, interval2.last)) def distance(self, interval2): if not Interval.intersection(self, interval2).empty(): return 0 elif self.first > interval2.last: return self.first - interval2.last elif self.last < interval2.first: return interval2.first - self.last else: return None def unionIntervals(intervals): 'returns the smallest interval containing all intervals' inter = intervals[0] for i in intervals[1:]: inter = Interval.union(inter, i) return inter class TimeInterval(Interval): '''Temporal interval: set of instants at fixed time step, between first and last, included For example: based on frame numbers (hence the modified length method) It may be modified directly by setting first and last''' def __init__(self, first=0, last=-1): super(TimeInterval, self).__init__(first, last, False) @staticmethod def fromInterval(inter): return TimeInterval(inter.first, inter.last) def __getitem__(self, i): if not self.empty(): return self.first+i def __iter__(self): self.iterInstantNum = -1 return self def next(self): if self.iterInstantNum >= self.length()-1: raise StopIteration else: self.iterInstantNum += 1 return self[self.iterInstantNum] def length(self): '''Returns the length of the interval''' return float(max(0,self.last-self.first+1)) # class BoundingPolygon: # '''Class for a polygon bounding a set of points # with methods to create intersection, unions... # ''' # We will use the polygon class of Shapely class STObject(object): '''Class for spatio-temporal object, i.e. with temporal and spatial existence (time interval and bounding polygon for positions (e.g. rectangle)). It may not mean that the object is defined for all time instants within the time interval''' def __init__(self, num = None, timeInterval = None, boundingPolygon = None): self.num = num self.timeInterval = timeInterval self.boundingPolygon = boundingPolygon def empty(self): return self.timeInterval.empty() or not self.boudingPolygon def getNum(self): return self.num def getFirstInstant(self): return self.timeInterval.first def getLastInstant(self): return self.timeInterval.last def getTimeInterval(self): return self.timeInterval def existsAtInstant(self, t): return self.timeInterval.contains(t) def commonTimeInterval(self, obj2): return TimeInterval.intersection(self.getTimeInterval(), obj2.getTimeInterval()) class Point(object): def __init__(self, x, y): self.x = x self.y = y def __str__(self): return '(%f,%f)'%(self.x,self.y) def __repr__(self): return self.__str__() def __add__(self, other): return Point(self.x+other.x, self.y+other.y) def __sub__(self, other): return Point(self.x-other.x, self.y-other.y) def multiply(self, alpha): return Point(self.x*alpha, self.y*alpha) def draw(self, options = 'o', **kwargs): from matplotlib.pylab import plot plot([self.x], [self.y], options, **kwargs) def norm2Squared(self): '''2-norm distance (Euclidean distance)''' return self.x*self.x+self.y*self.y def norm2(self): '''2-norm distance (Euclidean distance)''' return sqrt(self.norm2Squared()) def norm1(self): return abs(self.x)+abs(self.y) def normMax(self): return max(abs(self.x),abs(self.y)) def aslist(self): return [self.x, self.y] def astuple(self): return (self.x, self.y) def asint(self): return Point(int(self.x), int(self.y)) if shapelyAvailable: def asShapely(self): return shapelyPoint(self.x, self.y) def project(self, homography): from numpy.core.multiarray import array projected = cvutils.projectArray(homography, array([[self.x], [self.y]])) return Point(projected[0], projected[1]) def inPolygonNoShapely(self, polygon): '''Indicates if the point x, y is inside the polygon (array of Nx2 coordinates of the polygon vertices) taken from http://www.ariel.com.au/a/python-point-int-poly.html Use Polygon.contains if Shapely is installed''' n = polygon.shape[0]; counter = 0; p1 = polygon[0,:]; for i in range(n+1): p2 = polygon[i % n,:]; if self.y > min(p1[1],p2[1]): if self.y <= max(p1[1],p2[1]): if self.x <= max(p1[0],p2[0]): if p1[1] != p2[1]: xinters = (self.y-p1[1])*(p2[0]-p1[0])/(p2[1]-p1[1])+p1[0]; if p1[0] == p2[0] or self.x <= xinters: counter+=1; p1=p2 return (counter%2 == 1); @staticmethod def dot(p1, p2): 'Scalar product' return p1.x*p2.x+p1.y*p2.y @staticmethod def cross(p1, p2): 'Cross product' return p1.x*p2.y-p1.y*p2.x @staticmethod def cosine(p1, p2): return Point.dot(p1,p2)/(p1.norm2()*p2.norm2()) @staticmethod def distanceNorm2(p1, p2): return (p1-p2).norm2() @staticmethod def plotAll(points, **kwargs): from matplotlib.pyplot import scatter scatter([p.x for p in points],[p.y for p in points], **kwargs) def similarOrientation(self, refDirection, cosineThreshold): 'Indicates whether the cosine of the vector and refDirection is smaller than cosineThreshold' return Point.cosine(self, refDirection) >= cosineThreshold if shapelyAvailable: def pointsInPolygon(points, polygon): '''Optimized tests of a series of points within (Shapely) polygon ''' prepared_polygon = prep(polygon) return filter(prepared_polygon.contains, points) class NormAngle(object): '''Alternate encoding of a point, by its norm and orientation''' def __init__(self, norm, angle): self.norm = norm self.angle = angle @staticmethod def fromPoint(p): from math import atan2 norm = p.norm2() if norm > 0: angle = atan2(p.y, p.x) else: angle = 0. return NormAngle(norm, angle) def __add__(self, other): 'a norm cannot become negative' return NormAngle(max(self.norm+other.norm, 0), self.angle+other.angle) def getPoint(self): from math import cos, sin return Point(self.norm*cos(self.angle), self.norm*sin(self.angle)) def predictPositionNoLimit(nTimeSteps, initialPosition, initialVelocity, initialAcceleration = Point(0,0)): '''Predicts the position in nTimeSteps at constant speed/acceleration''' return initialVelocity + initialAcceleration.multiply(nTimeSteps),initialPosition+initialVelocity.multiply(nTimeSteps) + initialAcceleration.multiply(nTimeSteps**2*0.5) def predictPosition(position, speedOrientation, control, maxSpeed = None): '''Predicts the position (moving.Point) at the next time step with given control input (deltaSpeed, deltaTheta) speedOrientation is the other encoding of velocity, (speed, orientation) speedOrientation and control are NormAngle''' predictedSpeedTheta = speedOrientation+control if maxSpeed: predictedSpeedTheta.norm = min(predictedSpeedTheta.norm, maxSpeed) predictedPosition = position+predictedSpeedTheta.getPoint() return predictedPosition, predictedSpeedTheta class FlowVector(object): '''Class to represent 4-D flow vectors, ie a position and a velocity''' def __init__(self, position, velocity): 'position and velocity should be Point instances' self.position = position self.velocity = velocity def __add__(self, other): return FlowVector(self.position+other.position, self.velocity+other.velocity) def multiply(self, alpha): return FlowVector(self.position.multiply(alpha), self.velocity.multiply(alpha)) def draw(self, options = '', **kwargs): from matplotlib.pylab import plot plot([self.position.x, self.position.x+self.velocity.x], [self.position.y, self.position.y+self.velocity.y], options, **kwargs) self.position.draw(options+'x', **kwargs) @staticmethod def similar(f1, f2, maxDistance2, maxDeltavelocity2): return (f1.position-f2.position).norm2Squared()<maxDistance2 and (f1.velocity-f2.velocity).norm2Squared()<maxDeltavelocity2 def intersection(p1, p2, dp1, dp2): '''Returns the intersection point between the two lines defined by the respective vectors (dp) and origin points (p)''' from numpy import matrix from numpy.linalg import linalg A = matrix([[dp1.y, -dp1.x], [dp2.y, -dp2.x]]) B = matrix([[dp1.y*p1.x-dp1.x*p1.y], [dp2.y*p2.x-dp2.x*p2.y]]) if linalg.det(A) == 0: return None else: intersection = linalg.solve(A,B) return Point(intersection[0,0], intersection[1,0]) def segmentIntersection(p1, p2, p3, p4): '''Returns the intersecting point of the segments [p1, p2] and [p3, p4], None otherwise''' if (Interval.intersection(Interval(p1.x,p2.x,True), Interval(p3.x,p4.x,True)).empty()) or (Interval.intersection(Interval(p1.y,p2.y,True), Interval(p3.y,p4.y,True)).empty()): return None else: dp1 = p2-p1 dp3 = p4-p3 inter = intersection(p1, p3, dp1, dp3) if (inter != None and utils.inBetween(p1.x, p2.x, inter.x) and utils.inBetween(p3.x, p4.x, inter.x) and utils.inBetween(p1.y, p2.y, inter.y) and utils.inBetween(p3.y, p4.y, inter.y)): return inter else: return None # TODO: implement a better algorithm for intersections of sets of segments http://en.wikipedia.org/wiki/Line_segment_intersection class Trajectory(object): '''Class for trajectories: temporal sequence of positions The class is iterable''' def __init__(self, positions=None): if positions != None: self.positions = positions else: self.positions = [[],[]] @staticmethod def load(line1, line2): return Trajectory([[float(n) for n in line1.split(' ')], [float(n) for n in line2.split(' ')]]) @staticmethod def fromPointList(points): t = Trajectory() for p in points: t.addPosition(p) return t def __len__(self): return len(self.positions[0]) def length(self): return self.__len__() def __str__(self): return ' '.join([self.__getitem__(i).__str__() for i in xrange(self.length())]) def __repr__(self): return str(self) def __getitem__(self, i): return Point(self.positions[0][i], self.positions[1][i]) def __iter__(self): self.iterInstantNum = 0 return self def next(self): if self.iterInstantNum >= self.length(): raise StopIteration else: self.iterInstantNum += 1 return self[self.iterInstantNum-1] def setPositionXY(self, i, x, y): if i < self.__len__(): self.positions[0][i] = x self.positions[1][i] = y def setPosition(self, i, p): self.setPositionXY(i, p.x, p.y) def addPositionXY(self, x, y): self.positions[0].append(x) self.positions[1].append(y) def addPosition(self, p): self.addPositionXY(p.x, p.y) def duplicateLastPosition(self): self.positions[0].append(self.positions[0][-1]) self.positions[1].append(self.positions[1][-1]) @staticmethod def _draw(positions, options = '', withOrigin = False, lastCoordinate = None, timeStep = 1, **kwargs): from matplotlib.pylab import plot if lastCoordinate == None: plot(positions[0][::timeStep], positions[1][::timeStep], options, **kwargs) elif 0 <= lastCoordinate <= len(positions[0]): plot(positions[0][:lastCoordinate:timeStep], positions[1][:lastCoordinate:timeStep], options, **kwargs) if withOrigin: plot([positions[0][0]], [positions[1][0]], 'ro', **kwargs) def project(self, homography): return Trajectory(cvutils.projectTrajectory(homography, self.positions)) def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): Trajectory._draw(self.positions, options, withOrigin, None, timeStep, **kwargs) def drawAt(self, lastCoordinate, options = '', withOrigin = False, timeStep = 1, **kwargs): Trajectory._draw(self.positions, options, withOrigin, lastCoordinate, timeStep, **kwargs) def drawOnWorldImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False, timeStep = 1, **kwargs): from matplotlib.pylab import plot imgPositions = [[x*nPixelsPerUnitDistance for x in self.positions[0]], [-x*nPixelsPerUnitDistance+imageHeight for x in self.positions[1]]] Trajectory._draw(imgPositions, options, withOrigin, timeStep, **kwargs) def getXCoordinates(self): return self.positions[0] def getYCoordinates(self): return self.positions[1] def asArray(self): from numpy.core.multiarray import array return array(self.positions) def xBounds(self): # look for function that does min and max in one pass return Interval(min(self.getXCoordinates()), max(self.getXCoordinates())) def yBounds(self): # look for function that does min and max in one pass return Interval(min(self.getYCoordinates()), max(self.getYCoordinates())) def add(self, traj2): '''Returns a new trajectory of the same length''' if self.length() != traj2.length(): print 'Trajectories of different lengths' return None else: return Trajectory([[a+b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())], [a+b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]]) def subtract(self, traj2): '''Returns a new trajectory of the same length''' if self.length() != traj2.length(): print 'Trajectories of different lengths' return None else: return Trajectory([[a-b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())], [a-b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]]) def differentiate(self): diff = Trajectory() for i in xrange(1, self.length()): diff.addPosition(self[i]-self[i-1]) return diff def norm(self): '''Returns the list of the norms at each instant''' # def add(x, y): return x+y # sq = map(add, [x*x for x in self.positions[0]], [y*y for y in self.positions[1]]) # return sqrt(sq) from numpy import hypot return hypot(self.positions[0], self.positions[1]) def cumulatedDisplacement(self): 'Returns the sum of the distances between each successive point' displacement = 0 for i in xrange(self.length()-1): displacement += Point.distanceNorm2(self.__getitem__(i),self.__getitem__(i+1)) return displacement def similarOrientation(self, refDirection, cosineThreshold, minProportion = 0.5): '''Indicates whether the minProportion (<=1.) (eg half) of the trajectory elements (vectors for velocity) have a cosine with refDirection is smaller than cosineThreshold''' count = 0 lengthThreshold = float(self.length())*minProportion for p in self: if p.similarOrientation(refDirection, cosineThreshold): count += 1 if count > lengthThreshold: return True return False def wiggliness(self): return self.cumulatedDisplacement()/float(Point.distanceNorm2(self.__getitem__(0),self.__getitem__(self.length()-1))) def getIntersections(self, p1, p2): '''Returns a list of the indices at which the trajectory intersects with the segment of extremities p1 and p2 the list is empty if there is no crossing''' indices = [] for i in xrange(self.length()-1): q1=self.__getitem__(i) q2=self.__getitem__(i+1) p = utils.segmentIntersection(q1, q2, p1, p2) if p: if q1.x != q2.x: ratio = (p.x-q1.x)/(q2.x-q1.x) elif q1.y != q2.y: ratio = (p.y-q1.y)/(q2.y-q1.y) else: ratio = 0 indices.append(i+ratio) return indices def getTrajectoryInInterval(self, inter): if inter.first >=0 and inter.last<= self.length(): return Trajectory([self.positions[0][inter.first:inter.last], self.positions[1][inter.first:inter.last]]) else: return None def getTrajectoryInPolygonNoShapely(self, polygon): '''Returns the trajectory built with the set of points inside the polygon (array of Nx2 coordinates of the polygon vertices)''' traj = Trajectory() for p in self: if p.inPolygonNoShapely(polygon): traj.addPosition(p) return traj if shapelyAvailable: def getTrajectoryInPolygon(self, polygon): '''Returns the trajectory built with the set of points inside the (shapely) polygon''' traj = Trajectory() points = [p.asShapely() for p in self] for p in pointsInPolygon(points, polygon): traj.addPositionXY(p.x, p.y) return traj @staticmethod def lcss(t1, t2, lcss): return lcss.compute(t1, t2) class CurvilinearTrajectory(Trajectory): '''Sub class of trajectory for trajectories with curvilinear coordinates and lane assignements longitudinal coordinate is stored as first coordinate (exterior name S) lateral coordiante is stored as second coordinate''' def __init__(self, S = [], Y = [], lanes = []): self.positions = [S,Y] self.lanes = lanes def __getitem__(self,i): return [self.positions[0][i], self.positions[1][i], self.lanes[i]] def getSCoordinates(self): return self.getXCoordinates() def getLanes(self): return self.lanes def addPosition(self, s, y, lane): self.addPositionXY(s,y) self.lanes.append(lane) def setPosition(self, i, s, y, lane): self.setPositionXY(i, s, y) if i < self.__len__(): self.lanes[i] = lane ################## # Moving Objects ################## userTypeNames = ['unknown', 'car', 'pedestrian', 'motorcycle', 'bicycle', 'bus', 'truck'] userType2Num = utils.inverseEnumeration(userTypeNames) class MovingObject(STObject): '''Class for moving objects: a spatio-temporal object with a trajectory and a geometry (constant volume over time) and a usertype (e.g. road user) coded as a number (see ''' def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = None): super(MovingObject, self).__init__(num, timeInterval) self.positions = positions self.velocities = velocities self.geometry = geometry self.userType = userType self.features = None # compute bounding polygon from trajectory def getObjectInTimeInterval(self, inter): '''Returns a new object extracted from self, restricted to time interval inter''' intersection = TimeInterval.intersection(inter, self.getTimeInterval()) if not intersection.empty(): trajectoryInterval = TimeInterval(intersection.first-self.getFirstInstant(), intersection.last-self.getFirstInstant()) obj = MovingObject(self.num, intersection, self.positions.getTrajectoryInInterval(trajectoryInterval), self.geometry, self.userType) if self.velocities: obj.velocities = self.velocities.getTrajectoryInInterval(trajectoryInterval) return obj else: print 'The object does not exist at '+str(inter) return None def length(self): return self.timeInterval.length() def getPositions(self): return self.positions def getVelocities(self): return self.velocities def getUserType(self): return self.userType def getCurvilinearPositions(self): if hasattr(self, 'curvilinearPositions'): return self.curvilinearPositions else: return None def setUserType(self, userType): self.userType = userType def setFeatures(self, features): self.features = [features[i] for i in self.featureNumbers] def getSpeeds(self): return self.getVelocities().norm() def getPositionAt(self, i): return self.positions[i] def getVelocityAt(self, i): return self.velocities[i] def getPositionAtInstant(self, i): return self.positions[i-self.getFirstInstant()] def getVelocityAtInstant(self, i): return self.velocities[i-self.getFirstInstant()] def getXCoordinates(self): return self.positions.getXCoordinates() def getYCoordinates(self): return self.positions.getYCoordinates() def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): self.positions.draw(options, withOrigin, timeStep, **kwargs) def drawOnWorldImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False, timeStep = 1, **kwargs): self.positions.drawOnWorldImage(nPixelsPerUnitDistance, imageHeight, options, withOrigin, timeStep, **kwargs) def play(self, videoFilename, homography = None): cvutils.displayTrajectories(videoFilename, [self], homography, self.getFirstInstant(), self.getLastInstant()) def speedDiagnostics(self, framerate = 1., display = False): from numpy import std from scipy.stats import scoreatpercentile speeds = framerate*self.getSpeeds() coef = utils.linearRegression(range(len(speeds)), speeds) print('min/5th perc speed: {} / {}\nspeed diff: {}\nspeed stdev: {}\nregression: {}'.format(min(speeds), scoreatpercentile(speeds, 5), speeds[-2]-speeds[1], std(speeds), coef[0])) if display: from matplotlib.pyplot import figure, plot, axis figure(1) self.draw() axis('equal') figure(2) plot(list(self.getTimeInterval()), speeds) @staticmethod def distances(obj1, obj2, instant): from scipy.spatial.distance import cdist positions1 = [f.getPositionAtInstant(instant).astuple() for f in obj1.features if f.existsAtInstant(instant)] positions2 = [f.getPositionAtInstant(instant).astuple() for f in obj2.features if f.existsAtInstant(instant)] return cdist(positions1, positions2, metric = 'euclidean') @staticmethod def minDistance(obj1, obj2, instant): return MovingObject.distances(obj1, obj2, instant).min() @staticmethod def maxDistance(obj1, obj2, instant): return MovingObject.distances(obj1, obj2, instant).max() def maxSize(self): '''Returns the max distance between features at instant there are the most features''' if hasattr(self, 'features'): nFeatures = -1 tMaxFeatures = 0 for t in self.getTimeInterval(): n = len([f for f in self.features if f.existsAtInstant(t)]) if n > nFeatures: nFeatures = n tMaxFeatures = t return MovingObject.maxDistance(self, self, tMaxFeatures) else: print('Load features to compute a maximum size') return None def getInstantsCrossingLane(self, p1, p2): '''Returns the instant(s) at which the object passes from one side of the segment to the other empty list if there is no crossing''' indices = self.positions.getIntersections(p1, p2) return [t+self.getFirstInstant() for t in indices] def predictPosition(self, instant, nTimeSteps, externalAcceleration = Point(0,0)): '''Predicts the position of object at instant+deltaT, at constant speed''' return predictPositionNoLimit(nTimeSteps, self.getPositionAtInstant(instant), self.getVelocityAtInstant(instant), externalAcceleration) def classifyUserTypeSpeed(self, threshold, statisticsFunc = median): '''Classifies slow and fast road users slow: non-motorized -> pedestrians fast: motorized -> cars''' if statisticsFunc(self.velocities.norm()) >= threshold: self.setUserType(userType2Num['car']) else: self.setUserType(userType2Num['pedestrian']) @staticmethod def collisionCourseDotProduct(movingObject1, movingObject2, instant): 'A positive result indicates that the road users are getting closer' deltap = movingObject1.getPositionAtInstant(instant)-movingObject2.getPositionAtInstant(instant) deltav = movingObject2.getVelocityAtInstant(instant)-movingObject1.getVelocityAtInstant(instant) return Point.dot(deltap, deltav) @staticmethod def collisionCourseCosine(movingObject1, movingObject2, instant): 'A positive result indicates that the road users are getting closer' return Point.cosine(movingObject1.getPositionAtInstant(instant)-movingObject2.getPositionAtInstant(instant), #deltap movingObject2.getVelocityAtInstant(instant)-movingObject1.getVelocityAtInstant(instant)) #deltav def plotRoadUsers(objects, colors): '''Colors is a PlottingPropertyValues instance''' from matplotlib.pyplot import figure, axis figure() for obj in objects: obj.draw(colors.get(obj.userType)) axis('equal') if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/moving.txt') #suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite) #doctest.testmod() #doctest.testfile("example.txt")