Mercurial Hosting > traffic-intelligence
changeset 604:8ba4b8ad4c86
Motion Pattern Method
author | MohamedGomaa |
---|---|
date | Tue, 15 Jul 2014 13:25:15 -0400 |
parents | e6ab4caf359c |
children | e7f6ca76b7db |
files | python/events.py python/prediction.py |
diffstat | 2 files changed, 109 insertions(+), 16 deletions(-) [+] |
line wrap: on
line diff
--- a/python/events.py Tue Jul 08 15:22:30 2014 -0400 +++ b/python/events.py Tue Jul 15 13:25:15 2014 -0400 @@ -1,7 +1,7 @@ #! /usr/bin/env python '''Libraries for events Interactions, pedestrian crossing...''' - +import sys import numpy as np from numpy import arccos @@ -141,6 +141,46 @@ pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) + def computeCrossingsCollisionsPrototype(self, predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,nMatching,objects,minSimilarity=0.5,index=None,mostMatched=None, computeCZ = False, debug = False, timeInterval = None,acceptPartialLength=30): + '''Computes all crossing and collision points at each common instant for two road users. ''' + self.collisionPoints={} + self.crossingZones={} + TTCs = {} + route1=(self.roadUser1.startRouteID,self.roadUser1.endRouteID) + route2=(self.roadUser2.startRouteID,self.roadUser2.endRouteID) + + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = self.timeInterval + for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors + sys.path.append("D:/behaviourAnalysis/libs") + import trajLearning + if i-self.roadUser1.timeInterval.first >= acceptPartialLength and i-self.roadUser2.timeInterval.first >= acceptPartialLength: + inter1=moving.Interval(self.roadUser1.timeInterval.first,i) + inter2=moving.Interval(self.roadUser2.timeInterval.first,i) + partialObjPositions1= self.roadUser1.getObjectInTimeInterval(inter1).positions + partialObjPositions2= self.roadUser2.getObjectInTimeInterval(inter2).positions + prototypeTrajectories1=trajLearning.findPrototypes(prototypes,nMatching,objects,route1,partialObjPositions1,minSimilarity,index,mostMatched) + prototypeTrajectories2=trajLearning.findPrototypes(prototypes,nMatching,objects,route2,partialObjPositions2,minSimilarity,index,mostMatched) + if prototypeTrajectories1=={}: + print self.roadUser1.num, 'is abnormal at instant', str(i) + elif prototypeTrajectories2=={}: + print self.roadUser2.num, 'is abnormal at instant', str(i) + else: + self.collisionPoints[i], self.crossingZones[i] = predictionParameters.computeCrossingsCollisionsAtInstant(i, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) + if len(self.collisionPoints[i]) > 0: + TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.collisionPoints[i]) + # add probability of collision, and probability of successful evasive action + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs)) + + if computeCZ: + pPETs = {} + for i in list(commonTimeInterval)[:-1]: + if i in self.crossingZones.keys() and len(self.crossingZones[i]) > 0: + pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) + def addVideoFilename(self,videoFilename): self.videoFilename= videoFilename
--- a/python/prediction.py Tue Jul 08 15:22:30 2014 -0400 +++ b/python/prediction.py Tue Jul 15 13:25:15 2014 -0400 @@ -4,6 +4,7 @@ import moving import math import random +import numpy as np class PredictedTrajectory: '''Class for predicted trajectories with lazy evaluation @@ -47,6 +48,20 @@ def getControl(self): return self.control +def findNearestOrientation(initialPosition,prototypeTrajectory): + distances=[] + for p in prototypeTrajectory.positions: + distances.append(moving.Point.distanceNorm2(initialPosition, p)) + t=np.argmin(distances) + return moving.NormAngle.fromPoint(prototypeTrajectory.velocities[t]).angle +#todo: merge with previous function +def findNearestInstant(initialPosition,prototypeTrajectory): + distances=[] + for p in prototypeTrajectory.positions: + distances.append(moving.Point.distanceNorm2(initialPosition, p)) + t=np.argmin(distances) + return t + class PredictedTrajectoryPrototype(PredictedTrajectory): '''Predicted trajectory that follows a prototype trajectory The prototype is in the format of a moving.Trajectory: it could be @@ -59,20 +74,39 @@ (applying a constant ratio equal to the ratio of the user instantaneous speed and the trajectory closest speed)''' - def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.): + def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, speedList=[], probability = 1.): self.prototypeTrajectory = prototypeTrajectory self.constantSpeed = constantSpeed + self.speedList = speedList self.probability = probability self.predictedPositions = {0: initialPosition} - self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)} + self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestOrientation(initialPosition,prototypeTrajectory))}#moving.NormAngle.fromPoint(initialVelocity)} def predictPosition(self, nTimeSteps): if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys(): - if constantSpeed: + if self.constantSpeed: # calculate cumulative distance - pass + speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm + anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + #pass + elif self.speedList!=[]: + pass else: # see c++ code, calculate ratio - pass + speedNorm= self.predictedSpeedOrientations[0].norm + instant=findNearestInstant(self.predictedPositions[0],self.prototypeTrajectory) + prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:] + ratio=float(speedNorm)/prototypeSpeeds[0] + resampledSpeeds=[sp*ratio for sp in prototypeSpeeds] + anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory) + if nTimeSteps<len(resampledSpeeds): + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[nTimeSteps], anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + else: + self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[-1], anglePrototype) + self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None) + # pass return self.predictedPositions[nTimeSteps] class PredictedTrajectoryRandomControl(PredictedTrajectory): @@ -135,10 +169,14 @@ def generatePredictedTrajectories(self, obj, instant): return [] - def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False): + def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None): '''returns the lists of collision points and crossing zones''' - predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) - predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) + if prototypeTrajectories1==None: + predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant) + predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant) + else: + predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1) + predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2) collisionPoints = [] crossingZones = [] @@ -161,7 +199,8 @@ # cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5) cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1)) if cz: - crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) + deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2() + crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV))) t2 += 1 t1 += 1 @@ -182,7 +221,7 @@ return collisionPoints, crossingZones - def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): + def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None,prototypeTrajectories1=None,prototypeTrajectories2=None): '''Computes all crossing and collision points at each common instant for two road users. ''' collisionPoints={} crossingZones={} @@ -191,7 +230,7 @@ else: commonTimeInterval = obj1.commonTimeInterval(obj2) for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors - collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) + collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2) return collisionPoints, crossingZones @@ -412,10 +451,24 @@ #### # Other Methods #### - - - - +class prototypePredictionParameters(PredictionParameters): + def __init__(self, maxSpeed, nPredictedTrajectories,constantSpeed = True, speedList=[]): + #TODO speed profiles integration + name = 'prototype' + PredictionParameters.__init__(self, name, maxSpeed) + self.nPredictedTrajectories = nPredictedTrajectories + #self.prototypeTrajectory = prototypeTrajectory + self.constantSpeed = constantSpeed + self.speedList = speedList + #self.probability=probability + + def generatePredictedTrajectories(self, obj, instant,prototypeTrajectories): + predictedTrajectories = [] + initialPosition = obj.getPositionAtInstant(instant) + initialVelocity = obj.getVelocityAtInstant(instant) + for prototypeTraj in prototypeTrajectories.keys(): + predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, prototypeTraj, constantSpeed = self.constantSpeed, speedList=self.speedList, probability = prototypeTrajectories[prototypeTraj])) + return predictedTrajectories if __name__ == "__main__": import doctest