Mercurial Hosting > traffic-intelligence
diff python/events.py @ 614:5e09583275a4
Merged Nicolas/trafficintelligence into default
author | Mohamed Gomaa <eng.m.gom3a@gmail.com> |
---|---|
date | Fri, 05 Dec 2014 12:13:53 -0500 |
parents | 07b1bd0785cd |
children | 84690dfe5560 |
line wrap: on
line diff
--- a/python/events.py Thu Apr 18 15:29:33 2013 -0400 +++ b/python/events.py Fri Dec 05 12:13:53 2014 -0500 @@ -8,9 +8,7 @@ import multiprocessing import itertools -import moving -import prediction -import indicators +import moving, prediction, indicators, utils __metaclass__ = type @@ -22,75 +20,205 @@ contains the indicators in a dictionary with the names as keys ''' - categories = {'headon': 0, + categories = {'Head On': 0, 'rearend': 1, 'side': 2, 'parallel': 3} - def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None): + indicatorNames = ['Collision Course Dot Product', + 'Collision Course Angle', + 'Distance', + 'Minimum Distance', + 'Velocity Angle', + 'Speed Differential', + 'Collision Probability', + 'Time to Collision', + 'Probability of Successful Evasive Action', + 'predicted Post Encroachment Time'] + + indicatorNameToIndices = utils.inverseEnumeration(indicatorNames) + + indicatorShortNames = ['CCDP', + 'CCA', + 'Dist', + 'MinDist', + 'VA', + 'SD', + 'PoC', + 'TTC', + 'P(SEA)', + 'pPET'] + + indicatorUnits = ['', + 'rad', + 'm', + 'm', + 'rad', + 'm/s', + '', + 's', + '', + ''] + + def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None): moving.STObject.__init__(self, num, timeInterval) - self.roaduserNumbers = set([roaduserNum1, roaduserNum2]) - self.movingObject1 = movingObject1 - self.movingObject2 = movingObject2 + if timeInterval == None and roadUser1 != None and roadUser2 != None: + self.timeInterval = roadUser1.commonTimeInterval(roadUser2) + self.roadUser1 = roadUser1 + self.roadUser2 = roadUser2 + if roaduserNum1 != None and roaduserNum2 != None: + self.roadUserNumbers = set([roaduserNum1, roaduserNum2]) + elif roadUser1 != None and roadUser2 != None: + self.roadUserNumbers = set(roadUser1.getNum(), roadUser2.getNum()) + else: + self.roadUserNumbers = None self.categoryNum = categoryNum self.indicators = {} + self.interactionInterval = None + + def getRoadUserNumbers(self): + return self.roadUserNumbers def getIndicator(self, indicatorName): - return self.indicators[indicatorName] + return self.indicators.get(indicatorName, None) def addIndicator(self, indicator): - self.indicators[indicator.name] = indicator + if indicator: + self.indicators[indicator.name] = indicator def computeIndicators(self): '''Computes the collision course cosine only if the cosine is positive''' collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length()) - collisionCourseCosines = {} + collisionCourseAngles = {} + velocityAngles = {} distances = {}#[0]*int(self.timeInterval.length()) speedDifferentials = {} + interactionInstants = [] for instant in self.timeInterval: - deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant) - deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant) + deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) + v1 = self.roadUser1.getVelocityAtInstant(instant) + v2 = self.roadUser2.getVelocityAtInstant(instant) + deltav = v2-v1 + velocityAngles[instant] = arccos(moving.Point.dot(v1, v2)/(v1.norm2()*v2.norm2())) collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) - distances[instant] = deltap.norm2() # todo compute closest feature distance, if features + distances[instant] = deltap.norm2() speedDifferentials[instant] = deltav.norm2() if collisionCourseDotProducts[instant] > 0: - collisionCourseCosines[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])) + interactionInstants.append(instant) + collisionCourseAngles[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])) + + if len(interactionInstants) >= 2: + self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1]) + else: + self.interactionInterval = moving.TimeInterval() + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances)) + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles)) + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials)) + + # if we have features, compute other indicators + if len(self.roadUser1.features) != 0 and len(self.roadUser2.features) != 0: + minDistance={} + for instant in self.timeInterval: + minDistance[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistance)) - # todo shorten the time intervals based on the interaction definition - self.addIndicator(indicators.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProducts)) - self.addIndicator(indicators.SeverityIndicator('Distance', distances)) - self.addIndicator(indicators.SeverityIndicator('Speed Differential', speedDifferentials)) - self.addIndicator(indicators.SeverityIndicator('Collision Course Cosine', collisionCourseCosines)) + def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1): + '''Computes all crossing and collision points at each common instant for two road users. ''' + self.collisionPoints={} + self.crossingZones={} + TTCs = {} - # todo test for interaction instants and interval, compute indicators + if timeInterval: + commonTimeInterval = timeInterval + else: + commonTimeInterval = self.timeInterval + self.collisionPoints, self.crossingZones = prediction.computeCrossingsCollisions(predictionParameters, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval, nProcesses) + for i, cp in self.collisionPoints.iteritems(): + TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cp) + # add probability of collision, and probability of successful evasive action + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs)) + + if computeCZ: + pPETs = {} + for i in list(commonTimeInterval)[:-1]: + if len(self.crossingZones[i]) > 0: + pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) + self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) def addVideoFilename(self,videoFilename): self.videoFilename= videoFilename def addInteractionType(self,interactionType): - ''' interaction types: conflict or collision if they are known''' - self.interactionType= interactionType + ''' interaction types: conflict or collision if they are known''' + self.interactionType= interactionType -def createInteractions(objects): - '''Create all interactions of two co-existing road users +def createInteractions(objects, _others = None): + '''Create all interactions of two co-existing road users''' + if _others != None: + others = _others - todo add test to compute categories?''' interactions = [] num = 0 for i in xrange(len(objects)): - for j in xrange(i): - commonTimeInterval = objects[i].commonTimeInterval(objects[j]) + if _others == None: + others = objects[:i] + for j in xrange(len(others)): + commonTimeInterval = objects[i].commonTimeInterval(others[j]) if not commonTimeInterval.empty(): - interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) + interactions.append(Interaction(num, commonTimeInterval, objects[i].num, others[j].num, objects[i], others[j])) num += 1 return interactions +def prototypeCluster(interactions, similarityMatrix, alignmentMatrix, indicatorName, minSimilarity): + '''Finds exemplar indicator time series for all interactions + Returns the prototype indices (in the interaction list) and the label of each indicator (interaction) + + if an indicator profile (time series) is different enough (<minSimilarity), + it will become a new prototype. + Non-prototype interactions will be assigned to an existing prototype''' + + # sort indicators based on length + indices = range(similarityMatrix.shape[0]) + def compare(i, j): + if len(interactions[i].getIndicator(indicatorName)) > len(interactions[j].getIndicator(indicatorName)): + return -1 + elif len(interactions[i].getIndicator(indicatorName)) == len(interactions[j].getIndicator(indicatorName)): + return 0 + else: + return 1 + indices.sort(compare) + # go through all indicators + prototypeIndices = [indices[0]] + for i in indices[1:]: + if similarityMatrix[i][prototypeIndices].max() < minSimilarity: + prototypeIndices.append(i) + + # assignment + labels = [-1]*similarityMatrix.shape[0] + indices = [i for i in range(similarityMatrix.shape[0]) if i not in prototypeIndices] + for i in prototypeIndices: + labels[i] = i + for i in indices: + prototypeIndex = similarityMatrix[i][prototypeIndices].argmax() + labels[i] = prototypeIndices[prototypeIndex] + + return prototypeIndices, labels + +def prototypeMultivariateCluster(interactions, similarityMatrics, indicatorNames, minSimilarities, minClusterSize): + '''Finds exmaple indicator time series (several indicators) for all interactions + + if any interaction indicator time series is different enough (<minSimilarity), + it will become a new prototype. + Non-prototype interactions will be assigned to an existing prototype if all indicators are similar enough''' + pass # TODO: #http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class #http://www.rueckstiess.net/research/snippets/show/ca1d7d90 def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8): - collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon) + collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.roadUser1, pairs.roadUser2, predParam, collisionDistanceThreshold, timeHorizon) #print pairs.num # Ignore empty collision points empty = 1 @@ -137,7 +265,7 @@ #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port for j in xrange(len(self.pairs)): #prog.updateAmount(j) #Removed in traffic-intelligenc port - collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon) + collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].roadUser1, self.pairs[j].roadUser2, predParam, collisionDistanceThreshold, timeHorizon) # Ignore empty collision points empty = 1 @@ -181,7 +309,7 @@ lists.append(j.num) return lists - def getCPlist(self,indicatorThreshold=99999): + def getCPlist(self,indicatorThreshold=float('Inf')): lists = [] for j in self.pairs: if(j.hasCP): @@ -190,7 +318,7 @@ lists.append([k,j.CP[k][0]]) return lists - def getCZlist(self,indicatorThreshold=99999): + def getCZlist(self,indicatorThreshold=float('Inf')): lists = [] for j in self.pairs: if(j.hasCZ): @@ -229,7 +357,7 @@ if __name__ == "__main__": import doctest import unittest - #suite = doctest.DocFileSuite('tests/moving.txt') - suite = doctest.DocTestSuite() + suite = doctest.DocFileSuite('tests/events.txt') + #suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite)