Mercurial Hosting > traffic-intelligence
diff python/moving.py @ 614:5e09583275a4
Merged Nicolas/trafficintelligence into default
author | Mohamed Gomaa <eng.m.gom3a@gmail.com> |
---|---|
date | Fri, 05 Dec 2014 12:13:53 -0500 |
parents | c5406edbcf12 |
children | 5800a87f11ae 0954aaf28231 |
line wrap: on
line diff
--- a/python/moving.py Thu Apr 18 15:29:33 2013 -0400 +++ b/python/moving.py Fri Dec 05 12:13:53 2014 -0500 @@ -5,8 +5,15 @@ import cvutils from math import sqrt +from numpy import median -#from shapely.geometry import Polygon +try: + from shapely.geometry import Polygon, Point as shapelyPoint + from shapely.prepared import prep + shapelyAvailable = True +except ImportError: + print('Shapely library could not be loaded') + shapelyAvailable = False __metaclass__ = type @@ -29,6 +36,9 @@ def empty(self): return self.first > self.last + def center(self): + return (self.first+self.last)/2. + def length(self): '''Returns the length of the interval''' return float(max(0,self.last-self.first)) @@ -90,7 +100,11 @@ def __getitem__(self, i): if not self.empty(): - return self.first+i + if isinstance(i, int): + return self.first+i + else: + raise TypeError, "Invalid argument type." + #elif isinstance( key, slice ): def __iter__(self): self.iterInstantNum = -1 @@ -128,7 +142,7 @@ def empty(self): return self.timeInterval.empty() or not self.boudingPolygon - def getId(self): + def getNum(self): return self.num def getFirstInstant(self): @@ -163,16 +177,31 @@ def __sub__(self, other): return Point(self.x-other.x, self.y-other.y) + def __neg__(self): + return Point(-self.x, -self.y) + + def __getitem__(self, i): + if i == 0: + return self.x + elif i == 1: + return self.y + else: + raise IndexError() + + def orthogonal(self): + return Point(self.y, -self.x) + def multiply(self, alpha): + 'Warning, returns a new Point' return Point(self.x*alpha, self.y*alpha) - def draw(self, options = 'o', **kwargs): + def plot(self, options = 'o', **kwargs): from matplotlib.pylab import plot plot([self.x], [self.y], options, **kwargs) def norm2Squared(self): '''2-norm distance (Euclidean distance)''' - return self.x*self.x+self.y*self.y + return self.x**2+self.y**2 def norm2(self): '''2-norm distance (Euclidean distance)''' @@ -193,36 +222,44 @@ def asint(self): return Point(int(self.x), int(self.y)) + if shapelyAvailable: + def asShapely(self): + return shapelyPoint(self.x, self.y) + def project(self, homography): from numpy.core.multiarray import array projected = cvutils.projectArray(homography, array([[self.x], [self.y]])) return Point(projected[0], projected[1]) - def inPolygon(self, poly): - '''Returns if the point x, y is inside the polygon. - The polygon is defined by the ordered list of points in poly - + def inPolygonNoShapely(self, polygon): + '''Indicates if the point x, y is inside the polygon + (array of Nx2 coordinates of the polygon vertices) + taken from http://www.ariel.com.au/a/python-point-int-poly.html - Use points_inside_poly from matplotlib.nxutils''' + Use Polygon.contains if Shapely is installed''' - n = len(poly); + n = polygon.shape[0]; counter = 0; - p1 = poly[0]; + p1 = polygon[0,:]; for i in range(n+1): - p2 = poly[i % n]; - if self.y > min(p1.y,p2.y): - if self.y <= max(p1.y,p2.y): - if self.x <= max(p1.x,p2.x): - if p1.y != p2.y: - xinters = (self.y-p1.y)*(p2.x-p1.x)/(p2.y-p1.y)+p1.x; - if p1.x == p2.x or self.x <= xinters: + p2 = polygon[i % n,:]; + if self.y > min(p1[1],p2[1]): + if self.y <= max(p1[1],p2[1]): + if self.x <= max(p1[0],p2[0]): + if p1[1] != p2[1]: + xinters = (self.y-p1[1])*(p2[0]-p1[0])/(p2[1]-p1[1])+p1[0]; + if p1[0] == p2[0] or self.x <= xinters: counter+=1; p1=p2 return (counter%2 == 1); @staticmethod + def fromList(p): + return Point(p[0], p[1]) + + @staticmethod def dot(p1, p2): 'Scalar product' return p1.x*p2.x+p1.y*p2.y @@ -233,13 +270,201 @@ return p1.x*p2.y-p1.y*p2.x @staticmethod + def cosine(p1, p2): + return Point.dot(p1,p2)/(p1.norm2()*p2.norm2()) + + @staticmethod def distanceNorm2(p1, p2): return (p1-p2).norm2() @staticmethod - def plotAll(points, color='r'): + def plotAll(points, **kwargs): from matplotlib.pyplot import scatter - scatter([p.x for p in points],[p.y for p in points], c=color) + scatter([p.x for p in points],[p.y for p in points], **kwargs) + + def similarOrientation(self, refDirection, cosineThreshold): + 'Indicates whether the cosine of the vector and refDirection is smaller than cosineThreshold' + return Point.cosine(self, refDirection) >= cosineThreshold + + @staticmethod + def timeToCollision(p1, p2, v1, v2, collisionThreshold): + '''Computes exact time to collision with a distance threshold + The unknown of the equation is the time to reach the intersection + between the relative trajectory of one road user + and the circle of radius collisionThreshold around the other road user''' + from math import sqrt + dv = v1-v2 + dp = p1-p2 + a = dv.norm2Squared()#(v1.x-v2.x)**2 + (v1.y-v2.y)**2 + b = 2*Point.dot(dv, dp)#2 * ((p1.x-p2.x) * (v1.x-v2.x) + (p1.y-p2.y) * (v1.y-v2.y)) + c = dp.norm2Squared() - collisionThreshold**2#(p1.x-p2.x)**2 + (p1.y-p2.y)**2 - collisionThreshold**2 + + delta = b**2 - 4*a*c + if delta >= 0: + deltaRoot = sqrt(delta) + ttc1 = (-b + deltaRoot)/(2*a) + ttc2 = (-b - deltaRoot)/(2*a) + if ttc1 >= 0 and ttc2 >= 0: + ttc = min(ttc1,ttc2) + elif ttc1 >= 0: + ttc = ttc1 + elif ttc2 >= 0: + ttc = ttc2 + else: # ttc1 < 0 and ttc2 < 0: + ttc = None + else: + ttc = None + return ttc + + @staticmethod + def midPoint(p1, p2): + 'Returns the middle of the segment [p1, p2]' + return Point(0.5*p1.x+0.5*p2.x, 0.5*p1.y+0.5*p2.y) + +if shapelyAvailable: + def pointsInPolygon(points, polygon): + '''Optimized tests of a series of points within (Shapely) polygon ''' + prepared_polygon = prep(polygon) + return filter(prepared_polygon.contains, points) + +# Functions for coordinate transformation +# From Paul St-Aubin's PVA tools +def subsec_spline_dist(splines): + ''' Prepare list of spline subsegments from a spline list. + + Output: + ======= + ss_spline_d[spline #][mode][station] + + where: + mode=0: incremental distance + mode=1: cumulative distance + mode=2: cumulative distance with trailing distance + ''' + + from numpy import zeros + ss_spline_d = [] + #Prepare subsegment distances + for spline in range(len(splines)): + ss_spline_d.append([[],[],[]]) + ss_spline_d[spline][0] = zeros(len(splines[spline])-1) #Incremental distance + ss_spline_d[spline][1] = zeros(len(splines[spline])-1) #Cumulative distance + ss_spline_d[spline][2] = zeros(len(splines[spline])) #Cumulative distance with trailing distance + for spline_p in range(len(splines[spline])): + if spline_p > (len(splines[spline]) - 2): + break + ss_spline_d[spline][0][spline_p] = utils.pointDistanceL2(splines[spline][spline_p][0],splines[spline][spline_p][1],splines[spline][(spline_p+1)][0],splines[spline][(spline_p+1)][1]) + ss_spline_d[spline][1][spline_p] = sum(ss_spline_d[spline][0][0:spline_p]) + ss_spline_d[spline][2][spline_p] = ss_spline_d[spline][1][spline_p]#sum(ss_spline_d[spline][0][0:spline_p]) + + ss_spline_d[spline][2][-1] = ss_spline_d[spline][2][-2] + ss_spline_d[spline][0][-1] + + return ss_spline_d + +def ppldb2p(qx,qy, p0x,p0y, p1x,p1y): + ''' Point-projection (Q) on line defined by 2 points (P0,P1). + http://cs.nyu.edu/~yap/classes/visual/03s/hw/h2/math.pdf + ''' + if(p0x == p1x and p0y == p1y): + return None + try: + #Approximate slope singularity by giving some slope roundoff; account for roundoff error + if(round(p0x, 10) == round(p1x, 10)): + p1x += 0.0000000001 + if(round(p0y, 10) == round(p1y, 10)): + p1y += 0.0000000001 + #make the calculation + Y = (-(qx)*(p0y-p1y)-(qy*(p0y-p1y)**2)/(p0x-p1x)+p0x**2*(p0y-p1y)/(p0x-p1x)-p0x*p1x*(p0y-p1y)/(p0x-p1x)-p0y*(p0x-p1x))/(p1x-p0x-(p0y-p1y)**2/(p0x-p1x)) + X = (-Y*(p1y-p0y)+qx*(p1x-p0x)+qy*(p1y-p0y))/(p1x-p0x) + except ZeroDivisionError: + print('Error: Division by zero in ppldb2p. Please report this error with the full traceback:') + print('qx={0}, qy={1}, p0x={2}, p0y={3}, p1x={4}, p1y={5}...'.format(qx, qy, p0x, p0y, p1x, p1y)) + import pdb; pdb.set_trace() + return Point(X,Y) + +def getSYfromXY(p, splines, goodEnoughSplineDistance = 0.5): + ''' Snap a point p to it's nearest subsegment of it's nearest spline (from the list splines). A spline is a list of points (class Point), most likely a trajectory. + + Output: + ======= + [spline index, + subsegment leading point index, + snapped point, + subsegment distance, + spline distance, + orthogonal point offset] + ''' + minOffsetY = float('inf') + #For each spline + for spline in range(len(splines)): + #For each spline point index + for spline_p in range(len(splines[spline])-1): + #Get closest point on spline + closestPoint = ppldb2p(p.x,p.y,splines[spline][spline_p][0],splines[spline][spline_p][1],splines[spline][spline_p+1][0],splines[spline][spline_p+1][1]) + if closestPoint == None: + print('Error: Spline {0}, segment {1} has identical bounds and therefore is not a vector. Projection cannot continue.'.format(spline, spline_p)) + return None + # check if the + if utils.inBetween(splines[spline][spline_p][0], splines[spline][spline_p+1][0], closestPoint.x) and utils.inBetween(splines[spline][spline_p][1], splines[spline][spline_p+1][1], closestPoint.y): + offsetY = Point.distanceNorm2(closestPoint, p) + if offsetY < minOffsetY: + minOffsetY = offsetY + snappedSpline = spline + snappedSplineLeadingPoint = spline_p + snappedPoint = Point(closestPoint.x, closestPoint.y) + #Jump loop if significantly close + if offsetY < goodEnoughSplineDistance: + break + #Get sub-segment distance + if minOffsetY != float('inf'): + subsegmentDistance = Point.distanceNorm2(snappedPoint, splines[snappedSpline][snappedSplineLeadingPoint]) + #Get cumulative alignment distance (total segment distance) + splineDistanceS = splines[snappedSpline].getCumulativeDistance(snappedSplineLeadingPoint) + subsegmentDistance + orthogonalSplineVector = (splines[snappedSpline][snappedSplineLeadingPoint+1]-splines[snappedSpline][snappedSplineLeadingPoint]).orthogonal() + offsetVector = p-snappedPoint + if Point.dot(orthogonalSplineVector, offsetVector) < 0: + minOffsetY = -minOffsetY + return [snappedSpline, snappedSplineLeadingPoint, snappedPoint, subsegmentDistance, splineDistanceS, minOffsetY] + else: + return None + +def getXYfromSY(s, y, splineNum, splines, mode = 0): + ''' Find X,Y coordinate from S,Y data. + if mode = 0 : return Snapped X,Y + if mode !=0 : return Real X,Y + ''' + + #(buckle in, it gets ugly from here on out) + ss_spline_d = subsec_spline_dist(splines) + + #Find subsegment + snapped_x = None + snapped_y = None + for spline_ss_index in range(len(ss_spline_d[splineNum][1])): + if(s < ss_spline_d[splineNum][1][spline_ss_index]): + ss_value = s - ss_spline_d[splineNum][1][spline_ss_index-1] + #Get normal vector and then snap + vector_l_x = (splines[splineNum][spline_ss_index][0] - splines[splineNum][spline_ss_index-1][0]) + vector_l_y = (splines[splineNum][spline_ss_index][1] - splines[splineNum][spline_ss_index-1][1]) + magnitude = sqrt(vector_l_x**2 + vector_l_y**2) + n_vector_x = vector_l_x/magnitude + n_vector_y = vector_l_y/magnitude + snapped_x = splines[splineNum][spline_ss_index-1][0] + ss_value*n_vector_x + snapped_y = splines[splineNum][spline_ss_index-1][1] + ss_value*n_vector_y + + #Real values (including orthogonal projection of y)) + real_x = snapped_x - y*n_vector_y + real_y = snapped_y + y*n_vector_x + break + + if mode == 0 or (not snapped_x): + if(not snapped_x): + snapped_x = splines[splineNum][-1][0] + snapped_y = splines[splineNum][-1][1] + return [snapped_x,snapped_y] + else: + return [real_x,real_y] + class NormAngle(object): '''Alternate encoding of a point, by its norm and orientation''' @@ -254,6 +479,8 @@ norm = p.norm2() if norm > 0: angle = atan2(p.y, p.x) + else: + angle = 0. return NormAngle(norm, angle) def __add__(self, other): @@ -294,44 +521,59 @@ def multiply(self, alpha): return FlowVector(self.position.multiply(alpha), self.velocity.multiply(alpha)) - def draw(self, options = '', **kwargs): + def plot(self, options = '', **kwargs): from matplotlib.pylab import plot plot([self.position.x, self.position.x+self.velocity.x], [self.position.y, self.position.y+self.velocity.y], options, **kwargs) - self.position.draw(options+'x', **kwargs) + self.position.plot(options+'x', **kwargs) @staticmethod def similar(f1, f2, maxDistance2, maxDeltavelocity2): return (f1.position-f2.position).norm2Squared()<maxDistance2 and (f1.velocity-f2.velocity).norm2Squared()<maxDeltavelocity2 +def intersection(p1, p2, p3, p4): + ''' Intersection point (x,y) of lines formed by the vectors p1-p2 and p3-p4 + http://paulbourke.net/geometry/pointlineplane/''' + dp12 = p2-p1 + dp34 = p4-p3 + #det = (p4.y-p3.y)*(p2.x-p1.x)-(p4.x-p3.x)*(p2.y-p1.y) + det = dp34.y*dp12.x-dp34.x*dp12.y + if det == 0: + return None + else: + ua = (dp34.x*(p1.y-p3.y)-dp34.y*(p1.x-p3.x))/det + return p1+dp12.multiply(ua) + +# def intersection(p1, p2, dp1, dp2): +# '''Returns the intersection point between the two lines +# defined by the respective vectors (dp) and origin points (p)''' +# from numpy import matrix +# from numpy.linalg import linalg +# A = matrix([[dp1.y, -dp1.x], +# [dp2.y, -dp2.x]]) +# B = matrix([[dp1.y*p1.x-dp1.x*p1.y], +# [dp2.y*p2.x-dp2.x*p2.y]]) + +# if linalg.det(A) == 0: +# return None +# else: +# intersection = linalg.solve(A,B) +# return Point(intersection[0,0], intersection[1,0]) + def segmentIntersection(p1, p2, p3, p4): '''Returns the intersecting point of the segments [p1, p2] and [p3, p4], None otherwise''' - from numpy import matrix - from numpy.linalg import linalg, det if (Interval.intersection(Interval(p1.x,p2.x,True), Interval(p3.x,p4.x,True)).empty()) or (Interval.intersection(Interval(p1.y,p2.y,True), Interval(p3.y,p4.y,True)).empty()): return None else: - dp1 = p2-p1#[s1[0][1]-s1[0][0], s1[1][1]-s1[1][0]] - dp2 = p4-p3#[s2[0][1]-s2[0][0], s2[1][1]-s2[1][0]] - - A = matrix([[dp1.y, -dp1.x], - [dp2.y, -dp2.x]]) - B = matrix([[dp1.y*p1.x-dp1.x*p1.y], - [dp2.y*p3.x-dp2.x*p3.y]]) - - if linalg.det(A) == 0:#crossProduct(ds1, ds2) == 0: - return None + inter = intersection(p1, p2, p3, p4) + if (inter != None + and utils.inBetween(p1.x, p2.x, inter.x) + and utils.inBetween(p3.x, p4.x, inter.x) + and utils.inBetween(p1.y, p2.y, inter.y) + and utils.inBetween(p3.y, p4.y, inter.y)): + return inter else: - intersection = linalg.solve(A,B) - if (utils.inBetween(p1.x, p2.x, intersection[0,0]) - and utils.inBetween(p3.x, p4.x, intersection[0,0]) - and utils.inBetween(p1.y, p2.y, intersection[1,0]) - and utils.inBetween(p3.y, p4.y, intersection[1,0])): - return Point(intersection[0,0], intersection[1,0]) - else: - return None - -# TODO: implement a better algorithm for intersections of sets of segments http://en.wikipedia.org/wiki/Line_segment_intersection + return None class Trajectory(object): '''Class for trajectories: temporal sequence of positions @@ -345,6 +587,16 @@ self.positions = [[],[]] @staticmethod + def generate(p, v, nPoints): + t = Trajectory() + p0 = Point(p.x, p.y) + t.addPosition(p0) + for i in xrange(nPoints-1): + p0 += v + t.addPosition(p0) + return t, Trajectory([[v.x]*nPoints, [v.y]*nPoints]) + + @staticmethod def load(line1, line2): return Trajectory([[float(n) for n in line1.split(' ')], [float(n) for n in line2.split(' ')]]) @@ -352,18 +604,36 @@ @staticmethod def fromPointList(points): t = Trajectory() - for p in points: - t.addPosition(p) + if isinstance(points[0], list) or isinstance(points[0], tuple): + for p in points: + t.addPositionXY(p[0],p[1]) + else: + for p in points: + t.addPosition(p) return t + def __len__(self): + return len(self.positions[0]) + + def length(self): + return self.__len__() + + def empty(self): + return self.__len__() == 0 + + def __getitem__(self, i): + if isinstance(i, int): + return Point(self.positions[0][i], self.positions[1][i]) + else: + raise TypeError, "Invalid argument type." + #elif isinstance( key, slice ): + def __str__(self): return ' '.join([self.__getitem__(i).__str__() for i in xrange(self.length())]) def __repr__(self): - return str(self) + return self.__str__() - def __getitem__(self, i): - return Point(self.positions[0][i], self.positions[1][i]) def __iter__(self): self.iterInstantNum = 0 @@ -376,11 +646,13 @@ self.iterInstantNum += 1 return self[self.iterInstantNum-1] - def length(self): - return len(self.positions[0]) + def setPositionXY(self, i, x, y): + if i < self.__len__(): + self.positions[0][i] = x + self.positions[1][i] = y - def __len__(self): - return self.length() + def setPosition(self, i, p): + self.setPositionXY(i, p.x, p.y) def addPositionXY(self, x, y): self.positions[0].append(x) @@ -389,8 +661,12 @@ def addPosition(self, p): self.addPositionXY(p.x, p.y) + def duplicateLastPosition(self): + self.positions[0].append(self.positions[0][-1]) + self.positions[1].append(self.positions[1][-1]) + @staticmethod - def _draw(positions, options = '', withOrigin = False, lastCoordinate = None, timeStep = 1, **kwargs): + def _plot(positions, options = '', withOrigin = False, lastCoordinate = None, timeStep = 1, **kwargs): from matplotlib.pylab import plot if lastCoordinate == None: plot(positions[0][::timeStep], positions[1][::timeStep], options, **kwargs) @@ -400,21 +676,19 @@ plot([positions[0][0]], [positions[1][0]], 'ro', **kwargs) def project(self, homography): - from numpy.core.multiarray import array - projected = cvutils.projectArray(homography, array(self.positions)) - return Trajectory(projected) + return Trajectory(cvutils.projectTrajectory(homography, self.positions)) + + def plot(self, options = '', withOrigin = False, timeStep = 1, **kwargs): + Trajectory._plot(self.positions, options, withOrigin, None, timeStep, **kwargs) - def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): - Trajectory._draw(self.positions, options, withOrigin, None, timeStep, **kwargs) + def plotAt(self, lastCoordinate, options = '', withOrigin = False, timeStep = 1, **kwargs): + Trajectory._plot(self.positions, options, withOrigin, lastCoordinate, timeStep, **kwargs) - def drawAt(self, lastCoordinate, options = '', withOrigin = False, timeStep = 1, **kwargs): - Trajectory._draw(self.positions, options, withOrigin, lastCoordinate, timeStep, **kwargs) - - def drawOnWorldImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False, timeStep = 1, **kwargs): + def plotOnWorldImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False, timeStep = 1, **kwargs): from matplotlib.pylab import plot imgPositions = [[x*nPixelsPerUnitDistance for x in self.positions[0]], [-x*nPixelsPerUnitDistance+imageHeight for x in self.positions[1]]] - Trajectory._draw(imgPositions, options, withOrigin, timeStep, **kwargs) + Trajectory._plot(imgPositions, options, withOrigin, timeStep, **kwargs) def getXCoordinates(self): return self.positions[0] @@ -452,6 +726,14 @@ return Trajectory([[a-b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())], [a-b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]]) + def differentiate(self, doubleLastPosition = False): + diff = Trajectory() + for i in xrange(1, self.length()): + diff.addPosition(self[i]-self[i-1]) + if doubleLastPosition: + diff.addPosition(diff[-1]) + return diff + def norm(self): '''Returns the list of the norms at each instant''' # def add(x, y): return x+y @@ -460,12 +742,52 @@ from numpy import hypot return hypot(self.positions[0], self.positions[1]) - def cumulatedDisplacement(self): - 'Returns the sum of the distances between each successive point' - displacement = 0 + # def cumulatedDisplacement(self): + # 'Returns the sum of the distances between each successive point' + # displacement = 0 + # for i in xrange(self.length()-1): + # displacement += Point.distanceNorm2(self.__getitem__(i),self.__getitem__(i+1)) + # return displacement + + def computeCumulativeDistances(self): + '''Computes the distance from each point to the next and the cumulative distance up to the point + Can be accessed through getDistance(idx) and getCumulativeDistance(idx)''' + self.distances = [] + self.cumulativeDistances = [0.] + p1 = self[0] + cumulativeDistance = 0. for i in xrange(self.length()-1): - displacement += Point.distanceNorm2(self.__getitem__(i),self.__getitem__(i+1)) - return displacement + p2 = self[i+1] + self.distances.append(Point.distanceNorm2(p1,p2)) + cumulativeDistance += self.distances[-1] + self.cumulativeDistances.append(cumulativeDistance) + p1 = p2 + + def getDistance(self,i): + '''Return the distance between points i and i+1''' + if i < self.length()-1: + return self.distances[i] + else: + print('Index {} beyond trajectory length {}-1'.format(i, self.length())) + + def getCumulativeDistance(self, i): + '''Return the cumulative distance between the beginning and point i''' + if i < self.length(): + return self.cumulativeDistances[i] + else: + print('Index {} beyond trajectory length {}'.format(i, self.length())) + + def similarOrientation(self, refDirection, cosineThreshold, minProportion = 0.5): + '''Indicates whether the minProportion (<=1.) (eg half) of the trajectory elements (vectors for velocity) + have a cosine with refDirection is smaller than cosineThreshold''' + count = 0 + lengthThreshold = float(self.length())*minProportion + for p in self: + if p.similarOrientation(refDirection, cosineThreshold): + count += 1 + if count > lengthThreshold: + return True + return False def wiggliness(self): return self.cumulatedDisplacement()/float(Point.distanceNorm2(self.__getitem__(0),self.__getitem__(self.length()-1))) @@ -497,34 +819,104 @@ else: return None - def getTrajectoryInPolygon(self, polygon): - '''Returns the set of points inside the polygon + def getTrajectoryInPolygonNoShapely(self, polygon): + '''Returns the trajectory built with the set of points inside the polygon (array of Nx2 coordinates of the polygon vertices)''' - import matplotlib.nxutils as nx traj = Trajectory() - result = nx.points_inside_poly(self.asArray().T, polygon) - for i in xrange(self.length()): - if result[i]: - traj.addPositionXY(self.positions[0][i], self.positions[1][i]) + for p in self: + if p.inPolygonNoShapely(polygon): + traj.addPosition(p) return traj - # version 2: use shapely polygon contains - + if shapelyAvailable: + def getTrajectoryInPolygon(self, polygon): + '''Returns the trajectory built with the set of points inside the (shapely) polygon''' + traj = Trajectory() + points = [p.asShapely() for p in self] + for p in pointsInPolygon(points, polygon): + traj.addPositionXY(p.x, p.y) + return traj + @staticmethod - def norm2LCSS(t1, t2, threshold): - return utils.LCSS(t1, t2, threshold, Point.distanceNorm2) + def lcss(t1, t2, lcss): + return lcss.compute(t1, t2) + +class CurvilinearTrajectory(Trajectory): + '''Sub class of trajectory for trajectories with curvilinear coordinates and lane assignements + longitudinal coordinate is stored as first coordinate (exterior name S) + lateral coordiante is stored as second coordinate''' + + def __init__(self, S = None, Y = None, lanes = None): + if S == None or Y == None or len(S) != len(Y): + self.positions = [[],[]] + if S != None and Y != None and len(S) != len(Y): + print("S and Y coordinates of different lengths\nInitializing to empty lists") + else: + self.positions = [S,Y] + if lanes == None or len(lanes) != self.length(): + self.lanes = [] + else: + self.lanes = lanes + + def __getitem__(self,i): + if isinstance(i, int): + return [self.positions[0][i], self.positions[1][i], self.lanes[i]] + else: + raise TypeError, "Invalid argument type." + #elif isinstance( key, slice ): + + def getSCoordinates(self): + return self.getXCoordinates() + + def getLanes(self): + return self.lanes - @staticmethod - def normMaxLCSS(t1, t2, threshold): - return utils.LCSS(t1, t2, threshold, lambda p1, p2: (p1-p2).normMax()) + def addPositionSYL(self, s, y, lane): + self.addPositionXY(s,y) + self.lanes.append(lane) + + def addPosition(self, p): + 'Adds position in the point format for curvilinear of list with 3 values' + self.addPositionSYL(p[0], p[1], p[2]) + + def setPosition(self, i, s, y, lane): + self.setPositionXY(i, s, y) + if i < self.__len__(): + self.lanes[i] = lane + + def differentiate(self, doubleLastPosition = False): + diff = CurvilinearTrajectory() + p1 = self[0] + for i in xrange(1, self.length()): + p2 = self[i] + diff.addPositionSYL(p2[0]-p1[0], p2[1]-p1[1], p1[2]) + p1=p2 + if doubleLastPosition and self.length() > 1: + diff.addPosition(diff[-1]) + return diff + + def getIntersections(self, S1, lane = None): + '''Returns a list of the indices at which the trajectory + goes past the curvilinear coordinate S1 + (in provided lane if lane != None) + the list is empty if there is no crossing''' + indices = [] + for i in xrange(self.length()-1): + q1=self.__getitem__(i) + q2=self.__getitem__(i+1) + if q1[0] <= S1 < q2[0] and (lane == None or (self.lanes[i] == lane and self.lanes[i+1] == lane)): + indices.append(i+(S1-q1[0])/(q2[0]-q1[0])) + return indices ################## # Moving Objects ################## -userTypeNames = ['car', +userTypeNames = ['unknown', + 'car', 'pedestrian', - 'twowheels', + 'motorcycle', + 'bicycle', 'bus', 'truck'] @@ -532,16 +924,24 @@ class MovingObject(STObject): '''Class for moving objects: a spatio-temporal object - with a trajectory and a geometry (constant volume over time) and a usertype (e.g. road user) + with a trajectory and a geometry (constant volume over time) + and a usertype (e.g. road user) coded as a number (see userTypeNames) ''' - def __init__(self, num = None, timeInterval = None, positions = None, geometry = None, userType = None): + def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = userType2Num['unknown']): super(MovingObject, self).__init__(num, timeInterval) self.positions = positions + self.velocities = velocities self.geometry = geometry self.userType = userType + self.features = [] # compute bounding polygon from trajectory + @staticmethod + def generate(p, v, timeInterval): + positions, velocities = Trajectory.generate(p, v, int(timeInterval.length())) + return MovingObject(timeInterval = timeInterval, positions = positions, velocities = velocities) + def getObjectInTimeInterval(self, inter): '''Returns a new object extracted from self, restricted to time interval inter''' @@ -565,6 +965,18 @@ def getVelocities(self): return self.velocities + def getUserType(self): + return self.userType + + def getCurvilinearPositions(self): + if hasattr(self, 'curvilinearPositions'): + return self.curvilinearPositions + else: + return None + + def setUserType(self, userType): + self.userType = userType + def setFeatures(self, features): self.features = [features[i] for i in self.featureNumbers] @@ -589,15 +1001,73 @@ def getYCoordinates(self): return self.positions.getYCoordinates() - def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): - self.positions.draw(options, withOrigin, timeStep, **kwargs) + def plot(self, options = '', withOrigin = False, timeStep = 1, withFeatures = False, **kwargs): + if withFeatures: + for f in self.features: + f.positions.plot('r', True, timeStep, **kwargs) + self.positions.plot('bx-', True, timeStep, **kwargs) + else: + self.positions.plot(options, withOrigin, timeStep, **kwargs) - def drawOnWorldImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False, timeStep = 1, **kwargs): - self.positions.drawOnWorldImage(nPixelsPerUnitDistance, imageHeight, options, withOrigin, timeStep, **kwargs) + def plotOnWorldImage(self, nPixelsPerUnitDistance, imageHeight, options = '', withOrigin = False, timeStep = 1, **kwargs): + self.positions.plotOnWorldImage(nPixelsPerUnitDistance, imageHeight, options, withOrigin, timeStep, **kwargs) def play(self, videoFilename, homography = None): cvutils.displayTrajectories(videoFilename, [self], homography, self.getFirstInstant(), self.getLastInstant()) + def speedDiagnostics(self, framerate = 1., display = False): + from numpy import std + from scipy.stats import scoreatpercentile + speeds = framerate*self.getSpeeds() + coef = utils.linearRegression(range(len(speeds)), speeds) + print('min/5th perc speed: {} / {}\nspeed diff: {}\nspeed stdev: {}\nregression: {}'.format(min(speeds), scoreatpercentile(speeds, 5), speeds[-2]-speeds[1], std(speeds), coef[0])) + if display: + from matplotlib.pyplot import figure, plot, axis + figure(1) + self.plot() + axis('equal') + figure(2) + plot(list(self.getTimeInterval()), speeds) + + @staticmethod + def distances(obj1, obj2, instant1, _instant2 = None): + from scipy.spatial.distance import cdist + if _instant2 == None: + instant2 = instant1 + else: + instant2 = _instant2 + positions1 = [f.getPositionAtInstant(instant1).astuple() for f in obj1.features if f.existsAtInstant(instant1)] + positions2 = [f.getPositionAtInstant(instant2).astuple() for f in obj2.features if f.existsAtInstant(instant2)] + return cdist(positions1, positions2, metric = 'euclidean') + + @staticmethod + def minDistance(obj1, obj2, instant1, instant2 = None): + return MovingObject.distances(obj1, obj2, instant1, instant2).min() + + @staticmethod + def maxDistance(obj1, obj2, instant, instant2 = None): + return MovingObject.distances(obj1, obj2, instant1, instant2).max() + + def maxSize(self): + '''Returns the max distance between features + at instant there are the most features''' + if hasattr(self, 'features'): + nFeatures = -1 + tMaxFeatures = 0 + for t in self.getTimeInterval(): + n = len([f for f in self.features if f.existsAtInstant(t)]) + if n > nFeatures: + nFeatures = n + tMaxFeatures = t + return MovingObject.maxDistance(self, self, tMaxFeatures) + else: + print('Load features to compute a maximum size') + return None + + def setRoutes(self, startRouteID, endRouteID): + self.startRouteID = startRouteID + self.endRouteID = endRouteID + def getInstantsCrossingLane(self, p1, p2): '''Returns the instant(s) at which the object passes from one side of the segment to the other @@ -610,6 +1080,190 @@ at constant speed''' return predictPositionNoLimit(nTimeSteps, self.getPositionAtInstant(instant), self.getVelocityAtInstant(instant), externalAcceleration) + def projectCurvilinear(self, alignments, ln_mv_av_win=3): + ''' Add, for every object position, the class 'moving.CurvilinearTrajectory()' + (curvilinearPositions instance) which holds information about the + curvilinear coordinates using alignment metadata. + From Paul St-Aubin's PVA tools + ====== + + Input: + ====== + alignments = a list of alignments, where each alignment is a list of + points (class Point). + ln_mv_av_win = moving average window (in points) in which to smooth + lane changes. As per tools_math.cat_mvgavg(), this term + is a search *radius* around the center of the window. + + ''' + + self.curvilinearPositions = CurvilinearTrajectory() + + #For each point + for i in xrange(int(self.length())): + result = getSYfromXY(self.getPositionAt(i), alignments) + + # Error handling + if(result == None): + print('Warning: trajectory {} at point {} {} has alignment errors (spline snapping)\nCurvilinear trajectory could not be computed'.format(self.getNum(), i, self.getPositionAt(i))) + else: + [align, alignPoint, snappedPoint, subsegmentDistance, S, Y] = result + self.curvilinearPositions.addPositionSYL(S, Y, align) + + ## Go back through points and correct lane + #Run through objects looking for outlier point + smoothed_lanes = utils.cat_mvgavg(self.curvilinearPositions.getLanes(),ln_mv_av_win) + ## Recalculate projected point to new lane + lanes = self.curvilinearPositions.getLanes() + if(lanes != smoothed_lanes): + for i in xrange(len(lanes)): + if(lanes[i] != smoothed_lanes[i]): + result = getSYfromXY(self.getPositionAt(i),[alignments[smoothed_lanes[i]]]) + + # Error handling + if(result == None): + ## This can be triggered by tracking errors when the trajectory jumps around passed another alignment. + print(' Warning: trajectory {} at point {} {} has alignment errors during trajectory smoothing and will not be corrected.'.format(self.getNum(), i, self.getPositionAt(i))) + else: + [align, alignPoint, snappedPoint, subsegmentDistance, S, Y] = result + self.curvilinearPositions.setPosition(i, S, Y, align) + + def computeSmoothTrajectory(self, minCommonIntervalLength): + '''Computes the trajectory as the mean of all features + if a feature exists, its position is + + Warning work in progress + TODO? not use the first/last 1-.. positions''' + from numpy import array, median + nFeatures = len(self.features) + if nFeatures == 0: + print('Empty object features\nCannot compute smooth trajectory') + else: + # compute the relative position vectors + relativePositions = {} # relativePositions[(i,j)] is the position of j relative to i + for i in xrange(nFeatures): + for j in xrange(i): + fi = self.features[i] + fj = self.features[j] + inter = fi.commonTimeInterval(fj) + if inter.length() >= minCommonIntervalLength: + xi = array(fi.getXCoordinates()[inter.first-fi.getFirstInstant():int(fi.length())-(fi.getLastInstant()-inter.last)]) + yi = array(fi.getYCoordinates()[inter.first-fi.getFirstInstant():int(fi.length())-(fi.getLastInstant()-inter.last)]) + xj = array(fj.getXCoordinates()[inter.first-fj.getFirstInstant():int(fj.length())-(fj.getLastInstant()-inter.last)]) + yj = array(fj.getYCoordinates()[inter.first-fj.getFirstInstant():int(fj.length())-(fj.getLastInstant()-inter.last)]) + relativePositions[(i,j)] = Point(median(xj-xi), median(yj-yi)) + relativePositions[(j,i)] = -relativePositions[(i,j)] + + ### + # User Type Classification + ### + def classifyUserTypeSpeedMotorized(self, threshold, aggregationFunc = median, ignoreNInstantsAtEnds = 0): + '''Classifies slow and fast road users + slow: non-motorized -> pedestrians + fast: motorized -> cars''' + if ignoreNInstantsAtEnds > 0: + speeds = self.getSpeeds()[ignoreNInstantsAtEnds:-ignoreNInstantsAtEnds] + else: + speeds = self.getSpeeds() + if aggregationFunc(speeds) >= threshold: + self.setUserType(userType2Num['car']) + else: + self.setUserType(userType2Num['pedestrian']) + + def classifyUserTypeSpeed(self, speedProbabilities, aggregationFunc = median): + '''Classifies road user per road user type + speedProbabilities are functions return P(speed|class) + in a dictionary indexed by user type names + Returns probabilities for each class + + for simple threshold classification, simply pass non-overlapping indicator functions (membership) + e.g. def indic(x): + if abs(x-mu) < sigma: + return 1 + else: + return x''' + if not hasattr(self, aggregatedSpeed): + self.aggregatedSpeed = aggregationFunc(self.getSpeeds()) + userTypeProbabilities = {} + for userTypename in speedProbabilities: + userTypeProbabilities[userType2Num[userTypename]] = speedProbabilities[userTypename](self.aggregatedSpeed) + self.setUserType(utils.argmaxDict(userTypeProbabilities)) + return userTypeProbabilities + + def initClassifyUserTypeHoGSVM(self, aggregationFunc = median): + '''Initializes the data structures for classification + + TODO? compute speed for longest feature? + Skip beginning and end of feature for speed? Offer options instead of median''' + self.aggregatedSpeed = aggregationFunc(self.getSpeeds()) + self.userTypes = {} + + def classifyUserTypeHoGSVMAtInstant(self, img, pedBikeCarSVM, instant, homography, width, height, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), px = 0.2, py = 0.2, pixelThreshold = 800): + '''Extract the image box around the object and + applies the SVM model on it''' + from numpy import array + croppedImg, yCropMin, yCropMax, xCropMin, xCropMax = imageBox(img, self, instant, homography, width, height, px, py, pixelThreshold) + if len(croppedImg) > 0: # != [] + hog = array([cvutils.HOG(croppedImg)], dtype = np.float32) + if self.aggregatedSpeed < pedBikeSpeedTreshold or bikeCarSVM == None: + self.userTypes[instant] = int(pedBikeCarSVM.predict(hog)) + elif self.aggregatedSpeed < bikeCarSpeedTreshold: + self.userTypes[instant] = int(bikeCarSVM.predict(hog)) + else: + self.userTypes[instant] = userType2Num['car'] + else: + self.userTypes[instant] = userType2Num['unknown'] + + def classifyUserTypeHoGSVM(self, images, pedBikeCarSVM, homography, width, height, bikeCarSVM = None, pedBikeSpeedTreshold = float('Inf'), bikeCarSpeedThreshold = float('Inf'), speedProbabilities = None, aggregationFunc = median, px = 0.2, py = 0.2, pixelThreshold = 800): + '''Agregates SVM detections in each image and returns probability + (proportion of instants with classification in each category) + + iamges is a dictionary of images indexed by instant + With default parameters, the general (ped-bike-car) classifier will be used + TODO? consider all categories?''' + if not hasattr(self, aggregatedSpeed) or not hasattr(self, userTypes): + print('Initilize the data structures for classification by HoG-SVM') + self.initClassifyUserTypeHoGSVM(aggregationFunc) + + if len(self.userTypes) != self.length(): # if classification has not been done previously + for t in self.getTimeInterval(): + if t not in self.userTypes: + self.classifyUserTypeHoGSVMAtInstant(images[t], pedBikeCarSVM, t, homography, width, height, bikeCarSVM, pedBikeSpeedTreshold, bikeCarSpeedThreshold, px, py, pixelThreshold) + # compute P(Speed|Class) + if speedProbabilities == None: # equiprobable information from speed + userTypeProbabilities = {userType2Num['car']: 1., userType2Num['pedestrian']: 1., userType2Num['bicycle']: 1.} + else: + userTypeProbabilities = {userType2Num[userTypename]: speedProbabilities[userTypename](self.aggregatedSpeed) for userTypename in speedProbabilities} + # result is P(Class|Appearance) x P(Speed|Class) + nInstantsUserType = {userType2Num[userTypename]: 0 for userTypename in userTypeProbabilities}# number of instants the object is classified as userTypename + for t in self.userTypes: + nInstantsUserType[self.userTypes[t]] += 1 + for userTypename in userTypeProbabilities: + userTypeProbabilities[userTypename] *= nInstantsUserType[userTypename] + # class is the user type that maximizes usertype probabilities + self.setUserType(utils.argmaxDict(userTypeProbabilities)) + + def classifyUserTypeArea(self, areas, homography): + '''Classifies the object based on its location (projected to image space) + areas is a dictionary of matrix of the size of the image space + for different road users possible locations, indexed by road user type names + + TODO: areas could be a wrapper object with a contains method that would work for polygons and images (with wrapper class) + skip frames at beginning/end?''' + print('not implemented/tested yet') + if not hasattr(self, projectedPositions): + if homography != None: + self.projectedPositions = obj.positions.project(homography) + else: + self.projectedPositions = obj.positions + possibleUserTypes = {userType: 0 for userType in range(len(userTypenames))} + for p in self.projectedPositions: + for userTypename in areas: + if areas[userTypename][p.x, p.y] != 0: + possibleUserTypes[userType2Enum[userTypename]] += 1 + # what to do: threshold for most common type? self.setUserType() + return possibleUserTypes + @staticmethod def collisionCourseDotProduct(movingObject1, movingObject2, instant): 'A positive result indicates that the road users are getting closer' @@ -620,16 +1274,33 @@ @staticmethod def collisionCourseCosine(movingObject1, movingObject2, instant): 'A positive result indicates that the road users are getting closer' - deltap = movingObject1.getPositionAtInstant(instant)-movingObject2.getPositionAtInstant(instant) - deltav = movingObject2.getVelocityAtInstant(instant)-movingObject1.getVelocityAtInstant(instant) - return Point.dot(deltap, deltav)/(deltap.norm2()*deltav.norm2()) + return Point.cosine(movingObject1.getPositionAtInstant(instant)-movingObject2.getPositionAtInstant(instant), #deltap + movingObject2.getVelocityAtInstant(instant)-movingObject1.getVelocityAtInstant(instant)) #deltav + + +################## +# Annotations +################## + +class BBAnnotation(MovingObject): + '''Class for : a series of ground truth annotations using bounding boxes + Its center is the center of the containing shape + ''' + + def __init__(self, num = None, timeInterval = None, topPositions = None, bottomPositions = None, userType = userType2Num['unknown']): + super(BBAnnotation, self).__init__(num, timeInterval, Trajectory(), userType = userType) + self.topPositions = topPositions.getPositions() + self.bottomPositions = bottomPositions.getPositions() + for i in xrange(int(topPositions.length())): + self.positions.addPosition((topPositions.getPositionAt(i) + bottomPositions.getPositionAt(i)).multiply(0.5)) + def plotRoadUsers(objects, colors): '''Colors is a PlottingPropertyValues instance''' from matplotlib.pyplot import figure, axis figure() for obj in objects: - obj.draw(colors.get(obj.userType)) + obj.plot(colors.get(obj.userType)) axis('equal')