changeset 604:8ba4b8ad4c86

Motion Pattern Method
author MohamedGomaa
date Tue, 15 Jul 2014 13:25:15 -0400
parents e6ab4caf359c
children e7f6ca76b7db
files python/events.py python/prediction.py
diffstat 2 files changed, 109 insertions(+), 16 deletions(-) [+]
line wrap: on
line diff
--- a/python/events.py	Tue Jul 08 15:22:30 2014 -0400
+++ b/python/events.py	Tue Jul 15 13:25:15 2014 -0400
@@ -1,7 +1,7 @@
 #! /usr/bin/env python
 '''Libraries for events
 Interactions, pedestrian crossing...'''
-
+import sys
 import numpy as np
 from numpy import arccos
 
@@ -141,6 +141,46 @@
                     pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i])
             self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs))
 
+    def computeCrossingsCollisionsPrototype(self, predictionParameters, collisionDistanceThreshold, timeHorizon, prototypes,nMatching,objects,minSimilarity=0.5,index=None,mostMatched=None, computeCZ = False, debug = False, timeInterval = None,acceptPartialLength=30):
+        '''Computes all crossing and collision points at each common instant for two road users. '''
+        self.collisionPoints={}
+        self.crossingZones={}
+        TTCs = {}
+        route1=(self.roadUser1.startRouteID,self.roadUser1.endRouteID)
+        route2=(self.roadUser2.startRouteID,self.roadUser2.endRouteID)
+
+        if timeInterval:
+            commonTimeInterval = timeInterval
+        else:
+            commonTimeInterval = self.timeInterval
+        for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
+            sys.path.append("D:/behaviourAnalysis/libs")
+            import trajLearning
+            if  i-self.roadUser1.timeInterval.first >= acceptPartialLength and i-self.roadUser2.timeInterval.first >= acceptPartialLength:
+                inter1=moving.Interval(self.roadUser1.timeInterval.first,i)
+                inter2=moving.Interval(self.roadUser2.timeInterval.first,i)
+                partialObjPositions1= self.roadUser1.getObjectInTimeInterval(inter1).positions
+                partialObjPositions2= self.roadUser2.getObjectInTimeInterval(inter2).positions
+                prototypeTrajectories1=trajLearning.findPrototypes(prototypes,nMatching,objects,route1,partialObjPositions1,minSimilarity,index,mostMatched)
+                prototypeTrajectories2=trajLearning.findPrototypes(prototypes,nMatching,objects,route2,partialObjPositions2,minSimilarity,index,mostMatched)
+                if prototypeTrajectories1=={}:
+                    print self.roadUser1.num, 'is abnormal at instant', str(i)
+                elif prototypeTrajectories2=={}:
+                    print self.roadUser2.num, 'is abnormal at instant', str(i)
+                else:
+					self.collisionPoints[i], self.crossingZones[i] = predictionParameters.computeCrossingsCollisionsAtInstant(i, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2)
+					if len(self.collisionPoints[i]) > 0:
+						TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.collisionPoints[i])
+        # add probability of collision, and probability of successful evasive action
+        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs))
+        
+        if computeCZ:
+            pPETs = {}
+            for i in list(commonTimeInterval)[:-1]:
+                if i in self.crossingZones.keys() and len(self.crossingZones[i]) > 0:
+                    pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i])
+            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs))
+
     def addVideoFilename(self,videoFilename):
         self.videoFilename= videoFilename	
 
--- a/python/prediction.py	Tue Jul 08 15:22:30 2014 -0400
+++ b/python/prediction.py	Tue Jul 15 13:25:15 2014 -0400
@@ -4,6 +4,7 @@
 import moving
 import math
 import random
+import numpy as np
 
 class PredictedTrajectory:
     '''Class for predicted trajectories with lazy evaluation
@@ -47,6 +48,20 @@
     def getControl(self):
         return self.control
 
+def findNearestOrientation(initialPosition,prototypeTrajectory):
+	distances=[]
+	for p in prototypeTrajectory.positions:
+		distances.append(moving.Point.distanceNorm2(initialPosition, p))
+	t=np.argmin(distances)
+	return moving.NormAngle.fromPoint(prototypeTrajectory.velocities[t]).angle
+#todo: merge with previous function
+def findNearestInstant(initialPosition,prototypeTrajectory):
+	distances=[]
+	for p in prototypeTrajectory.positions:
+		distances.append(moving.Point.distanceNorm2(initialPosition, p))
+	t=np.argmin(distances)
+	return t
+	
 class PredictedTrajectoryPrototype(PredictedTrajectory):
     '''Predicted trajectory that follows a prototype trajectory
     The prototype is in the format of a moving.Trajectory: it could be
@@ -59,20 +74,39 @@
     (applying a constant ratio equal 
     to the ratio of the user instantaneous speed and the trajectory closest speed)'''
 
-    def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, probability = 1.):
+    def __init__(self, initialPosition, initialVelocity, prototypeTrajectory, constantSpeed = True, speedList=[], probability = 1.):
         self.prototypeTrajectory = prototypeTrajectory
         self.constantSpeed = constantSpeed
+        self.speedList = speedList
         self.probability = probability
         self.predictedPositions = {0: initialPosition}
-        self.predictedSpeedOrientations = {0: moving.NormAngle.fromPoint(initialVelocity)}
+        self.predictedSpeedOrientations = {0: moving.NormAngle(moving.NormAngle.fromPoint(initialVelocity).norm, findNearestOrientation(initialPosition,prototypeTrajectory))}#moving.NormAngle.fromPoint(initialVelocity)}
 
     def predictPosition(self, nTimeSteps):
         if nTimeSteps > 0 and not nTimeSteps in self.predictedPositions.keys():
-            if constantSpeed:
+            if self.constantSpeed:
                 # calculate cumulative distance
-                pass
+                speedNorm= self.predictedSpeedOrientations[0].norm #moving.NormAngle.fromPoint(initialVelocity).norm
+                anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)
+                self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(speedNorm, anglePrototype)
+                self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None)
+                #pass
+            elif self.speedList!=[]:
+                pass			
             else: # see c++ code, calculate ratio
-                pass
+                speedNorm= self.predictedSpeedOrientations[0].norm
+                instant=findNearestInstant(self.predictedPositions[0],self.prototypeTrajectory)
+                prototypeSpeeds= self.prototypeTrajectory.getSpeeds()[instant:]
+                ratio=float(speedNorm)/prototypeSpeeds[0]
+                resampledSpeeds=[sp*ratio for sp in prototypeSpeeds]
+                anglePrototype = findNearestOrientation(self.predictedPositions[nTimeSteps-1],self.prototypeTrajectory)
+                if nTimeSteps<len(resampledSpeeds):
+                    self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[nTimeSteps], anglePrototype)
+                    self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None)				
+                else:
+                    self.predictedSpeedOrientations[nTimeSteps]= moving.NormAngle(resampledSpeeds[-1], anglePrototype)
+                    self.predictedPositions[nTimeSteps],tmp= moving.predictPosition(self.predictedPositions[nTimeSteps-1], self.predictedSpeedOrientations[nTimeSteps-1], moving.NormAngle(0,0), None)
+                # pass
         return self.predictedPositions[nTimeSteps]
 
 class PredictedTrajectoryRandomControl(PredictedTrajectory):
@@ -135,10 +169,14 @@
     def generatePredictedTrajectories(self, obj, instant):
         return []
 
-    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False):
+    def computeCrossingsCollisionsAtInstant(self, currentInstant, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False,prototypeTrajectories1=None,prototypeTrajectories2=None):
         '''returns the lists of collision points and crossing zones'''
-        predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
-        predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)
+        if prototypeTrajectories1==None: 
+            predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant)
+            predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant)		
+        else:
+            predictedTrajectories1 = self.generatePredictedTrajectories(obj1, currentInstant,prototypeTrajectories1)
+            predictedTrajectories2 = self.generatePredictedTrajectories(obj2, currentInstant,prototypeTrajectories2)
 
         collisionPoints = []
         crossingZones = []
@@ -161,7 +199,8 @@
                             #    cz = (et1.predictPosition(t1)+et2.predictPosition(t2)).multiply(0.5)
                             cz = moving.segmentIntersection(et1.predictPosition(t1), et1.predictPosition(t1+1), et2.predictPosition(t2), et2.predictPosition(t2+1))
                             if cz:
-                                crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)))
+                                deltaV= (et1.predictPosition(t1)- et1.predictPosition(t1+1) - et2.predictPosition(t2)+ et2.predictPosition(t2+1)).norm2()
+                                crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2)-(float(collisionDistanceThreshold)/deltaV)))
                             t2 += 1
                         t1 += 1                        
 
@@ -182,7 +221,7 @@
 
         return collisionPoints, crossingZones
 
-    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
+    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None,prototypeTrajectories1=None,prototypeTrajectories2=None):
         '''Computes all crossing and collision points at each common instant for two road users. '''
         collisionPoints={}
         crossingZones={}
@@ -191,7 +230,7 @@
         else:
             commonTimeInterval = obj1.commonTimeInterval(obj2)
         for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
-            collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
+            collisionPoints[i], crossingZones[i] = self.computeCrossingsCollisionsAtInstant(i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug,prototypeTrajectories1,prototypeTrajectories2)
 
         return collisionPoints, crossingZones
 
@@ -412,10 +451,24 @@
 ####
 # Other Methods
 ####
-
-
-
-
+class prototypePredictionParameters(PredictionParameters):
+    def __init__(self, maxSpeed, nPredictedTrajectories,constantSpeed = True, speedList=[]):
+		#TODO speed profiles integration 
+        name = 'prototype'
+        PredictionParameters.__init__(self, name, maxSpeed)
+        self.nPredictedTrajectories = nPredictedTrajectories
+        #self.prototypeTrajectory = prototypeTrajectory
+        self.constantSpeed = constantSpeed
+        self.speedList = speedList
+        #self.probability=probability		
+        
+    def generatePredictedTrajectories(self, obj, instant,prototypeTrajectories):
+        predictedTrajectories = []
+        initialPosition = obj.getPositionAtInstant(instant)
+        initialVelocity = obj.getVelocityAtInstant(instant)
+        for prototypeTraj in prototypeTrajectories.keys():
+			predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, prototypeTraj, constantSpeed = self.constantSpeed, speedList=self.speedList, probability = prototypeTrajectories[prototypeTraj])) 
+        return predictedTrajectories
 
 if __name__ == "__main__":
     import doctest