Mercurial Hosting > traffic-intelligence
changeset 574:e24eeb244698
first implementation of projection to curvilinear coordinates
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Wed, 13 Aug 2014 00:00:08 -0400 |
parents | cae4e5f3fe9f |
children | 13df64a9ff9d |
files | python/moving.py python/utils.py |
diffstat | 2 files changed, 63 insertions(+), 53 deletions(-) [+] |
line wrap: on
line diff
--- a/python/moving.py Tue Aug 12 17:47:16 2014 -0400 +++ b/python/moving.py Wed Aug 13 00:00:08 2014 -0400 @@ -411,16 +411,19 @@ if offsetY < goodEnoughSplineDistance: break #Get sub-segment distance - subsegmentDistance = utils.pointDistanceL2(splines[snappedSpline][snappedSplineLeadingPoint][0],splines[snappedSpline][snappedSplineLeadingPoint][1],snapped_x,snapped_y) - #Get total segment distance - splineDistanceS = ss_spline_d[snappedSpline][1][snappedSplineLeadingPoint] + subsegmentDistance - orthogonalSplineVector = Point(splines[snappedSpline][snappedSplineLeadingPoint+1][1]-splines[snappedSpline][snappedSplineLeadingPoint][1], - splines[snappedSpline][snappedSplineLeadingPoint][0]-splines[snappedSpline][snappedSplineLeadingPoint+1][0])#positive orthogonal vector of vector (x,y) is (y, -x) - offsetVector = Point(qx-snapped_x, qy-snapped_y) - if Point.dot(orthogonalSplineVector, offsetVector) < 0: - minOffsetY = -minOffsetY - return [snappedSpline, snappedSplineLeadingPoint, snapped_x, snapped_y, subsegmentDistance, splineDistanceS, minOffsetY] - + if minOffsetY != float('inf'): + subsegmentDistance = utils.pointDistanceL2(splines[snappedSpline][snappedSplineLeadingPoint][0],splines[snappedSpline][snappedSplineLeadingPoint][1],snapped_x,snapped_y) + #Get total segment distance + splineDistanceS = ss_spline_d[snappedSpline][1][snappedSplineLeadingPoint] + subsegmentDistance + orthogonalSplineVector = Point(splines[snappedSpline][snappedSplineLeadingPoint+1][1]-splines[snappedSpline][snappedSplineLeadingPoint][1], + splines[snappedSpline][snappedSplineLeadingPoint][0]-splines[snappedSpline][snappedSplineLeadingPoint+1][0])#positive orthogonal vector of vector (x,y) is (y, -x) + offsetVector = Point(qx-snapped_x, qy-snapped_y) + if Point.dot(orthogonalSplineVector, offsetVector) < 0: + minOffsetY = -minOffsetY + return [snappedSpline, snappedSplineLeadingPoint, snapped_x, snapped_y, subsegmentDistance, splineDistanceS, minOffsetY] + else: + return None + def getXYfromSY(s, y, splineNum, splines, mode = 0): ''' Find X,Y coordinate from S,Y data. if mode = 0 : return Snapped X,Y @@ -1037,7 +1040,7 @@ at constant speed''' return predictPositionNoLimit(nTimeSteps, self.getPositionAtInstant(instant), self.getVelocityAtInstant(instant), externalAcceleration) - def transformToCurvilinear(objects, alignments, ln_mv_av_win=3, verbose=0): + def projectCurvilinear(self, alignments, ln_mv_av_win=3): ''' Add, for every object position, the class 'moving.CurvilinearTrajectory()' (curvilinearPositions instance) which holds information about the curvilinear coordinates using alignment metadata. @@ -1064,15 +1067,6 @@ dropped_traj = list of objects or object segments that were truncated. The format is identical to that of objects. ''' - if(len(objects) <= 0): return None, None - if(verbose >= 2): - print(' -------------') - print(' Transforming coordinates...') - - lane_readjustments = 0 - dropped_traj = [] - original_object_length = len(objects) - reset_objects = 0 #if(not isinstance(objects, list)): # objects = [objects] @@ -1081,47 +1075,50 @@ # return objects, dropped_traj #For each object - for i in range(len(objects)): - objects[i].curvilinearPositions = CurvilinearTrajectory([],[],[]) + #for i in range(len(objects)): + self.curvilinearPositions = CurvilinearTrajectory() - #For each point - for point in range(len(objects[i].getXCoordinates())): - [align, alignPoint, snapped_x, snapped_y, subsegmentDistance, S, Y] = PvaTools.Geo.getSYfromXY(objects[i].getXCoordinates()[point],objects[i].getYCoordinates()[point],alignments) + #For each point + for point in range(len(self.getXCoordinates())): + result = getSYfromXY(self.getXCoordinates()[point],self.getYCoordinates()[point],alignments) - # Error handling - if(align is False): - if(verbose >= 2): print(' Warning: trajectory '+str(i)+' at point '+str(point)+' has alignment errors (spline snapping)') - dropped_traj.append(objects[i]) - objects[i] = None - break - else: objects[i].curvilinearPositions.addPosition(S, Y, align) + # Error handling + if(result == None): + print('Warning: trajectory {} at point {} has alignment errors (spline snapping)\nCurvilinear trajectory could not be computed'.format(self.getNum(), point)) + #dropped_traj.append(self) + #self = None + #break + else: + [align, alignPoint, snapped_x, snapped_y, subsegmentDistance, S, Y] = result + self.curvilinearPositions.addPositionSYL(S, Y, align) - if(objects[i] == None): continue + #if(self == None): continue - ## Go back through points and correct lane - #Run through objects looking for outlier point - smoothed_lanes = PvaTools.Math.cat_mvgavg(objects[i].curvilinearPositions.getLanes(),window=ln_mv_av_win) - ## Recalculate smoothed lanes - if(objects[i].curvilinearPositions.getLanes() != smoothed_lanes): - for point in range(len(objects[i].getXCoordinates())): - if(objects[i].curvilinearPositions.getLanes()[point] != smoothed_lanes[point]): - [align, alignPoint, snapped_x, snapped_y, subsegmentDistance, S, Y] = PvaTools.Geo.getSYfromXY(objects[i].getXCoordinates()[point],objects[i].getYCoordinates()[point],[alignments[smoothed_lanes[point]]]) + ## Go back through points and correct lane + #Run through objects looking for outlier point + smoothed_lanes = utils.cat_mvgavg(self.curvilinearPositions.getLanes(),ln_mv_av_win) + ## Recalculate projected point to new lane + if(self.curvilinearPositions.getLanes() != smoothed_lanes): + for point in range(len(self.getXCoordinates())): + if(self.curvilinearPositions.getLanes()[point] != smoothed_lanes[point]): + result = getSYfromXY(self.getXCoordinates()[point],self.getYCoordinates()[point],[alignments[smoothed_lanes[point]]]) - # Error handling - if(align is False): - ## This can be triggered by tracking errors when the trajectory jumps around passed another alignment. - if(verbose >= 4): print(' Warning: trajectory '+str(i)+' at point '+str(point)+' has alignment errors during trajectory smoothing and will not be corrected.') - else: - objects[i].curvilinearPositions.setPosition(point, S, Y, align) + # Error handling + if(result == None): + ## This can be triggered by tracking errors when the trajectory jumps around passed another alignment. + print(' Warning: trajectory '+str(i)+' at point '+str(point)+' has alignment errors during trajectory smoothing and will not be corrected.') + else: + [align, alignPoint, snapped_x, snapped_y, subsegmentDistance, S, Y] = result + self.curvilinearPositions.setPosition(point, S, Y, align) #Resize objects - if(len(dropped_traj) > 0): - objects = filter(None, objects) - if(verbose >= 2): print(' Filtering report: Trajectories dropped: '+str(len(dropped_traj))) - if(verbose >= 2): print(' Filtering report: Lane observation corrections per object: '+str(lane_readjustments/original_object_length)) + # if(len(dropped_traj) > 0): + # objects = filter(None, objects) + # if(verbose >= 2): print(' Filtering report: Trajectories dropped: '+str(len(dropped_traj))) + #if(verbose >= 2): print(' Filtering report: Lane observation corrections per object: '+str(lane_readjustments/original_object_length)) - if(reset_objects and len(objects) > 0): return objects[0], dropped_traj - else: return objects, dropped_traj + #if(reset_objects and len(objects) > 0): return objects[0], dropped_traj + #else: return objects, dropped_traj def computeSmoothTrajectory(self, minCommonIntervalLength):
--- a/python/utils.py Tue Aug 12 17:47:16 2014 -0400 +++ b/python/utils.py Wed Aug 13 00:00:08 2014 -0400 @@ -251,6 +251,19 @@ def crossProduct(l1, l2): return l1[0]*l2[1]-l1[1]*l2[0] +def cat_mvgavg(cat_list, halfWidth): + ''' Return a list of categories/values smoothed according to a window. + halfWidth is the search radius on either side''' + from copy import deepcopy + catgories = deepcopy(cat_list) + smoothed = catgories + for point in range(len(catgories)): + lower_bound_check = max(0,point-halfWidth) + upper_bound_check = min(len(catgories)-1,point+halfWidth+1) + window_values = catgories[lower_bound_check:upper_bound_check] + smoothed[point] = max(set(window_values), key=window_values.count) + return smoothed + def filterMovingWindow(inputSignal, halfWidth): '''Returns an array obtained after the smoothing of the input by a moving average The first and last points are copied from the original.'''