changeset 949:d6c1c05d11f5

modified multithreading at the interaction level for safety computations
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Fri, 21 Jul 2017 17:52:56 -0400
parents 584b9405e494
children c03d2c0a4c04
files python/events.py python/ml.py python/moving.py python/prediction.py scripts/learn-motion-patterns.py scripts/safety-analysis.py
diffstat 6 files changed, 90 insertions(+), 57 deletions(-) [+]
line wrap: on
line diff
--- a/python/events.py	Fri Jul 21 12:11:55 2017 -0400
+++ b/python/events.py	Fri Jul 21 17:52:56 2017 -0400
@@ -8,7 +8,7 @@
 import numpy as np
 
 import multiprocessing
-import itertools
+import itertools, logging
 
 
 def findRoute(prototypes,objects,i,j,noiseEntryNums,noiseExitNums,minSimilarity= 0.3, spatialThreshold=1.0, delta=180):
@@ -216,19 +216,15 @@
                 minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant)
             self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False))
 
-    def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1):#, usePrototypes=False, route1= (-1,-1), route2=(-1,-1), prototypes={}, secondStepPrototypes={}, nMatching={}, objects=[], noiseEntryNums=[], noiseExitNums=[], minSimilarity=0.1, mostMatched=None, useDestination=True, useSpeedPrototype=True, acceptPartialLength=30, step=1):
+    def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
         '''Computes all crossing and collision points at each common instant for two road users. '''
         TTCs = {}
         collisionProbabilities = {}
-        # if usePrototypes:
-        #     route1= getRoute(self.roadUser1,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination)
-        #     route2= getRoute(self.roadUser2,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination)
-
         if timeInterval is not None:
             commonTimeInterval = timeInterval
         else:
             commonTimeInterval = self.timeInterval
-        self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval, nProcesses)#,usePrototypes,route1,route2,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype,acceptPartialLength, step)
+        self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval)
         for i, cps in self.collisionPoints.iteritems():
             TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cps)
             collisionProbabilities[i] = sum([p.probability for p in cps])
@@ -293,6 +289,16 @@
     else:
         return None
 
+def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
+    for inter in interactions:
+        print('processing interaction {}'.format(inter.getNum())) # logging.debug('processing interaction {}'.format(inter.getNum()))
+        inter.computeIndicators()
+        if computeMotionPrediction:
+            inter.computeCrossingsCollisions(predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval)
+        if computePET:
+            inter.computePET(collisionDistanceThreshold)
+    return interactions
+    
 def aggregateSafetyPoints(interactions, pointType = 'collision'):
     '''Put all collision points or crossing zones in a list for display'''
     allPoints = []
--- a/python/ml.py	Fri Jul 21 12:11:55 2017 -0400
+++ b/python/ml.py	Fri Jul 21 17:52:56 2017 -0400
@@ -182,6 +182,8 @@
 
     if optimizeCentroid is True, each time an element is added, we recompute the centroid trajectory as the most similar to all in the cluster
 
+    initialPrototypeIndices are indices in instances
+
     TODO: check how similarity evolves in clusters'''
     if len(instances) == 0:
         print('no instances to cluster (empty list)')
@@ -211,7 +213,8 @@
         prototypeIndices = initialPrototypeIndices # think of the format: if indices, have to be in instances
     for i in prototypeIndices:
         clusters.append([i])
-    for i in indices[1:]:
+        indices.remove(i)
+    for i in indices:
         for j in prototypeIndices:
             if similarities[i][j] < 0:
                 similarities[i][j] = similarityFunc(instances[i], instances[j])
--- a/python/moving.py	Fri Jul 21 12:11:55 2017 -0400
+++ b/python/moving.py	Fri Jul 21 17:52:56 2017 -0400
@@ -5,7 +5,7 @@
 from base import VideoFilenameAddable
 
 from math import sqrt, atan2, cos, sin
-from numpy import median, array, zeros, hypot, NaN, std, floor, float32, argwhere
+from numpy import median, array, arange, zeros, ones, hypot, NaN, std, floor, float32, argwhere, minimum
 from matplotlib.pyplot import plot, text
 from scipy.stats import scoreatpercentile
 from scipy.spatial.distance import cdist
@@ -1458,6 +1458,15 @@
         indices, intersections = self.positions.getIntersections(p1, p2)
         return [t+self.getFirstInstant() for t in indices]
 
+    def computeTrajectorySimilarities(self, prototypes, lcss):
+        'Computes the similarities to the prototypes using the LCSS'
+        if not hasattr(self, 'prototypeSimilarities'):
+            self.prototypeSimilarities = []
+            for proto in prototypes:
+                lcss.similarities(proto.getMovingObject().getPositions().asArray().T, self.getPositions().asArray().T)
+                similarities = lcss.similarityTable[-1, :-1].astype(float)
+                self.prototypeSimilarities.append(similarities/minimum(arange(1.,len(similarities)+1), proto.getMovingObject().length()*ones(len(similarities))))
+    
     @staticmethod
     def computePET(obj1, obj2, collisionDistanceThreshold):
         '''Post-encroachment time based on distance threshold
--- a/python/prediction.py	Fri Jul 21 12:11:55 2017 -0400
+++ b/python/prediction.py	Fri Jul 21 17:52:56 2017 -0400
@@ -7,7 +7,7 @@
 import math, random
 from copy import copy
 import numpy as np
-from multiprocessing import Pool
+#from multiprocessing import Pool
 
 
 class PredictedTrajectory(object):
@@ -64,7 +64,7 @@
     (applying a constant ratio equal 
     to the ratio of the user instantaneous speed and the trajectory closest speed)'''
 
-    def __init__(self, initialPosition, initialVelocity, prototype, constantSpeed = False, probability = 1.):
+    def __init__(self, initialPosition, initialVelocity, prototype, constantSpeed = False, nFramesIgnore = 3, probability = 1.):
         ''' prototype is a MovingObject
 
         Prediction at constant speed will not work for unrealistic trajectories 
@@ -72,6 +72,7 @@
         but is good for realistic motion (eg features)'''
         self.prototype = prototype
         self.constantSpeed = constantSpeed
+        self.nFramesIgnore = nFramesIgnore
         self.probability = probability
         self.predictedPositions = {0: initialPosition}
         self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition)
@@ -92,10 +93,10 @@
                 while i < trajLength and traj.getCumulativeDistance(i) < traveledDistance:
                     i += 1
                 if i == trajLength:
-                    v = self.prototype.getVelocityAt(-1)
+                    v = self.prototype.getVelocityAt(-1-self.nFramesIgnore)
                     self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2())
                 else:
-                    v = self.prototype.getVelocityAt(i-1)
+                    v = self.prototype.getVelocityAt(min(i-1, int(self.prototype.length())-1-self.nFramesIgnore))
                     self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1))
             else:
                 traj = self.prototype.getPositions()
@@ -103,10 +104,10 @@
                 nSteps = self.ratio*nTimeSteps+self.closestPointIdx
                 i = int(np.floor(nSteps))
                 if nSteps < trajLength-1:
-                    v = self.prototype.getVelocityAt(i)
+                    v = self.prototype.getVelocityAt(min(i, int(self.prototype.length())-1-self.nFramesIgnore))
                     self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i)
                 else:
-                    v = self.prototype.getVelocityAt(-1)
+                    v = self.prototype.getVelocityAt(-1-self.nFramesIgnore)
                     self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1)
         return self.predictedPositions[nTimeSteps]
 
@@ -306,7 +307,7 @@
     def generatePredictedTrajectories(self, obj, instant):
         return []
 
-    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1):
+    def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):#, nProcesses = 1):
         '''Computes all crossing and collision points at each common instant for two road users. '''
         collisionPoints={}
         crossingZones={}
@@ -314,23 +315,23 @@
             commonTimeInterval = timeInterval
         else:
             commonTimeInterval = obj1.commonTimeInterval(obj2)
-        if nProcesses == 1:
-            for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
-                i, cp, cz = computeCrossingsCollisionsAtInstant(self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
-                if len(cp) != 0:
-                    collisionPoints[i] = cp
-                if len(cz) != 0:
-                    crossingZones[i] = cz
-        else:
-            pool = Pool(processes = nProcesses)
-            jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)) for i in list(commonTimeInterval)[:-1]]
-            for j in jobs:
-                i, cp, cz = j.get()
-                if len(cp) != 0:
-                    collisionPoints[i] = cp
-                if len(cz) != 0:
-                    crossingZones[i] = cz
-            pool.close()
+        #if nProcesses == 1:
+        for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
+            i, cp, cz = computeCrossingsCollisionsAtInstant(self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
+            if len(cp) != 0:
+                collisionPoints[i] = cp
+            if len(cz) != 0:
+                crossingZones[i] = cz
+        # else:
+        #     pool = Pool(processes = nProcesses)
+        #     jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)) for i in list(commonTimeInterval)[:-1]]
+        #     for j in jobs:
+        #         i, cp, cz = j.get()
+        #         if len(cp) != 0:
+        #             collisionPoints[i] = cp
+        #         if len(cz) != 0:
+        #             crossingZones[i] = cz
+        #     pool.close()
         return collisionPoints, crossingZones
 
     def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None):
@@ -558,18 +559,16 @@
         self.constantSpeed = constantSpeed
         self.useFeatures = useFeatures
 
+    def getLcss(self):
+        return self.lcss
+        
     def addPredictedTrajectories(self, predictedTrajectories, obj, instant):
-        if not hasattr(obj, 'prototypeSimilarities'):
-            obj.prototypeSimilarities = []
-            for proto in self.prototypes:
-                self.lcss.similarities(proto.getMovingObject().getPositions().asArray().T, obj.getPositions().asArray().T)
-                similarities = self.lcss.similarityTable[-1, :-1].astype(float)
-                obj.prototypeSimilarities.append(similarities/np.minimum(np.arange(1.,len(similarities)+1), proto.getMovingObject().length()*np.ones(len(similarities))))
+        obj.computeTrajectorySimilarities(self.prototypes, self.lcss)
         for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities):
             if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity:
                 initialPosition = obj.getPositionAtInstant(instant)
                 initialVelocity = obj.getVelocityAtInstant(instant)
-                predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) 
+                predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings()))
         
     def generatePredictedTrajectories(self, obj, instant):
         predictedTrajectories = []
--- a/scripts/learn-motion-patterns.py	Fri Jul 21 12:11:55 2017 -0400
+++ b/scripts/learn-motion-patterns.py	Fri Jul 21 17:52:56 2017 -0400
@@ -56,22 +56,37 @@
 else:
     objects = storage.loadTrajectoriesFromSqlite(args.databaseFilename, trajectoryType, withFeatures = (args.trajectoryType == 'objectfeatures'), objectNumbers = args.nTrajectories, timeStep = args.positionSubsamplingRate)
 
-if args.inputPrototypeDatabaseFilename is not None:
-    prototypeIndices, dbFilenames, trajectoryTypes, nMatchings, prototypes = storage.loadPrototypesFromSqlite(args.inputPrototypeDatabaseFilename, True)
     
 trajectories = [o.getPositions().asArray().T for o in objects]
+if args.inputPrototypeDatabaseFilename is not None:
+    initialPrototypes = storage.loadPrototypesFromSqlite(args.inputPrototypeDatabaseFilename, True)
+    trajectories = [p.getMovingObject().getPositions().asArray().T for p in initialPrototypes]+trajectories
+    initialPrototypeIndices = range(len(initialPrototypes))
+else:
+    initialPrototypes = []
+    initialPrototypeIndices = None
 
 lcss = utils.LCSS(metric = args.metric, epsilon = args.epsilon)
 nTrajectories = len(trajectories)
 
 similarities = -np.ones((nTrajectories, nTrajectories))
-
-prototypeIndices, labels = ml.prototypeCluster(trajectories, similarities, args.minSimilarity, lambda x,y : lcss.computeNormalized(x, y), args.minClusterSize, args.optimizeCentroid, args.randomInitialization, True, None) # this line can be called again without reinitializing similarities
+# the next line can be called again without reinitializing similarities
+prototypeIndices, labels = ml.prototypeCluster(trajectories, similarities, args.minSimilarity, lambda x,y : lcss.computeNormalized(x, y), args.minClusterSize, args.optimizeCentroid, args.randomInitialization, args.inputPrototypeDatabaseFilename is not None, initialPrototypeIndices) # assignment is done only if working on the same database, otherwise the matchings will not compare and one has to to matchings on a large scale at once
 
 clusterSizes = ml.computeClusterSizes(labels, prototypeIndices, -1)
 print(clusterSizes)
 
-prototypes = [moving.Prototype(args.databaseFilename, objects[i].getNum(), prototypeType, clusterSizes[i]) for i in prototypeIndices]
+prototypes = []
+for i in prototypeIndices:
+    if i<len(initialPrototypes):
+        initialPrototypes[i].nMatchings = 0
+        prototypes.append(initialPrototypes[i])
+    else:
+        if args.inputPrototypeDatabaseFilename is None:
+            nmatchings = clusterSizes[i]
+        else:
+            nmatchings = 0
+        prototypes.append(moving.Prototype(args.databaseFilename, objects[i-len(initialPrototypes)].getNum(), prototypeType, nmatchings))
 if args.outputPrototypeDatabaseFilename is None:
     outputPrototypeDatabaseFilename = args.databaseFilename
 else:
--- a/scripts/safety-analysis.py	Fri Jul 21 12:11:55 2017 -0400
+++ b/scripts/safety-analysis.py	Fri Jul 21 17:52:56 2017 -0400
@@ -3,6 +3,7 @@
 import storage, prediction, events, moving
 
 import sys, argparse, random
+from multiprocessing import Pool
 
 import matplotlib.pyplot as plt
 import numpy as np
@@ -70,22 +71,22 @@
 objects = storage.loadTrajectoriesFromSqlite(params.databaseFilename, 'object', args.nObjects, withFeatures = (params.useFeaturesForPrediction or predictionMethod == 'ps' or predictionMethod == 'mp'))
 
 interactions = events.createInteractions(objects)
-for inter in interactions:
-    print('processing interaction {}'.format(inter.getNum())
-    inter.computeIndicators()
-    if not args.noMotionPrediction:
-        inter.computeCrossingsCollisions(predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, nProcesses = args.nProcesses)
-
-if args.computePET:
-    for inter in interactions:
-        inter.computePET(params.collisionDistance)
-    
-storage.saveIndicatorsToSqlite(params.databaseFilename, interactions)
+if args.nProcesses == 1:
+    processed = events.computeIndicators(interactions, not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, False, None)
+else:
+    pool = Pool(processes = args.nProcesses)
+    nInteractionPerProcess = int(np.ceil(len(interactions)/float(args.nProcesses)))
+    jobs = [pool.apply_async(events.computeIndicators, args = (interactions[i*nInteractionPerProcess:(i+1)*nInteractionPerProcess], not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, False, None)) for i in range(args.nProcesses)]
+    processed = []
+    for job in jobs:
+        processed += job.get()
+    pool.close()
+storage.saveIndicatorsToSqlite(params.databaseFilename, processed)
 
 if args.displayCollisionPoints:
     plt.figure()
     allCollisionPoints = []
-    for inter in interactions:
+    for inter in processed:
         for collisionPoints in inter.collisionPoints.values():
             allCollisionPoints += collisionPoints
     moving.Point.plotAll(allCollisionPoints)