Mercurial Hosting > traffic-intelligence
changeset 299:7e5fb4abd070
renaming event to events and correcting errors in indicator computation
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 12 Feb 2013 18:08:00 -0500 |
parents | 4a8b6a2de82f |
children | f65b828e5521 |
files | python/event.py python/events.py |
diffstat | 2 files changed, 226 insertions(+), 224 deletions(-) [+] |
line wrap: on
line diff
--- a/python/event.py Tue Feb 12 17:52:13 2013 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,224 +0,0 @@ -#! /usr/bin/env python -'''Libraries for events -Interactions, pedestrian crossing...''' - -import numpy as np - -import multiprocessing -import itertools - -import moving -import prediction - -__metaclass__ = type - -class Interaction(moving.STObject): - '''Class for an interaction between two road users - or a road user and an obstacle - - link to the moving objects - contains the indicators in a dictionary with the names as keys - ''' - - categories = {'headon': 0, - 'rearend': 1, - 'side': 2, - 'parallel': 3} - - def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None): - moving.STObject.__init__(self, num, timeInterval) - self.roaduserNumbers = set([roaduserNum1, roaduserNum2]) - self.movingObject1 = movingObject1 - self.movingObject2 = movingObject2 - self.categoryNum = categoryNum - self.indicators = {} - - def getIndicator(self, indicatorName): - return self.indicators[indicatorName] - - def addIndicator(self, indicator): - self.indicators[indicator.name] = indicator - - def computeIndicators(self): - '''Computes the collision course cosine only if the cosine is positive''' - collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length()) - collisionCourseCosines = {} - distances = {}#[0]*int(self.timeInterval.length()) - speedDifferentials = {} - for instant in self.timeInterval: - deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant) - deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant) - collisionCourseDotProduct[instant] = moving.Point.dot(deltap, deltav) - distances[instant] = deltap.norm2() - speedDifferentials[instant] = deltav.norm2() - if collisionCourseDotProduct[instant] > 0: - collisionCourseCosine[instant] = collisionCourseDotProduct[instant]/(distances[instant]*speedDifferentials[instant]) - # todo shorten the time intervals based on the interaction definition - self.addIndicator(moving.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProducts)) - self.addIndicator(moving.SeverityIndicator('Distance', distances)) - self.addIndicator(moving.SeverityIndicator('Speed Differential', speedDifferentials)) - self.addIndicator(moving.SeverityIndicator('Collision Course Cosine', collisionCourseCosines)) - - -def createInteractions(objects): - '''Create all interactions of two co-existing road users - - todo add test to compute categories?''' - interactions = [] - num = 0 - for i in xrange(len(objects)): - for j in xrange(i): - commonTimeInterval = objects[i].commonTimeInterval(objects[j]) - if not commonTimeInterval.empty(): - interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) - num += 1 - return interactions - - -# TODO: -#http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class -#http://www.rueckstiess.net/research/snippets/show/ca1d7d90 -def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8): - collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon) - #print pairs.num - # Ignore empty collision points - empty = 1 - for i in collisionPoints: - if(collisionPoints[i] != []): - empty = 0 - if(empty == 1): - pairs.hasCP = 0 - else: - pairs.hasCP = 1 - pairs.CP = collisionPoints - - # Ignore empty crossing zones - empty = 1 - for i in crossingZones: - if(crossingZones[i] != []): - empty = 0 - if(empty == 1): - pairs.hasCZ = 0 - else: - pairs.hasCZ = 1 - pairs.CZ = crossingZones - return pairs - -def calculateIndicatorPipe_star(a_b): - """Convert `f([1,2])` to `f(1,2)` call.""" - return calculateIndicatorPipe(*a_b) - -class VehPairs(): - '''Create a veh-pairs object from objects list''' - def __init__(self,objects): - self.pairs = createInteractions(objects) - self.interactionCount = 0 - self.CPcount = 0 - self.CZcount = 0 - - # Process indicator calculation with support for multi-threading - def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8): - if(threads > 1): - pool = multiprocessing.Pool(threads) - self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam))) - pool.close() - else: - #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port - for j in xrange(len(self.pairs)): - #prog.updateAmount(j) #Removed in traffic-intelligenc port - collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon) - - # Ignore empty collision points - empty = 1 - for i in collisionPoints: - if(collisionPoints[i] != []): - empty = 0 - if(empty == 1): - self.pairs[j].hasCP = 0 - else: - self.pairs[j].hasCP = 1 - self.pairs[j].CP = collisionPoints - - # Ignore empty crossing zones - empty = 1 - for i in crossingZones: - if(crossingZones[i] != []): - empty = 0 - if(empty == 1): - self.pairs[j].hasCZ = 0 - else: - self.pairs[j].hasCZ = 1 - self.pairs[j].CZ = crossingZones - - for j in self.pairs: - self.interactionCount = self.interactionCount + len(j.CP) - self.CPcount = len(self.getCPlist()) - self.Czcount = len(self.getCZlist()) - - - def getPairsWCP(self): - lists = [] - for j in self.pairs: - if(j.hasCP): - lists.append(j.num) - return lists - - def getPairsWCZ(self): - lists = [] - for j in self.pairs: - if(j.hasCZ): - lists.append(j.num) - return lists - - def getCPlist(self,indicatorThreshold=99999): - lists = [] - for j in self.pairs: - if(j.hasCP): - for k in j.CP: - if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold): - lists.append([k,j.CP[k][0]]) - return lists - - def getCZlist(self,indicatorThreshold=99999): - lists = [] - for j in self.pairs: - if(j.hasCZ): - for k in j.CZ: - if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold): - lists.append([k,j.CZ[k][0]]) - return lists - - def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)): - if(not CPlist): - CPlist = self.getCPlist() - if(not CPlist): - return False - TTC_list = [] - for i in CPlist: - TTC_list.append(i[1].indicator) - histo = np.histogram(TTC_list,bins=bins) - histo += (histo[0].astype(float)/np.sum(histo[0]),) - return histo - -class Crossing(moving.STObject): - '''Class for the event of a street crossing - - TODO: detecter passage sur la chaussee - identifier origines et destination (ou uniquement chaussee dans FOV) - carac traversee - detecter proximite veh (retirer si trop similaire simultanement - carac interaction''' - - def __init__(self, roaduserNum = None, num = None, timeInterval = None): - moving.STObject.__init__(self, num, timeInterval) - self.roaduserNum = roaduserNum - - - -if __name__ == "__main__": - import doctest - import unittest - #suite = doctest.DocFileSuite('tests/moving.txt') - suite = doctest.DocTestSuite() - unittest.TextTestRunner().run(suite) -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python/events.py Tue Feb 12 18:08:00 2013 -0500 @@ -0,0 +1,226 @@ +#! /usr/bin/env python +'''Libraries for events +Interactions, pedestrian crossing...''' + +import numpy as np + +import multiprocessing +import itertools + +import moving +import prediction +import indicators + +__metaclass__ = type + +class Interaction(moving.STObject): + '''Class for an interaction between two road users + or a road user and an obstacle + + link to the moving objects + contains the indicators in a dictionary with the names as keys + ''' + + categories = {'headon': 0, + 'rearend': 1, + 'side': 2, + 'parallel': 3} + + def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None): + moving.STObject.__init__(self, num, timeInterval) + self.roaduserNumbers = set([roaduserNum1, roaduserNum2]) + self.movingObject1 = movingObject1 + self.movingObject2 = movingObject2 + self.categoryNum = categoryNum + self.indicators = {} + + def getIndicator(self, indicatorName): + return self.indicators[indicatorName] + + def addIndicator(self, indicator): + self.indicators[indicator.name] = indicator + + def computeIndicators(self): + '''Computes the collision course cosine only if the cosine is positive''' + collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length()) + collisionCourseCosines = {} + distances = {}#[0]*int(self.timeInterval.length()) + speedDifferentials = {} + for instant in self.timeInterval: + deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant) + deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant) + collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) + distances[instant] = deltap.norm2() + speedDifferentials[instant] = deltav.norm2() + if collisionCourseDotProducts[instant] > 0: + collisionCourseCosines[instant] = collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant]) + # todo shorten the time intervals based on the interaction definition + # todos change cosines to angles + self.addIndicator(indicators.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProducts)) + self.addIndicator(indicators.SeverityIndicator('Distance', distances)) + self.addIndicator(indicators.SeverityIndicator('Speed Differential', speedDifferentials)) + self.addIndicator(indicators.SeverityIndicator('Collision Course Cosine', collisionCourseCosines)) + + +def createInteractions(objects): + '''Create all interactions of two co-existing road users + + todo add test to compute categories?''' + interactions = [] + num = 0 + for i in xrange(len(objects)): + for j in xrange(i): + commonTimeInterval = objects[i].commonTimeInterval(objects[j]) + if not commonTimeInterval.empty(): + interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) + num += 1 + return interactions + + +# TODO: +#http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class +#http://www.rueckstiess.net/research/snippets/show/ca1d7d90 +def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8): + collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon) + #print pairs.num + # Ignore empty collision points + empty = 1 + for i in collisionPoints: + if(collisionPoints[i] != []): + empty = 0 + if(empty == 1): + pairs.hasCP = 0 + else: + pairs.hasCP = 1 + pairs.CP = collisionPoints + + # Ignore empty crossing zones + empty = 1 + for i in crossingZones: + if(crossingZones[i] != []): + empty = 0 + if(empty == 1): + pairs.hasCZ = 0 + else: + pairs.hasCZ = 1 + pairs.CZ = crossingZones + return pairs + +def calculateIndicatorPipe_star(a_b): + """Convert `f([1,2])` to `f(1,2)` call.""" + return calculateIndicatorPipe(*a_b) + +class VehPairs(): + '''Create a veh-pairs object from objects list''' + def __init__(self,objects): + self.pairs = createInteractions(objects) + self.interactionCount = 0 + self.CPcount = 0 + self.CZcount = 0 + + # Process indicator calculation with support for multi-threading + def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8): + if(threads > 1): + pool = multiprocessing.Pool(threads) + self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam))) + pool.close() + else: + #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port + for j in xrange(len(self.pairs)): + #prog.updateAmount(j) #Removed in traffic-intelligenc port + collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon) + + # Ignore empty collision points + empty = 1 + for i in collisionPoints: + if(collisionPoints[i] != []): + empty = 0 + if(empty == 1): + self.pairs[j].hasCP = 0 + else: + self.pairs[j].hasCP = 1 + self.pairs[j].CP = collisionPoints + + # Ignore empty crossing zones + empty = 1 + for i in crossingZones: + if(crossingZones[i] != []): + empty = 0 + if(empty == 1): + self.pairs[j].hasCZ = 0 + else: + self.pairs[j].hasCZ = 1 + self.pairs[j].CZ = crossingZones + + for j in self.pairs: + self.interactionCount = self.interactionCount + len(j.CP) + self.CPcount = len(self.getCPlist()) + self.Czcount = len(self.getCZlist()) + + + def getPairsWCP(self): + lists = [] + for j in self.pairs: + if(j.hasCP): + lists.append(j.num) + return lists + + def getPairsWCZ(self): + lists = [] + for j in self.pairs: + if(j.hasCZ): + lists.append(j.num) + return lists + + def getCPlist(self,indicatorThreshold=99999): + lists = [] + for j in self.pairs: + if(j.hasCP): + for k in j.CP: + if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold): + lists.append([k,j.CP[k][0]]) + return lists + + def getCZlist(self,indicatorThreshold=99999): + lists = [] + for j in self.pairs: + if(j.hasCZ): + for k in j.CZ: + if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold): + lists.append([k,j.CZ[k][0]]) + return lists + + def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)): + if(not CPlist): + CPlist = self.getCPlist() + if(not CPlist): + return False + TTC_list = [] + for i in CPlist: + TTC_list.append(i[1].indicator) + histo = np.histogram(TTC_list,bins=bins) + histo += (histo[0].astype(float)/np.sum(histo[0]),) + return histo + +class Crossing(moving.STObject): + '''Class for the event of a street crossing + + TODO: detecter passage sur la chaussee + identifier origines et destination (ou uniquement chaussee dans FOV) + carac traversee + detecter proximite veh (retirer si trop similaire simultanement + carac interaction''' + + def __init__(self, roaduserNum = None, num = None, timeInterval = None): + moving.STObject.__init__(self, num, timeInterval) + self.roaduserNum = roaduserNum + + + +if __name__ == "__main__": + import doctest + import unittest + #suite = doctest.DocFileSuite('tests/moving.txt') + suite = doctest.DocTestSuite() + unittest.TextTestRunner().run(suite) +