Mercurial Hosting > traffic-intelligence
changeset 567:072cedc3f33d
first integration of curvilinear transformation from Paul
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 05 Aug 2014 17:45:36 -0400 |
parents | 07b1bd0785cd |
children | 538fb47b3007 0057c04f94d5 |
files | python/moving.py |
diffstat | 1 files changed, 258 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- a/python/moving.py Mon Jul 28 02:57:59 2014 -0400 +++ b/python/moving.py Tue Aug 05 17:45:36 2014 -0400 @@ -189,7 +189,7 @@ def norm2Squared(self): '''2-norm distance (Euclidean distance)''' - return self.x*self.x+self.y*self.y + return self.x**2+self.y**2 def norm2(self): '''2-norm distance (Euclidean distance)''' @@ -307,6 +307,174 @@ prepared_polygon = prep(polygon) return filter(prepared_polygon.contains, points) +# Functions for coordinate transformation +# From Paul St-Aubin's PVA tools +def ppd(*args): + ''' Get point-to-point distance. + + Example usage: + ============== + d = ppd(x1,y1,x2,y2) + d = ppd(P1,P2) + ''' + + if(len(args)==4): [x1,y1,x2,y2] = [args[0],args[1],args[2],args[3]] + elif(len(args)==2 and hasattr(args[0], '__iter__')): [x1,y1,x2,y2] = [args[0][0],args[0][1],args[1][0],args[1][1]] + else: raise Exception("Library error: ppd() in tools_geo takes exactly 2 or 4 arguments.") + return sqrt((x2-x1)**2+(y2-y1)**2) + +def subsec_spline_dist(splines): + ''' Prepare list of spline subsegments from a spline list. + + Output: + ======= + ss_spline_d[spline #][mode][station] + + where: + mode=0: incremental distance + mode=1: cumulative distance + mode=2: cumulative distance with trailing distance + ''' + + from numpy import zeros + ss_spline_d = [] + #Prepare subsegment distances + for spline in range(len(splines)): + ss_spline_d.append([[],[],[]]) + ss_spline_d[spline][0] = zeros(len(splines[spline])-1) #Incremental distance + ss_spline_d[spline][1] = zeros(len(splines[spline])-1) #Cumulative distance + ss_spline_d[spline][2] = zeros(len(splines[spline])) #Cumulative distance with trailing distance + for spline_p in range(len(splines[spline])): + if spline_p > (len(splines[spline]) - 2): + break + ss_spline_d[spline][0][spline_p] = ppd(splines[spline][spline_p][0],splines[spline][spline_p][1],splines[spline][(spline_p+1)][0],splines[spline][(spline_p+1)][1]) # TODO use points and remove ppd + ss_spline_d[spline][1][spline_p] = sum(ss_spline_d[spline][0][0:spline_p]) + ss_spline_d[spline][2][spline_p] = sum(ss_spline_d[spline][0][0:spline_p]) + + ss_spline_d[spline][2][-1] = ss_spline_d[spline][2][-2] + ss_spline_d[spline][0][-1] + + return ss_spline_d + +def ppldb2p(qx,qy, p0x,p0y, p1x,p1y): + ''' Point-projection (Q) on line defined by 2 points (P0,P1). + http://cs.nyu.edu/~yap/classes/visual/03s/hw/h2/math.pdf + ''' + if(p0x == p1x and p0y == p1y): + return False,False + try: + #Approximate slope singularity by giving some slope roundoff; account for roundoff error + if(round(p0x, 10) == round(p1x, 10)): + p1x += 0.0000000001 + if(round(p0y, 10) == round(p1y, 10)): + p1y += 0.0000000001 + #make the calculation + Y = (-(qx)*(p0y-p1y)-(qy*(p0y-p1y)**2)/(p0x-p1x)+p0x**2*(p0y-p1y)/(p0x-p1x)-p0x*p1x*(p0y-p1y)/(p0x-p1x)-p0y*(p0x-p1x))/(p1x-p0x-(p0y-p1y)**2/(p0x-p1x)) + X = (-Y*(p1y-p0y)+qx*(p1x-p0x)+qy*(p1y-p0y))/(p1x-p0x) + except ZeroDivisionError: + print('Error: Division by zero in ppldb2p. Please report this error with the full traceback:') + print('qx={0}, qy={1}, p0x={2}, p0y={3}, p1x={4}, p1y={5}...'.format(qx, qy, p0x, p0y, p1x, p1y)) + import pdb; pdb.set_trace() + return X,Y + +def getSYfromXY(qx, qy, splines, spline_assumption_threshold=0.5, mode=0): + ''' Snap a point (coordinates qx, qy) to it's nearest subsegment of it's nearest spline (from the list splines). + + To force snapping to a single spline, pass the spline alone through splines (e.g. splines=[splines[splineNum]]). + + Output: + ======= + [spline index, + subsegment leading point index, + snapped x coordinate, + snapped y coordinate, + subsegment distance, + spline distance, + orthogonal point offset] + ''' + + #(buckle in, it gets ugly from here on out) + ss_spline_d = subsec_spline_dist(splines) + + temp_dist_min = float('inf') + #For each spline + for spline in range(len(splines)): + #For each spline point + for spline_p in range(len(splines[spline])): + if(spline_p > (len(splines[spline]) - 2)): + break + #Get point-intersection distance + X,Y = ppldb2p(qx,qy,splines[spline][spline_p][0],splines[spline][spline_p][1],splines[spline][spline_p+1][0],splines[spline][spline_p+1][1]) + if(X == False and Y == False): + print('Error: Spline {0}, segment {1} has identical bounds and therefore is not a vector. Projection cannot continue.'.format(spline, spline_p)) + return [False,False,False,False,False,False,False] + #Check to see if point is not contained by subspline + if ss_spline_d[spline][0][spline_p]*1.05 < max(ppd(splines[spline][spline_p][0],splines[spline][spline_p][1],X,Y),ppd(splines[spline][spline_p+1][0],splines[spline][spline_p+1][1],X,Y)): continue + + #Ok + temp_dist = ppd(qx,qy,X,Y) + if(temp_dist < temp_dist_min): + temp_dist_min = temp_dist + snappedSpline = spline + snappedSplineLeadingPoint = spline_p + snapped_x = X + snapped_y = Y + #Jump loop if significantly close + if(temp_dist < spline_assumption_threshold): break + + try: + #Get sub-segment distance + subsegmentDistance = ppd(splines[snappedSpline][snappedSplineLeadingPoint][0],splines[snappedSpline][snappedSplineLeadingPoint][1],snapped_x,snapped_y) + #Get total segment distance + splineDistanceS = ss_spline_d[snappedSpline][1][snappedSplineLeadingPoint] + subsegmentDistance + #Get offset distance + offsetY = ppd(qx,qy, snapped_x,snapped_y) + + if(mode): + direction = getOrientation([splines[snappedSpline][snappedSplineLeadingPoint][0],splines[snappedSpline][snappedSplineLeadingPoint][1]] , [splines[snappedSpline][snappedSplineLeadingPoint+1][0],splines[snappedSpline][snappedSplineLeadingPoint+1][1]] , [qx,qy]) + if(direction == 'left'): offsetY = -offsetY + + return [snappedSpline, snappedSplineLeadingPoint, snapped_x, snapped_y, subsegmentDistance, splineDistanceS, offsetY] + except: + return [False,False,False,False,False,False,False] + + +def getXYfromSY(s, y, splineNum, splines, mode = 0): + ''' Find X,Y coordinate from S,Y data. + if Mode = 0 : return Snapped X,Y + if Mode !=0 : return Real X,Y + ''' + + #(buckle in, it gets ugly from here on out) + ss_spline_d = subsec_spline_dist(splines) + + #Find subsegment + snapped_x = None + snapped_y = None + for spline_ss_index in range(len(ss_spline_d[splineNum][1])): + if(s < ss_spline_d[splineNum][1][spline_ss_index]): + ss_value = s - ss_spline_d[splineNum][1][spline_ss_index-1] + #Get normal vector and then snap + vector_l_x = (splines[splineNum][spline_ss_index][0] - splines[splineNum][spline_ss_index-1][0]) + vector_l_y = (splines[splineNum][spline_ss_index][1] - splines[splineNum][spline_ss_index-1][1]) + magnitude = sqrt(vector_l_x**2 + vector_l_y**2) + n_vector_x = vector_l_x/magnitude + n_vector_y = vector_l_y/magnitude + snapped_x = splines[splineNum][spline_ss_index-1][0] + ss_value*n_vector_x + snapped_y = splines[splineNum][spline_ss_index-1][1] + ss_value*n_vector_y + + #Real values (including orthogonal projection of y)) + real_x = snapped_x - y*n_vector_y + real_y = snapped_y + y*n_vector_x + break + + if mode == 0 or (not snapped_x): + if(not snapped_x): + snapped_x = splines[splineNum][-1][0] + snapped_y = splines[splineNum][-1][1] + return [snapped_x,snapped_y] + else: + return [real_x,real_y] + class NormAngle(object): '''Alternate encoding of a point, by its norm and orientation''' @@ -877,10 +1045,98 @@ at constant speed''' return predictPositionNoLimit(nTimeSteps, self.getPositionAtInstant(instant), self.getVelocityAtInstant(instant), externalAcceleration) + def transformToCurvilinear(objects, alignments, ln_mv_av_win=3, verbose=0): + ''' Add, for every object position, the class 'moving.CurvilinearTrajectory()' + (curvilinearPositions instance) which holds information about the + curvilinear coordinates using alignment metadata. + From Paul St-Aubin's PVA tools + ====== + + Input: + ====== + objects = list of objects supplied by Traffic Intelligence + alignments = a list of alignments, where each alignment is a list of + points, where each point is a list of coordinates, e.g. + alignments[alpha][1][x] is coordinate x for point number + 1 of the spline that represents alignment alpha. + alignments can also be a compatible object that mimics a + 3-dimensional list using the __getitem__() method as is + the case in the PVAT specification. + ln_mv_av_win = moving average window (in points) in which to smooth + lane changes. As per tools_math.cat_mvgavg(), this term + is a search *radius* around the center of the window. + + Output: (objects, dropped_traj) + ======= + objects = modified list of objects + dropped_traj = list of objects or object segments that were + truncated. The format is identical to that of objects. + ''' + if(len(objects) <= 0): return None, None + if(verbose >= 2): + print(' -------------') + print(' Transforming coordinates...') + + lane_readjustments = 0 + dropped_traj = [] + original_object_length = len(objects) + reset_objects = 0 + + #if(not isinstance(objects, list)): + # objects = [objects] + # reset_objects = 1 + #if(not isinstance(objects[0], TrafIntMoving.MovingObject)): + # return objects, dropped_traj + + #For each object + for i in range(len(objects)): + objects[i].curvilinearPositions = CurvilinearTrajectory([],[],[]) + + #For each point + for point in range(len(objects[i].getXCoordinates())): + [align, alignPoint, snapped_x, snapped_y, subsegmentDistance, S, Y] = PvaTools.Geo.getSYfromXY(objects[i].getXCoordinates()[point],objects[i].getYCoordinates()[point],alignments) + + # Error handling + if(align is False): + if(verbose >= 2): print(' Warning: trajectory '+str(i)+' at point '+str(point)+' has alignment errors (spline snapping)') + dropped_traj.append(objects[i]) + objects[i] = None + break + else: objects[i].curvilinearPositions.addPosition(S, Y, align) + + if(objects[i] == None): continue + + ## Go back through points and correct lane + #Run through objects looking for outlier point + smoothed_lanes = PvaTools.Math.cat_mvgavg(objects[i].curvilinearPositions.getLanes(),window=ln_mv_av_win) + ## Recalculate smoothed lanes + if(objects[i].curvilinearPositions.getLanes() != smoothed_lanes): + for point in range(len(objects[i].getXCoordinates())): + if(objects[i].curvilinearPositions.getLanes()[point] != smoothed_lanes[point]): + [align, alignPoint, snapped_x, snapped_y, subsegmentDistance, S, Y] = PvaTools.Geo.getSYfromXY(objects[i].getXCoordinates()[point],objects[i].getYCoordinates()[point],[alignments[smoothed_lanes[point]]]) + + # Error handling + if(align is False): + ## This can be triggered by tracking errors when the trajectory jumps around passed another alignment. + if(verbose >= 4): print(' Warning: trajectory '+str(i)+' at point '+str(point)+' has alignment errors during trajectory smoothing and will not be corrected.') + else: + objects[i].curvilinearPositions.setPosition(point, S, Y, align) + + #Resize objects + if(len(dropped_traj) > 0): + objects = filter(None, objects) + if(verbose >= 2): print(' Filtering report: Trajectories dropped: '+str(len(dropped_traj))) + if(verbose >= 2): print(' Filtering report: Lane observation corrections per object: '+str(lane_readjustments/original_object_length)) + + if(reset_objects and len(objects) > 0): return objects[0], dropped_traj + else: return objects, dropped_traj + + def computeSmoothTrajectory(self, minCommonIntervalLength): '''Computes the trajectory as the mean of all features if a feature exists, its position is - + + Warning work in progress TODO? not use the first/last 1-.. positions''' from numpy import array, median nFeatures = len(self.features)