Mercurial Hosting > traffic-intelligence
changeset 359:619ae9a9a788
implemented prediction method at constant velocity with direct intersection computation
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Thu, 11 Jul 2013 02:17:12 -0400 |
parents | c41ff9f3c263 |
children | 450dc0648aaa |
files | python/moving.py python/prediction.py scripts/safety-analysis.py |
diffstat | 3 files changed, 105 insertions(+), 43 deletions(-) [+] |
line wrap: on
line diff
--- a/python/moving.py Thu Jul 11 00:17:25 2013 -0400 +++ b/python/moving.py Thu Jul 11 02:17:12 2013 -0400 @@ -30,6 +30,9 @@ def empty(self): return self.first > self.last + def center(self): + return (self.first+self.last)/2. + def length(self): '''Returns the length of the interval''' return float(max(0,self.last-self.first)) @@ -306,33 +309,39 @@ def similar(f1, f2, maxDistance2, maxDeltavelocity2): return (f1.position-f2.position).norm2Squared()<maxDistance2 and (f1.velocity-f2.velocity).norm2Squared()<maxDeltavelocity2 +def intersection(p1, p2, dp1, dp2): + '''Returns the intersection point between the two lines + defined by the respective vectors (dp) and origin points (p)''' + from numpy import matrix + from numpy.linalg import linalg + A = matrix([[dp1.y, -dp1.x], + [dp2.y, -dp2.x]]) + B = matrix([[dp1.y*p1.x-dp1.x*p1.y], + [dp2.y*p2.x-dp2.x*p2.y]]) + + if linalg.det(A) == 0: + return None + else: + intersection = linalg.solve(A,B) + return Point(intersection[0,0], intersection[1,0]) + def segmentIntersection(p1, p2, p3, p4): '''Returns the intersecting point of the segments [p1, p2] and [p3, p4], None otherwise''' - from numpy import matrix - from numpy.linalg import linalg, det if (Interval.intersection(Interval(p1.x,p2.x,True), Interval(p3.x,p4.x,True)).empty()) or (Interval.intersection(Interval(p1.y,p2.y,True), Interval(p3.y,p4.y,True)).empty()): return None else: - dp1 = p2-p1#[s1[0][1]-s1[0][0], s1[1][1]-s1[1][0]] - dp2 = p4-p3#[s2[0][1]-s2[0][0], s2[1][1]-s2[1][0]] - - A = matrix([[dp1.y, -dp1.x], - [dp2.y, -dp2.x]]) - B = matrix([[dp1.y*p1.x-dp1.x*p1.y], - [dp2.y*p3.x-dp2.x*p3.y]]) - - if linalg.det(A) == 0:#crossProduct(ds1, ds2) == 0: + dp1 = p2-p1 + dp3 = p4-p3 + inter = intersection(p1, p3, dp1, dp3) + if (inter != None + and utils.inBetween(p1.x, p2.x, inter.x) + and utils.inBetween(p3.x, p4.x, inter.x) + and utils.inBetween(p1.y, p2.y, inter.y) + and utils.inBetween(p3.y, p4.y, inter.y)): + return inter + else: return None - else: - intersection = linalg.solve(A,B) - if (utils.inBetween(p1.x, p2.x, intersection[0,0]) - and utils.inBetween(p3.x, p4.x, intersection[0,0]) - and utils.inBetween(p1.y, p2.y, intersection[1,0]) - and utils.inBetween(p3.y, p4.y, intersection[1,0])): - return Point(intersection[0,0], intersection[1,0]) - else: - return None # TODO: implement a better algorithm for intersections of sets of segments http://en.wikipedia.org/wiki/Line_segment_intersection
--- a/python/prediction.py Thu Jul 11 00:17:25 2013 -0400 +++ b/python/prediction.py Thu Jul 11 02:17:12 2013 -0400 @@ -63,6 +63,28 @@ def getControl(self): return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) +class SafetyPoint(moving.Point): + '''Can represent a collision point or crossing zone + with respective safety indicator, TTC or pPET''' + def __init__(self, p, probability = 1., indicator = -1): + self.x = p.x + self.y = p.y + self.probability = probability + self.indicator = indicator + + def __str__(self): + return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator) + + @staticmethod + def save(out, points, predictionInstant, objNum1, objNum2): + for p in points: + out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) + + @staticmethod + def computeExpectedIndicator(points): + from numpy import sum + return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) + def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): '''Computes the first instant at which two predicted trajectories are within some distance threshold''' t = 1 @@ -86,9 +108,7 @@ return [] def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): - '''returns the lists of collision points and crossing zones - - Check: Predicting all the points together, as if they represent the whole vehicle''' + '''returns the lists of collision points and crossing zones''' predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) @@ -288,27 +308,58 @@ maxSpeed = self.maxSpeed)) return predictedTrajectories -class SafetyPoint(moving.Point): - '''Can represent a collision point or crossing zone - with respective safety indicator, TTC or pPET''' - def __init__(self, p, probability = 1., indicator = -1): - self.x = p.x - self.y = p.y - self.probability = probability - self.indicator = indicator + +class CVDirectPredictionParameters(PredictionParameters): + '''Prediction parameters of prediction at constant velocity + using direct computation of the intersecting point''' + + def __init__(self, maxSpeed): + PredictionParameters.__init__(self, 'constant velocity (direct computation)', maxSpeed) + + def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): + collisionPoints = [] + crossingZones = [] + + p1 = obj1.getPositionAtInstant(currentInstant) + p2 = obj2.getPositionAtInstant(currentInstant) + if (p1-p2).norm2() <= collisionDistanceThreshold: + collisionPoints = [SafetyPoint((p1+p1).multiply(0.5), 1., 0.)] + else: + v1 = obj1.getVelocityAtInstant(currentInstant) + v2 = obj2.getVelocityAtInstant(currentInstant) + intersection = moving.intersection(p1, p2, v1, v2) - def __str__(self): - return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator) + if intersection != None: + dp1 = intersection-p1 + dp2 = intersection-p2 + if moving.Point.dot(dp1, v1) > 0 and moving.Point.dot(dp2, v2) > 0: # if the road users are moving towards the intersection + dist1 = dp1.norm2() + dist2 = dp2.norm2() + s1 = v1.norm2() + s2 = v2.norm2() + halfCollisionDistanceThreshold = collisionDistanceThreshold/2. + timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1) + timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2) + collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2) + if not collisionTimeInterval.empty(): + collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())] + else: + crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))] + + if debug and intersection!= None: + from matplotlib.pyplot import plot, figure, axis, title + figure() + plot([p1.x, intersection.x], [p1.y, intersection.y], 'r') + plot([p2.x, intersection.x], [p2.y, intersection.y], 'b') + intersection.draw() + obj1.draw('r') + obj2.draw('b') + title('instant {0}'.format(currentInstant)) + axis('equal') - @staticmethod - def save(out, points, predictionInstant, objNum1, objNum2): - for p in points: - out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) + return collisionPoints, crossingZones - @staticmethod - def computeExpectedIndicator(points): - from numpy import sum - return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) + ####
--- a/scripts/safety-analysis.py Thu Jul 11 00:17:25 2013 -0400 +++ b/scripts/safety-analysis.py Thu Jul 11 02:17:12 2013 -0400 @@ -12,7 +12,7 @@ parser = argparse.ArgumentParser(description='The program processes indicators for all pairs of road users in the scene') parser.add_argument('--cfg', dest = 'configFilename', help = 'name of the configuration file') -parser.add_argument('--prediction-method', dest = 'predictionMethod', help = 'prediction method (constant velocity, normal adaptation, point set prediction)', choices = ['cv', 'na', 'ps']) +parser.add_argument('--prediction-method', dest = 'predictionMethod', help = 'prediction method (constant velocity (vector computation), constant velocity, normal adaptation, point set prediction)', choices = ['cvd', 'cv', 'na', 'ps']) parser.add_argument('--display-cp', dest = 'displayCollisionPoints', help = 'display collision points') args = parser.parse_args() @@ -25,7 +25,9 @@ else: predictionMethod = params.predictionMethod -if predictionMethod == 'cv': +if predictionMethod == 'cvd': + predictionParameters = prediction.CVDirectPredictionParameters(params.maxPredictedSpeed) +elif predictionMethod == 'cv': predictionParameters = prediction.ConstantPredictionParameters(params.maxPredictedSpeed) elif predictionMethod == 'na': predictionParameters = prediction.NormalAdaptationPredictionParameters(params.maxPredictedSpeed,