Mercurial Hosting > traffic-intelligence
view python/prediction.py @ 399:c389fae9689a
Added a class to read list of image instead of video. This is controlled by the use of the database-filename and folder-data parameters in the config file.
author | Jean-Philippe Jodoin <jpjodoin@gmail.com> |
---|---|
date | Mon, 29 Jul 2013 17:12:45 -0400 |
parents | 91679eb2ff2c |
children | 55b424d98b68 |
line wrap: on
line source
#! /usr/bin/env python '''Library for motion prediction methods''' import moving import math import random class PredictedTrajectory: '''Class for predicted trajectories with lazy evaluation if the predicted position has not been already computed, compute it it should also have a probability''' def __init__(self): self.probability = 0. self.predictedPositions = {} self.predictedSpeedOrientations = {} #self.collisionPoints = {} #self.crossingZones = {} def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): self.predictPosition(nTimeSteps-1) self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed) return self.predictedPositions[nTimeSteps] def getPredictedTrajectory(self): return moving.Trajectory.fromPointList(self.predictedPositions.values()) def getPredictedSpeeds(self): return [so.norm for so in self.predictedSpeedOrientations.values()] def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs) class PredictedTrajectoryConstant(PredictedTrajectory): '''Predicted trajectory at constant speed or acceleration TODO generalize by passing a series of velocities/accelerations''' def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None): self.control = control self.maxSpeed = maxSpeed self.probability = probability self.predictedPositions = {0: initialPosition} self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} def getControl(self): return self.control class PredictedTrajectoryNormalAdaptation(PredictedTrajectory): '''Random small adaptation of vehicle control ''' def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): '''Constructor accelerationDistribution and steeringDistribution are distributions that return random numbers drawn from them''' self.accelerationDistribution = accelerationDistribution self.steeringDistribution = steeringDistribution self.maxSpeed = maxSpeed self.probability = probability self.predictedPositions = {0: initialPosition} self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} def getControl(self): return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) class SafetyPoint(moving.Point): '''Can represent a collision point or crossing zone with respective safety indicator, TTC or pPET''' def __init__(self, p, probability = 1., indicator = -1): self.x = p.x self.y = p.y self.probability = probability self.indicator = indicator def __str__(self): return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator) @staticmethod def save(out, points, predictionInstant, objNum1, objNum2): for p in points: out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) @staticmethod def computeExpectedIndicator(points): from numpy import sum return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): '''Computes the first instant at which two predicted trajectories are within some distance threshold''' t = 1 p1 = predictedTrajectory1.predictPosition(t) p2 = predictedTrajectory2.predictPosition(t) while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: p1 = predictedTrajectory1.predictPosition(t) p2 = predictedTrajectory2.predictPosition(t) t += 1 return t, p1, p2 class PredictionParameters: def __init__(self, name, maxSpeed): self.name = name self.maxSpeed = maxSpeed def __str__(self): return '{0} {1}'.format(self.name, self.maxSpeed) def generatePredictedTrajectories(self, obj, instant): return [] def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): '''returns the lists of collision points and crossing zones''' predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) collisionPoints = [] crossingZones = [] for et1 in predictedTrajectories1: for et2 in predictedTrajectories2: t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) if t <= timeHorizon: collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) elif computeCZ: # check if there is a crossing zone # TODO? zone should be around the points at which the traj are the closest # look for CZ at different times, otherwise it would be a collision # an approximation would be to look for close points at different times, ie the complementary of collision points cz = None t1 = 0 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 t2 = 0 while not cz and t2 < timeHorizon: #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) if cz: crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) t2 += 1 t1 += 1 if debug: from matplotlib.pyplot import figure, axis, title figure() for et in predictedTrajectories1: et.predictPosition(timeHorizon) et.draw('rx') for et in predictedTrajectories2: et.predictPosition(timeHorizon) et.draw('bx') obj1.draw('r') obj2.draw('b') title('instant {0}'.format(currentInstant)) axis('equal') return collisionPoints, crossingZones def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): '''Computes all crossing and collision points at each common instant for two road users. ''' collisionPoints={} crossingZones={} if timeInterval: commonTimeInterval = timeInterval else: commonTimeInterval = obj1.commonTimeInterval(obj2) for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) return collisionPoints, crossingZones def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): '''Computes only collision probabilities Returns for each instant the collision probability and number of samples drawn''' collisionProbabilities = {} if timeInterval: commonTimeInterval = timeInterval else: commonTimeInterval = obj1.commonTimeInterval(obj2) for i in list(commonTimeInterval)[:-1]: nCollisions = 0 print(obj1.num, obj2.num, i) predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i) predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i) for et1 in predictedTrajectories1: for et2 in predictedTrajectories2: t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) if t <= timeHorizon: nCollisions += 1 # take into account probabilities ?? nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] if debug: from matplotlib.pyplot import figure, axis, title figure() for et in predictedTrajectories1: et.predictPosition(timeHorizon) et.draw('rx') for et in predictedTrajectories2: et.predictPosition(timeHorizon) et.draw('bx') obj1.draw('r') obj2.draw('b') title('instant {0}'.format(i)) axis('equal') return collisionProbabilities class ConstantPredictionParameters(PredictionParameters): def __init__(self, maxSpeed): PredictionParameters.__init__(self, 'constant velocity', maxSpeed) def generatePredictedTrajectories(self, obj, instant): return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), maxSpeed = self.maxSpeed)] class NormalAdaptationPredictionParameters(PredictionParameters): def __init__(self, maxSpeed, nPredictedTrajectories, maxAcceleration, maxSteering, useFeatures = False): if useFeatures: name = 'point set normal adaptation' else: name = 'normal adaptation' PredictionParameters.__init__(self, name, maxSpeed) self.nPredictedTrajectories = nPredictedTrajectories self.maxAcceleration = maxAcceleration self.maxSteering = maxSteering self.useFeatures = useFeatures self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.) self.steeringDistribution = lambda: random.triangular(-self.maxSteering, self.maxSteering, 0.) def __str__(self): return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, self.maxAcceleration, self.maxSteering) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if self.useFeatures: features = [f for f in obj.features if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] else: positions = [obj.getPositionAtInstant(instant)] velocities = [obj.getVelocityAtInstant(instant)] for i in xrange(self.nPredictedTrajectories): for initialPosition,initialVelocity in zip(positions, velocities): predictedTrajectories.append(PredictedTrajectoryNormalAdaptation(initialPosition, initialVelocity, self.accelerationDistribution, self.steeringDistribution, maxSpeed = self.maxSpeed)) return predictedTrajectories class PointSetPredictionParameters(PredictionParameters): # todo generate several trajectories with normal adaptatoins from each position (feature) def __init__(self, nPredictedTrajectories, maxSpeed): PredictionParameters.__init__(self, 'point set', maxSpeed) self.nPredictedTrajectories = nPredictedTrajectories def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] features = [f for f in obj.features if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] for i in xrange(self.nPredictedTrajectories): for initialPosition,initialVelocity in zip(positions, velocities): predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, maxSpeed = self.maxSpeed)) return predictedTrajectories class EvasiveActionPredictionParameters(PredictionParameters): def __init__(self, maxSpeed, nPredictedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): if useFeatures: name = 'point set evasive action' else: name = 'evasive action' PredictionParameters.__init__(self, name, maxSpeed) self.nPredictedTrajectories = nPredictedTrajectories self.minAcceleration = minAcceleration self.maxAcceleration = maxAcceleration self.maxSteering = maxSteering self.useFeatures = useFeatures self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.) self.steeringDistribution = lambda: random.triangular(-self.maxSteering, self.maxSteering, 0.) def __str__(self): return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if self.useFeatures: features = [f for f in obj.features if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] else: positions = [obj.getPositionAtInstant(instant)] velocities = [obj.getVelocityAtInstant(instant)] for i in xrange(self.nPredictedTrajectories): for initialPosition,initialVelocity in zip(positions, velocities): predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, moving.NormAngle(self.accelerationDistribution(), self.steeringDistribution()), maxSpeed = self.maxSpeed)) return predictedTrajectories class CVDirectPredictionParameters(PredictionParameters): '''Prediction parameters of prediction at constant velocity using direct computation of the intersecting point''' def __init__(self): PredictionParameters.__init__(self, 'constant velocity (direct computation)', None) def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): collisionPoints = [] crossingZones = [] p1 = obj1.getPositionAtInstant(currentInstant) p2 = obj2.getPositionAtInstant(currentInstant) if (p1-p2).norm2() <= collisionDistanceThreshold: collisionPoints = [SafetyPoint((p1+p1).multiply(0.5), 1., 0.)] else: v1 = obj1.getVelocityAtInstant(currentInstant) v2 = obj2.getVelocityAtInstant(currentInstant) intersection = moving.intersection(p1, p2, v1, v2) if intersection != None: dp1 = intersection-p1 dp2 = intersection-p2 if moving.Point.dot(dp1, v1) > 0 and moving.Point.dot(dp2, v2) > 0: # if the road users are moving towards the intersection dist1 = dp1.norm2() dist2 = dp2.norm2() s1 = v1.norm2() s2 = v2.norm2() halfCollisionDistanceThreshold = collisionDistanceThreshold/2. timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1) timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2) collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2) if computeCZ and collisionTimeInterval.empty(): crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))] else: collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())] if debug and intersection!= None: from matplotlib.pyplot import plot, figure, axis, title figure() plot([p1.x, intersection.x], [p1.y, intersection.y], 'r') plot([p2.x, intersection.x], [p2.y, intersection.y], 'b') intersection.draw() obj1.draw('r') obj2.draw('b') title('instant {0}'.format(currentInstant)) axis('equal') return collisionPoints, crossingZones #### # Other Methods #### if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/prediction.txt') #suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite) #doctest.testmod() #doctest.testfile("example.txt")