Mercurial Hosting > traffic-intelligence
changeset 949:d6c1c05d11f5
modified multithreading at the interaction level for safety computations
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 21 Jul 2017 17:52:56 -0400 |
parents | 584b9405e494 |
children | c03d2c0a4c04 |
files | python/events.py python/ml.py python/moving.py python/prediction.py scripts/learn-motion-patterns.py scripts/safety-analysis.py |
diffstat | 6 files changed, 90 insertions(+), 57 deletions(-) [+] |
line wrap: on
line diff
--- a/python/events.py Fri Jul 21 12:11:55 2017 -0400 +++ b/python/events.py Fri Jul 21 17:52:56 2017 -0400 @@ -8,7 +8,7 @@ import numpy as np import multiprocessing -import itertools +import itertools, logging def findRoute(prototypes,objects,i,j,noiseEntryNums,noiseExitNums,minSimilarity= 0.3, spatialThreshold=1.0, delta=180): @@ -216,19 +216,15 @@ minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) - def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1):#, usePrototypes=False, route1= (-1,-1), route2=(-1,-1), prototypes={}, secondStepPrototypes={}, nMatching={}, objects=[], noiseEntryNums=[], noiseExitNums=[], minSimilarity=0.1, mostMatched=None, useDestination=True, useSpeedPrototype=True, acceptPartialLength=30, step=1): + def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): '''Computes all crossing and collision points at each common instant for two road users. ''' TTCs = {} collisionProbabilities = {} - # if usePrototypes: - # route1= getRoute(self.roadUser1,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination) - # route2= getRoute(self.roadUser2,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination) - if timeInterval is not None: commonTimeInterval = timeInterval else: commonTimeInterval = self.timeInterval - self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval, nProcesses)#,usePrototypes,route1,route2,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype,acceptPartialLength, step) + self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval) for i, cps in self.collisionPoints.iteritems(): TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cps) collisionProbabilities[i] = sum([p.probability for p in cps]) @@ -293,6 +289,16 @@ else: return None +def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): + for inter in interactions: + print('processing interaction {}'.format(inter.getNum())) # logging.debug('processing interaction {}'.format(inter.getNum())) + inter.computeIndicators() + if computeMotionPrediction: + inter.computeCrossingsCollisions(predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval) + if computePET: + inter.computePET(collisionDistanceTreshold) + return interactions + def aggregateSafetyPoints(interactions, pointType = 'collision'): '''Put all collision points or crossing zones in a list for display''' allPoints = []
--- a/python/ml.py Fri Jul 21 12:11:55 2017 -0400 +++ b/python/ml.py Fri Jul 21 17:52:56 2017 -0400 @@ -182,6 +182,8 @@ if optimizeCentroid is True, each time an element is added, we recompute the centroid trajectory as the most similar to all in the cluster + initialPrototypeIndices are indices in instances + TODO: check how similarity evolves in clusters''' if len(instances) == 0: print('no instances to cluster (empty list)') @@ -211,7 +213,8 @@ prototypeIndices = initialPrototypeIndices # think of the format: if indices, have to be in instances for i in prototypeIndices: clusters.append([i]) - for i in indices[1:]: + indices.remove(i) + for i in indices: for j in prototypeIndices: if similarities[i][j] < 0: similarities[i][j] = similarityFunc(instances[i], instances[j])
--- a/python/moving.py Fri Jul 21 12:11:55 2017 -0400 +++ b/python/moving.py Fri Jul 21 17:52:56 2017 -0400 @@ -5,7 +5,7 @@ from base import VideoFilenameAddable from math import sqrt, atan2, cos, sin -from numpy import median, array, zeros, hypot, NaN, std, floor, float32, argwhere +from numpy import median, array, arange, zeros, ones, hypot, NaN, std, floor, float32, argwhere, minimum from matplotlib.pyplot import plot, text from scipy.stats import scoreatpercentile from scipy.spatial.distance import cdist @@ -1458,6 +1458,15 @@ indices, intersections = self.positions.getIntersections(p1, p2) return [t+self.getFirstInstant() for t in indices] + def computeTrajectorySimilarities(self, prototypes, lcss): + 'Computes the similarities to the prototypes using the LCSS' + if not hasattr(self, 'prototypeSimilarities'): + self.prototypeSimilarities = [] + for proto in prototypes: + lcss.similarities(proto.getMovingObject().getPositions().asArray().T, self.getPositions().asArray().T) + similarities = lcss.similarityTable[-1, :-1].astype(float) + self.prototypeSimilarities.append(similarities/minimum(arange(1.,len(similarities)+1), proto.getMovingObject().length()*ones(len(similarities)))) + @staticmethod def computePET(obj1, obj2, collisionDistanceThreshold): '''Post-encroachment time based on distance threshold
--- a/python/prediction.py Fri Jul 21 12:11:55 2017 -0400 +++ b/python/prediction.py Fri Jul 21 17:52:56 2017 -0400 @@ -7,7 +7,7 @@ import math, random from copy import copy import numpy as np -from multiprocessing import Pool +#from multiprocessing import Pool class PredictedTrajectory(object): @@ -64,7 +64,7 @@ (applying a constant ratio equal to the ratio of the user instantaneous speed and the trajectory closest speed)''' - def __init__(self, initialPosition, initialVelocity, prototype, constantSpeed = False, probability = 1.): + def __init__(self, initialPosition, initialVelocity, prototype, constantSpeed = False, nFramesIgnore = 3, probability = 1.): ''' prototype is a MovingObject Prediction at constant speed will not work for unrealistic trajectories @@ -72,6 +72,7 @@ but is good for realistic motion (eg features)''' self.prototype = prototype self.constantSpeed = constantSpeed + self.nFramesIgnore = nFramesIgnore self.probability = probability self.predictedPositions = {0: initialPosition} self.closestPointIdx = prototype.getPositions().getClosestPoint(initialPosition) @@ -92,10 +93,10 @@ while i < trajLength and traj.getCumulativeDistance(i) < traveledDistance: i += 1 if i == trajLength: - v = self.prototype.getVelocityAt(-1) + v = self.prototype.getVelocityAt(-1-self.nFramesIgnore) self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+v*((traveledDistance-traj.getCumulativeDistance(i-1))/v.norm2()) else: - v = self.prototype.getVelocityAt(i-1) + v = self.prototype.getVelocityAt(min(i-1, int(self.prototype.length())-1-self.nFramesIgnore)) self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i-1]+(traj[i]-traj[i-1])*((traveledDistance-traj.getCumulativeDistance(i-1))/traj.getDistance(i-1)) else: traj = self.prototype.getPositions() @@ -103,10 +104,10 @@ nSteps = self.ratio*nTimeSteps+self.closestPointIdx i = int(np.floor(nSteps)) if nSteps < trajLength-1: - v = self.prototype.getVelocityAt(i) + v = self.prototype.getVelocityAt(min(i, int(self.prototype.length())-1-self.nFramesIgnore)) self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[i]+(traj[i+1]-traj[i])*(nSteps-i) else: - v = self.prototype.getVelocityAt(-1) + v = self.prototype.getVelocityAt(-1-self.nFramesIgnore) self.predictedPositions[nTimeSteps] = deltaPosition.rotate(v.angle()-self.theta)+traj[-1]+v*(nSteps-trajLength+1) return self.predictedPositions[nTimeSteps] @@ -306,7 +307,7 @@ def generatePredictedTrajectories(self, obj, instant): return [] - def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1): + def computeCrossingsCollisions(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):#, nProcesses = 1): '''Computes all crossing and collision points at each common instant for two road users. ''' collisionPoints={} crossingZones={} @@ -314,23 +315,23 @@ commonTimeInterval = timeInterval else: commonTimeInterval = obj1.commonTimeInterval(obj2) - if nProcesses == 1: - for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors - i, cp, cz = computeCrossingsCollisionsAtInstant(self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) - if len(cp) != 0: - collisionPoints[i] = cp - if len(cz) != 0: - crossingZones[i] = cz - else: - pool = Pool(processes = nProcesses) - jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)) for i in list(commonTimeInterval)[:-1]] - for j in jobs: - i, cp, cz = j.get() - if len(cp) != 0: - collisionPoints[i] = cp - if len(cz) != 0: - crossingZones[i] = cz - pool.close() + #if nProcesses == 1: + for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors + i, cp, cz = computeCrossingsCollisionsAtInstant(self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) + if len(cp) != 0: + collisionPoints[i] = cp + if len(cz) != 0: + crossingZones[i] = cz + # else: + # pool = Pool(processes = nProcesses) + # jobs = [pool.apply_async(computeCrossingsCollisionsAtInstant, args = (self, i, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)) for i in list(commonTimeInterval)[:-1]] + # for j in jobs: + # i, cp, cz = j.get() + # if len(cp) != 0: + # collisionPoints[i] = cp + # if len(cz) != 0: + # crossingZones[i] = cz + # pool.close() return collisionPoints, crossingZones def computeCollisionProbability(self, obj1, obj2, collisionDistanceThreshold, timeHorizon, debug = False, timeInterval = None): @@ -558,18 +559,16 @@ self.constantSpeed = constantSpeed self.useFeatures = useFeatures + def getLcss(self): + return self.lcss + def addPredictedTrajectories(self, predictedTrajectories, obj, instant): - if not hasattr(obj, 'prototypeSimilarities'): - obj.prototypeSimilarities = [] - for proto in self.prototypes: - self.lcss.similarities(proto.getMovingObject().getPositions().asArray().T, obj.getPositions().asArray().T) - similarities = self.lcss.similarityTable[-1, :-1].astype(float) - obj.prototypeSimilarities.append(similarities/np.minimum(np.arange(1.,len(similarities)+1), proto.getMovingObject().length()*np.ones(len(similarities)))) + obj.computeTrajectorySimilarities(self.prototypes, self.lcss) for proto, similarities in zip(self.prototypes, obj.prototypeSimilarities): if similarities[instant-obj.getFirstInstant()] >= self.minSimilarity: initialPosition = obj.getPositionAtInstant(instant) initialVelocity = obj.getVelocityAtInstant(instant) - predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) + predictedTrajectories.append(PredictedTrajectoryPrototype(initialPosition, initialVelocity, proto.getMovingObject(), constantSpeed = self.constantSpeed, probability = proto.getNMatchings())) def generatePredictedTrajectories(self, obj, instant): predictedTrajectories = []
--- a/scripts/learn-motion-patterns.py Fri Jul 21 12:11:55 2017 -0400 +++ b/scripts/learn-motion-patterns.py Fri Jul 21 17:52:56 2017 -0400 @@ -56,22 +56,37 @@ else: objects = storage.loadTrajectoriesFromSqlite(args.databaseFilename, trajectoryType, withFeatures = (args.trajectoryType == 'objectfeatures'), objectNumbers = args.nTrajectories, timeStep = args.positionSubsamplingRate) -if args.inputPrototypeDatabaseFilename is not None: - prototypeIndices, dbFilenames, trajectoryTypes, nMatchings, prototypes = storage.loadPrototypesFromSqlite(args.inputPrototypeDatabaseFilename, True) trajectories = [o.getPositions().asArray().T for o in objects] +if args.inputPrototypeDatabaseFilename is not None: + initialPrototypes = storage.loadPrototypesFromSqlite(args.inputPrototypeDatabaseFilename, True) + trajectories = [p.getMovingObject().getPositions().asArray().T for p in initialPrototypes]+trajectories + initialPrototypeIndices = range(len(initialPrototypes)) +else: + initialPrototypes = [] + initialPrototypeIndices = None lcss = utils.LCSS(metric = args.metric, epsilon = args.epsilon) nTrajectories = len(trajectories) similarities = -np.ones((nTrajectories, nTrajectories)) - -prototypeIndices, labels = ml.prototypeCluster(trajectories, similarities, args.minSimilarity, lambda x,y : lcss.computeNormalized(x, y), args.minClusterSize, args.optimizeCentroid, args.randomInitialization, True, None) # this line can be called again without reinitializing similarities +# the next line can be called again without reinitializing similarities +prototypeIndices, labels = ml.prototypeCluster(trajectories, similarities, args.minSimilarity, lambda x,y : lcss.computeNormalized(x, y), args.minClusterSize, args.optimizeCentroid, args.randomInitialization, args.inputPrototypeDatabaseFilename is not None, initialPrototypeIndices) # assignment is done only if working on the same database, otherwise the matchings will not compare and one has to to matchings on a large scale at once clusterSizes = ml.computeClusterSizes(labels, prototypeIndices, -1) print(clusterSizes) -prototypes = [moving.Prototype(args.databaseFilename, objects[i].getNum(), prototypeType, clusterSizes[i]) for i in prototypeIndices] +prototypes = [] +for i in prototypeIndices: + if i<len(initialPrototypes): + initialPrototypes[i].nMatchings = 0 + prototypes.append(initialPrototypes[i]) + else: + if args.inputPrototypeDatabaseFilename is None: + nmatchings = clusterSizes[i] + else: + nmatchings = 0 + prototypes.append(moving.Prototype(args.databaseFilename, objects[i].getNum(), prototypeType, nmatchings) if args.outputPrototypeDatabaseFilename is None: outputPrototypeDatabaseFilename = args.databaseFilename else:
--- a/scripts/safety-analysis.py Fri Jul 21 12:11:55 2017 -0400 +++ b/scripts/safety-analysis.py Fri Jul 21 17:52:56 2017 -0400 @@ -3,6 +3,7 @@ import storage, prediction, events, moving import sys, argparse, random +from multiprocessing import Pool import matplotlib.pyplot as plt import numpy as np @@ -70,22 +71,22 @@ objects = storage.loadTrajectoriesFromSqlite(params.databaseFilename, 'object', args.nObjects, withFeatures = (params.useFeaturesForPrediction or predictionMethod == 'ps' or predictionMethod == 'mp')) interactions = events.createInteractions(objects) -for inter in interactions: - print('processing interaction {}'.format(inter.getNum()) - inter.computeIndicators() - if not args.noMotionPrediction: - inter.computeCrossingsCollisions(predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, nProcesses = args.nProcesses) - -if args.computePET: - for inter in interactions: - inter.computePET(params.collisionDistance) - -storage.saveIndicatorsToSqlite(params.databaseFilename, interactions) +if args.nProcesses == 1: + processed = events.computeIndicators(interactions, not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, False, None) +else: + pool = Pool(processes = args.nProcesses) + nInteractionPerProcess = int(np.ceil(len(interactions)/float(args.nProcesses))) + jobs = [pool.apply_async(events.computeIndicators, args = (interactions[i*nInteractionPerProcess:(i+1)*nInteractionPerProcess], not args.noMotionPrediction, args.computePET, predictionParameters, params.collisionDistance, params.predictionTimeHorizon, params.crossingZones, False, None)) for i in range(args.nProcesses)] + processed = [] + for job in jobs: + processed += job.get() + pool.close() +storage.saveIndicatorsToSqlite(params.databaseFilename, processed) if args.displayCollisionPoints: plt.figure() allCollisionPoints = [] - for inter in interactions: + for inter in processed: for collisionPoints in inter.collisionPoints.values(): allCollisionPoints += collisionPoints moving.Point.plotAll(allCollisionPoints)