Mercurial Hosting > traffic-intelligence
view python/prediction.py @ 398:3399bd48cb40
Ajout d'une méthode pour obtenir le nombre de FPS
Méthode de capture des trames vidéos plus résistante aux erreur
Utilisation d'un dictionnaire pour les fichier de configuration afin de garder le nom des sections
author | Jean-Philippe Jodoin <jpjodoin@gmail.com> |
---|---|
date | Mon, 29 Jul 2013 13:46:07 -0400 |
parents | 91679eb2ff2c |
children | 55b424d98b68 |
line wrap: on
line source
#! /usr/bin/env python '''Library for motion prediction methods''' import moving import math import random class PredictedTrajectory: '''Class for predicted trajectories with lazy evaluation if the predicted position has not been already computed, compute it it should also have a probability''' def __init__(self): self.probability = 0. self.predictedPositions = {} self.predictedSpeedOrientations = {} #self.collisionPoints = {} #self.crossingZones = {} def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): self.predictPosition(nTimeSteps-1) self.predictedPositions[nTimeSteps], self.predictedSpeedOrientations[nTimeSteps] = moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], self.getControl(), self.maxSpeed) return self.predictedPositions[nTimeSteps] def getPredictedTrajectory(self): return moving.Trajectory.fromPointList(self.predictedPositions.values()) def getPredictedSpeeds(self): return [so.norm for so in self.predictedSpeedOrientations.values()] def draw(self, options = '', withOrigin = False, timeStep = 1, **kwargs): self.getPredictedTrajectory().draw(options, withOrigin, timeStep, **kwargs) class PredictedTrajectoryConstant(PredictedTrajectory): '''Predicted trajectory at constant speed or acceleration TODO generalize by passing a series of velocities/accelerations''' def __init__(self, initialPosition, initialVelocity, control = moving.NormAngle(0,0), probability = 1., maxSpeed = None): self.control = control self.maxSpeed = maxSpeed self.probability = probability self.predictedPositions = {0: initialPosition} self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} def getControl(self): return self.control class PredictedTrajectoryNormalAdaptation(PredictedTrajectory): '''Random small adaptation of vehicle control ''' def __init__(self, initialPosition, initialVelocity, accelerationDistribution, steeringDistribution, probability = 1., maxSpeed = None): '''Constructor accelerationDistribution and steeringDistribution are distributions that return random numbers drawn from them''' self.accelerationDistribution = accelerationDistribution self.steeringDistribution = steeringDistribution self.maxSpeed = maxSpeed self.probability = probability self.predictedPositions = {0: initialPosition} self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} def getControl(self): return moving.NormAngle(self.accelerationDistribution(),self.steeringDistribution()) class SafetyPoint(moving.Point): '''Can represent a collision point or crossing zone with respective safety indicator, TTC or pPET''' def __init__(self, p, probability = 1., indicator = -1): self.x = p.x self.y = p.y self.probability = probability self.indicator = indicator def __str__(self): return '{0} {1} {2} {3}'.format(self.x, self.y, self.probability, self.indicator) @staticmethod def save(out, points, predictionInstant, objNum1, objNum2): for p in points: out.write('{0} {1} {2} {3}\n'.format(objNum1, objNum2, predictionInstant, p)) @staticmethod def computeExpectedIndicator(points): from numpy import sum return sum([p.indicator*p.probability for p in points])/sum([p.probability for p in points]) def computeCollisionTime(predictedTrajectory1, predictedTrajectory2, collisionDistanceThreshold, timeHorizon): '''Computes the first instant at which two predicted trajectories are within some distance threshold''' t = 1 p1 = predictedTrajectory1.predictPosition(t) p2 = predictedTrajectory2.predictPosition(t) while t <= timeHorizon and (p1-p2).norm2() > collisionDistanceThreshold: p1 = predictedTrajectory1.predictPosition(t) p2 = predictedTrajectory2.predictPosition(t) t += 1 return t, p1, p2 class PredictionParameters: def __init__(self, name, maxSpeed): self.name = name self.maxSpeed = maxSpeed def __str__(self): return '{0} {1}'.format(self.name, self.maxSpeed) def generatePredictedTrajectories(self, obj, instant): return [] def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): '''returns the lists of collision points and crossing zones''' predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) collisionPoints = [] crossingZones = [] for et1 in predictedTrajectories1: for et2 in predictedTrajectories2: t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) if t <= timeHorizon: collisionPoints.append(SafetyPoint((p1+p2).multiply(0.5), et1.probability*et2.probability, t)) elif computeCZ: # check if there is a crossing zone # TODO? zone should be around the points at which the traj are the closest # look for CZ at different times, otherwise it would be a collision # an approximation would be to look for close points at different times, ie the complementary of collision points cz = None t1 = 0 while not cz and t1 < timeHorizon: # t1 <= timeHorizon-1 t2 = 0 while not cz and t2 < timeHorizon: #if (et1.predictPosition(t1)-et2.predictPosition(t2)).norm2() < collisionDistanceThreshold: # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) if cz: crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) t2 += 1 t1 += 1 if debug: from matplotlib.pyplot import figure, axis, title figure() for et in predictedTrajectories1: et.predictPosition(timeHorizon) et.draw('rx') for et in predictedTrajectories2: et.predictPosition(timeHorizon) et.draw('bx') obj1.draw('r') obj2.draw('b') title('instant {0}'.format(currentInstant)) axis('equal') return collisionPoints, crossingZones def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): '''Computes all crossing and collision points at each common instant for two road users. ''' collisionPoints={} crossingZones={} if timeInterval: commonTimeInterval = timeInterval else: commonTimeInterval = obj1.commonTimeInterval(obj2) for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) return collisionPoints, crossingZones def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): '''Computes only collision probabilities Returns for each instant the collision probability and number of samples drawn''' collisionProbabilities = {} if timeInterval: commonTimeInterval = timeInterval else: commonTimeInterval = obj1.commonTimeInterval(obj2) for i in list(commonTimeInterval)[:-1]: nCollisions = 0 print(obj1.num, obj2.num, i) predictedTrajectories1 = self.generatePredictedTrajectories(obj1, i) predictedTrajectories2 = self.generatePredictedTrajectories(obj2, i) for et1 in predictedTrajectories1: for et2 in predictedTrajectories2: t, p1, p2 = computeCollisionTime(et1, et2, collisionDistanceThreshold, timeHorizon) if t <= timeHorizon: nCollisions += 1 # take into account probabilities ?? nSamples = float(len(predictedTrajectories1)*len(predictedTrajectories2)) collisionProbabilities[i] = [nSamples, float(nCollisions)/nSamples] if debug: from matplotlib.pyplot import figure, axis, title figure() for et in predictedTrajectories1: et.predictPosition(timeHorizon) et.draw('rx') for et in predictedTrajectories2: et.predictPosition(timeHorizon) et.draw('bx') obj1.draw('r') obj2.draw('b') title('instant {0}'.format(i)) axis('equal') return collisionProbabilities class ConstantPredictionParameters(PredictionParameters): def __init__(self, maxSpeed): PredictionParameters.__init__(self, 'constant velocity', maxSpeed) def generatePredictedTrajectories(self, obj, instant): return [PredictedTrajectoryConstant(obj.getPositionAtInstant(instant), obj.getVelocityAtInstant(instant), maxSpeed = self.maxSpeed)] class NormalAdaptationPredictionParameters(PredictionParameters): def __init__(self, maxSpeed, nPredictedTrajectories, maxAcceleration, maxSteering, useFeatures = False): if useFeatures: name = 'point set normal adaptation' else: name = 'normal adaptation' PredictionParameters.__init__(self, name, maxSpeed) self.nPredictedTrajectories = nPredictedTrajectories self.maxAcceleration = maxAcceleration self.maxSteering = maxSteering self.useFeatures = useFeatures self.accelerationDistribution = lambda: random.triangular(-self.maxAcceleration, self.maxAcceleration, 0.) self.steeringDistribution = lambda: random.triangular(-self.maxSteering, self.maxSteering, 0.) def __str__(self): return PredictionParameters.__str__(self)+' {0} {1} {2}'.format(self.nPredictedTrajectories, self.maxAcceleration, self.maxSteering) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if self.useFeatures: features = [f for f in obj.features if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] else: positions = [obj.getPositionAtInstant(instant)] velocities = [obj.getVelocityAtInstant(instant)] for i in xrange(self.nPredictedTrajectories): for initialPosition,initialVelocity in zip(positions, velocities): predictedTrajectories.append(PredictedTrajectoryNormalAdaptation(initialPosition, initialVelocity, self.accelerationDistribution, self.steeringDistribution, maxSpeed = self.maxSpeed)) return predictedTrajectories class PointSetPredictionParameters(PredictionParameters): # todo generate several trajectories with normal adaptatoins from each position (feature) def __init__(self, nPredictedTrajectories, maxSpeed): PredictionParameters.__init__(self, 'point set', maxSpeed) self.nPredictedTrajectories = nPredictedTrajectories def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] features = [f for f in obj.features if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] for i in xrange(self.nPredictedTrajectories): for initialPosition,initialVelocity in zip(positions, velocities): predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, maxSpeed = self.maxSpeed)) return predictedTrajectories class EvasiveActionPredictionParameters(PredictionParameters): def __init__(self, maxSpeed, nPredictedTrajectories, minAcceleration, maxAcceleration, maxSteering, useFeatures = False): if useFeatures: name = 'point set evasive action' else: name = 'evasive action' PredictionParameters.__init__(self, name, maxSpeed) self.nPredictedTrajectories = nPredictedTrajectories self.minAcceleration = minAcceleration self.maxAcceleration = maxAcceleration self.maxSteering = maxSteering self.useFeatures = useFeatures self.accelerationDistribution = lambda: random.triangular(self.minAcceleration, self.maxAcceleration, 0.) self.steeringDistribution = lambda: random.triangular(-self.maxSteering, self.maxSteering, 0.) def __str__(self): return PredictionParameters.__str__(self)+' {0} {1} {2} {3}'.format(self.nPredictedTrajectories, self.minAcceleration, self.maxAcceleration, self.maxSteering) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = [] if self.useFeatures: features = [f for f in obj.features if f.existsAtInstant(instant)] positions = [f.getPositionAtInstant(instant) for f in features] velocities = [f.getVelocityAtInstant(instant) for f in features] else: positions = [obj.getPositionAtInstant(instant)] velocities = [obj.getVelocityAtInstant(instant)] for i in xrange(self.nPredictedTrajectories): for initialPosition,initialVelocity in zip(positions, velocities): predictedTrajectories.append(PredictedTrajectoryConstant(initialPosition, initialVelocity, moving.NormAngle(self.accelerationDistribution(), self.steeringDistribution()), maxSpeed = self.maxSpeed)) return predictedTrajectories class CVDirectPredictionParameters(PredictionParameters): '''Prediction parameters of prediction at constant velocity using direct computation of the intersecting point''' def __init__(self): PredictionParameters.__init__(self, 'constant velocity (direct computation)', None) def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): collisionPoints = [] crossingZones = [] p1 = obj1.getPositionAtInstant(currentInstant) p2 = obj2.getPositionAtInstant(currentInstant) if (p1-p2).norm2() <= collisionDistanceThreshold: collisionPoints = [SafetyPoint((p1+p1).multiply(0.5), 1., 0.)] else: v1 = obj1.getVelocityAtInstant(currentInstant) v2 = obj2.getVelocityAtInstant(currentInstant) intersection = moving.intersection(p1, p2, v1, v2) if intersection != None: dp1 = intersection-p1 dp2 = intersection-p2 if moving.Point.dot(dp1, v1) > 0 and moving.Point.dot(dp2, v2) > 0: # if the road users are moving towards the intersection dist1 = dp1.norm2() dist2 = dp2.norm2() s1 = v1.norm2() s2 = v2.norm2() halfCollisionDistanceThreshold = collisionDistanceThreshold/2. timeInterval1 = moving.TimeInterval(max(0,dist1-halfCollisionDistanceThreshold)/s1, (dist1+halfCollisionDistanceThreshold)/s1) timeInterval2 = moving.TimeInterval(max(0,dist2-halfCollisionDistanceThreshold)/s2, (dist2+halfCollisionDistanceThreshold)/s2) collisionTimeInterval = moving.TimeInterval.intersection(timeInterval1, timeInterval2) if computeCZ and collisionTimeInterval.empty(): crossingZones = [SafetyPoint(intersection, 1., timeInterval1.distance(timeInterval2))] else: collisionPoints = [SafetyPoint(intersection, 1., collisionTimeInterval.center())] if debug and intersection!= None: from matplotlib.pyplot import plot, figure, axis, title figure() plot([p1.x, intersection.x], [p1.y, intersection.y], 'r') plot([p2.x, intersection.x], [p2.y, intersection.y], 'b') intersection.draw() obj1.draw('r') obj2.draw('b') title('instant {0}'.format(currentInstant)) axis('equal') return collisionPoints, crossingZones #### # Other Methods #### if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/prediction.txt') #suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite) #doctest.testmod() #doctest.testfile("example.txt")