Mercurial Hosting > traffic-intelligence
changeset 939:a2f3f3ca241e
work in progress
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Mon, 17 Jul 2017 17:56:52 -0400 |
parents | fbf12382f3f8 |
children | d8ab183a7351 |
files | python/moving.py python/prediction.py scripts/safety-analysis.py |
diffstat | 3 files changed, 34 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/python/moving.py Mon Jul 17 16:11:18 2017 -0400 +++ b/python/moving.py Mon Jul 17 17:56:52 2017 -0400 @@ -221,7 +221,7 @@ else: return Point(-self.y, self.x) - def multiply(self, alpha): + def __mul__(self, alpha): 'Warning, returns a new Point' return Point(self.x*alpha, self.y*alpha) @@ -430,7 +430,7 @@ return Point(X,Y) def getSYfromXY(p, splines, goodEnoughSplineDistance = 0.5): - ''' Snap a point p to it's nearest subsegment of it's nearest spline (from the list splines). + ''' Snap a point p to its nearest subsegment of it's nearest spline (from the list splines). A spline is a list of points (class Point), most likely a trajectory. Output: @@ -544,7 +544,7 @@ def predictPositionNoLimit(nTimeSteps, initialPosition, initialVelocity, initialAcceleration = Point(0,0)): '''Predicts the position in nTimeSteps at constant speed/acceleration''' - return initialVelocity + initialAcceleration.multiply(nTimeSteps),initialPosition+initialVelocity.multiply(nTimeSteps) + initialAcceleration.multiply(nTimeSteps**2*0.5) + return initialVelocity + initialAcceleration.__mul__(nTimeSteps),initialPosition+initialVelocity.__mul__(nTimeSteps) + initialAcceleration.__mul__(nTimeSteps**2*0.5) def predictPosition(position, speedOrientation, control, maxSpeed = None): '''Predicts the position (moving.Point) at the next time step with given control input (deltaSpeed, deltaTheta) @@ -568,8 +568,8 @@ def __add__(self, other): return FlowVector(self.position+other.position, self.velocity+other.velocity) - def multiply(self, alpha): - return FlowVector(self.position.multiply(alpha), self.velocity.multiply(alpha)) + def __mul__(self, alpha): + return FlowVector(self.position.__mul__(alpha), self.velocity.__mul__(alpha)) def plot(self, options = '', **kwargs): plot([self.position.x, self.position.x+self.velocity.x], [self.position.y, self.position.y+self.velocity.y], options, **kwargs) @@ -590,7 +590,7 @@ return None else: ua = (dp34.x*(p1.y-p3.y)-dp34.y*(p1.x-p3.x))/det - return p1+dp12.multiply(ua) + return p1+dp12.__mul__(ua) # def intersection(p1, p2, dp1, dp2): # '''Returns the intersection point between the two lines @@ -795,7 +795,7 @@ return Trajectory([[a-b for a,b in zip(self.getXCoordinates(),traj2.getXCoordinates())], [a-b for a,b in zip(self.getYCoordinates(),traj2.getYCoordinates())]]) - def multiply(self, alpha): + def __mul__(self, alpha): '''Returns a new trajectory of the same length''' return Trajectory([[alpha*x for x in self.getXCoordinates()], [alpha*y for y in self.getYCoordinates()]]) @@ -876,11 +876,11 @@ distances2.append(Point.distanceNorm2(p1, p2)) if distances2[-1] < minDist2: minDist2 = distances2[-1] - i = len(distances)-1 - if maxDist2 is None or (maxDist2 is not None and minDist2 < maxDist2): + i = len(distances2)-1 + if maxDist2 is not None and minDist2 < maxDist2: + return None + else: return i - else: - return None def similarOrientation(self, refDirection, cosineThreshold, minProportion = 0.5): '''Indicates whether the minProportion (<=1.) (eg half) of the trajectory elements (vectors for velocity) @@ -1140,7 +1140,7 @@ n = len([f for f in obj.getFeatures() if f.existsAtInstant(t)]) else: n = 1. - p += obj.getPositionAtInstant(t).multiply(n) + p += obj.getPositionAtInstant(t).__mul__(n) nTotal += n assert nTotal>0, 'there should be at least one point for each instant' positions.addPosition(p.divide(nTotal)) @@ -1156,7 +1156,7 @@ n = len([f for f in obj.getFeatures() if f.existsAtInstant(t)]) else: n = 1. - p += obj.getVelocityAtInstant(t).multiply(n) + p += obj.getVelocityAtInstant(t).__mul__(n) nTotal += n assert n>0, 'there should be at least one point for each instant' velocities.addPosition(p.divide(nTotal)) @@ -1731,7 +1731,7 @@ self.bottomRightPositions = bottomRightPositions.getPositions() def computeCentroidTrajectory(self, homography = None): - self.positions = self.topLeftPositions.add(self.bottomRightPositions).multiply(0.5) + self.positions = self.topLeftPositions.add(self.bottomRightPositions).__mul__(0.5) if homography is not None: self.positions = self.positions.homographyProject(homography)
--- a/python/prediction.py Mon Jul 17 16:11:18 2017 -0400 +++ b/python/prediction.py Mon Jul 17 17:56:52 2017 -0400 @@ -72,20 +72,30 @@ to the ratio of the user instantaneous speed and the trajectory closest speed)''' def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = False, probability = 1.): + ''' prototypeTrajectory is a MovingObject''' self.prototypeTrajectory = prototypeTrajectory self.constantSpeed = constantSpeed self.probability = probability self.predictedPositions = {0: initialPosition} - self.closestPointIdx = prototypeTrajectory.getClosestPoint(initialPosition) - self.deltaPosition = prototypeTrajectory[self.closestPointIdx]-initialPosition - self.initialSpeed = initialVelocity.norm() + self.closestPointIdx = prototypeTrajectory.getPositions().getClosestPoint(initialPosition) + self.deltaPosition = initialPosition-prototypeTrajectory.getPositionAt(self.closestPointIdx) + self.initialSpeed = initialVelocity.norm2() #self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestParams(initialPosition,prototypeTrajectory)[1])}#moving.NormAngle.fromPoint(initialVelocity)} def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): if self.constantSpeed: # calculate cumulative distance - pass + traj = self.prototypeTrajectory.getPositions() + traveledDistance = nTimeSteps*self.initialSpeed + traj.getCumulativeDistance(self.closestPointIdx) + i = self.closestPointIdx + while i < traj.length() and traj.getCumulativeDistance(i) < traveledDistance: + i += 1 + if i == traj.length(): + v = self.prototypeTrajectory.getVelocityAt(-1) + self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) + else: + self.predictedPositions[nTimeSteps] = self.deltaPosition+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) else: # see c++ code, calculate ratio speedNorm= self.predictedSpeedOrientations[0].norm instant=findNearestParams(self.predictedPositions[0],self.prototypeTrajectory)[0]
--- a/scripts/safety-analysis.py Mon Jul 17 16:11:18 2017 -0400 +++ b/scripts/safety-analysis.py Mon Jul 17 17:56:52 2017 -0400 @@ -14,7 +14,8 @@ parser.add_argument('--cfg', dest = 'configFilename', help = 'name of the configuration file', required = True) parser.add_argument('-n', dest = 'nObjects', help = 'number of objects to analyse', type = int) # TODO analyze only -parser.add_argument('--prediction-method', dest = 'predictionMethod', help = 'prediction method (constant velocity (cvd: vector computation (approximate); cve: equation solving; cv: discrete time (approximate)), normal adaptation, point set prediction)', choices = ['cvd', 'cve', 'cv', 'na', 'ps']) +parser.add_argument('--prediction-method', dest = 'predictionMethod', help = 'prediction method (constant velocity (cvd: vector computation (approximate); cve: equation solving; cv: discrete time (approximate)), normal adaptation, point set prediction)', choices = ['cvd', 'cve', 'cv', 'na', 'ps', 'proto']) +parser.add_argument('--cfg', dest = 'prototypeDatabaseFilename', help = 'name of the database containing the prototypes') parser.add_argument('--pet', dest = 'computePET', help = 'computes PET', action = 'store_true') parser.add_argument('--display-cp', dest = 'displayCollisionPoints', help = 'display collision points', action = 'store_true') parser.add_argument('--nthreads', dest = 'nProcesses', help = 'number of processes to run in parallel', type = int, default = 1) @@ -47,6 +48,10 @@ params.useFeaturesForPrediction) elif predictionMethod == 'ps': predictionParameters = prediction.PointSetPredictionParameters(params.maxPredictedSpeed) +elif predictionMethod == 'proto': + prototypes = storage.loadPrototypesFromSqlite(args.prototypeDatabaseFilename) + for p in prototypes: + p.getMovingObject().getPositions().computeCumulativeDistances() # no else required, since parameters is required as argument # evasiveActionPredictionParameters = prediction.EvasiveActionPredictionParameters(params.maxPredictedSpeed,