changeset 299:7e5fb4abd070

renaming event to events and correcting errors in indicator computation
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Tue, 12 Feb 2013 18:08:00 -0500
parents 4a8b6a2de82f
children f65b828e5521
files python/event.py python/events.py
diffstat 2 files changed, 226 insertions(+), 224 deletions(-) [+]
line wrap: on
line diff
--- a/python/event.py	Tue Feb 12 17:52:13 2013 -0500
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,224 +0,0 @@
-#! /usr/bin/env python
-'''Libraries for events
-Interactions, pedestrian crossing...'''
-
-import numpy as np
-
-import multiprocessing
-import itertools
-
-import moving
-import prediction
-
-__metaclass__ = type
-
-class Interaction(moving.STObject):
-    '''Class for an interaction between two road users 
-    or a road user and an obstacle
-    
-    link to the moving objects
-    contains the indicators in a dictionary with the names as keys
-    '''
-
-    categories = {'headon': 0,
-                  'rearend': 1,
-                  'side': 2,
-                  'parallel': 3}
-
-    def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None):
-        moving.STObject.__init__(self, num, timeInterval)
-        self.roaduserNumbers = set([roaduserNum1, roaduserNum2])
-        self.movingObject1 = movingObject1
-        self.movingObject2 = movingObject2
-        self.categoryNum = categoryNum
-        self.indicators = {}
-
-    def getIndicator(self, indicatorName):
-        return self.indicators[indicatorName]
-
-    def addIndicator(self, indicator):
-        self.indicators[indicator.name] = indicator
-
-    def computeIndicators(self):
-        '''Computes the collision course cosine only if the cosine is positive'''
-        collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length())
-        collisionCourseCosines = {}
-        distances = {}#[0]*int(self.timeInterval.length())
-        speedDifferentials = {}
-        for instant in self.timeInterval:
-            deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant)
-            deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant)
-            collisionCourseDotProduct[instant] = moving.Point.dot(deltap, deltav)
-            distances[instant] = deltap.norm2()
-            speedDifferentials[instant] = deltav.norm2()
-            if collisionCourseDotProduct[instant] > 0:
-                collisionCourseCosine[instant] = collisionCourseDotProduct[instant]/(distances[instant]*speedDifferentials[instant])
-        # todo shorten the time intervals based on the interaction definition
-        self.addIndicator(moving.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProducts))
-        self.addIndicator(moving.SeverityIndicator('Distance', distances))
-        self.addIndicator(moving.SeverityIndicator('Speed Differential', speedDifferentials))
-        self.addIndicator(moving.SeverityIndicator('Collision Course Cosine', collisionCourseCosines))
-
-
-def createInteractions(objects):
-    '''Create all interactions of two co-existing road users
-
-    todo add test to compute categories?'''
-    interactions = []
-    num = 0
-    for i in xrange(len(objects)):
-        for j in xrange(i):
-            commonTimeInterval = objects[i].commonTimeInterval(objects[j])
-            if not commonTimeInterval.empty():
-                interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j]))
-                num += 1
-    return interactions
-
-
-# TODO:
-#http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class
-#http://www.rueckstiess.net/research/snippets/show/ca1d7d90
-def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8):  
-    collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon)      
-    #print pairs.num    
-    # Ignore empty collision points
-    empty = 1
-    for i in collisionPoints:
-        if(collisionPoints[i] != []):
-            empty = 0
-    if(empty == 1):
-        pairs.hasCP = 0
-    else:
-        pairs.hasCP = 1
-    pairs.CP = collisionPoints
-    
-    # Ignore empty crossing zones
-    empty = 1
-    for i in crossingZones:
-        if(crossingZones[i] != []):
-            empty = 0
-    if(empty == 1):
-        pairs.hasCZ = 0
-    else:
-        pairs.hasCZ = 1
-    pairs.CZ = crossingZones
-    return pairs
-
-def calculateIndicatorPipe_star(a_b):
-    """Convert `f([1,2])` to `f(1,2)` call."""
-    return calculateIndicatorPipe(*a_b)
-
-class VehPairs():
-    '''Create a veh-pairs object from objects list'''
-    def __init__(self,objects):
-        self.pairs = createInteractions(objects)
-        self.interactionCount = 0
-        self.CPcount = 0
-        self.CZcount = 0
-    
-    # Process indicator calculation with support for multi-threading
-    def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8):       
-        if(threads > 1):
-            pool = multiprocessing.Pool(threads)
-            self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam)))
-            pool.close()
-        else:
-            #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port
-            for j in xrange(len(self.pairs)):
-                #prog.updateAmount(j) #Removed in traffic-intelligenc port
-                collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon)      
-                
-                # Ignore empty collision points
-                empty = 1
-                for i in collisionPoints:
-                    if(collisionPoints[i] != []):
-                        empty = 0
-                if(empty == 1):
-                    self.pairs[j].hasCP = 0
-                else:
-                    self.pairs[j].hasCP = 1
-                self.pairs[j].CP = collisionPoints
-                
-                # Ignore empty crossing zones
-                empty = 1
-                for i in crossingZones:
-                    if(crossingZones[i] != []):
-                        empty = 0
-                if(empty == 1):
-                    self.pairs[j].hasCZ = 0
-                else:
-                    self.pairs[j].hasCZ = 1
-                self.pairs[j].CZ = crossingZones       
-                
-        for j in self.pairs:
-            self.interactionCount = self.interactionCount + len(j.CP)
-        self.CPcount = len(self.getCPlist())
-        self.Czcount = len(self.getCZlist())
-    
-    
-    def getPairsWCP(self):
-        lists = []
-        for j in self.pairs:
-            if(j.hasCP):
-                lists.append(j.num)
-        return lists
-        
-    def getPairsWCZ(self):
-        lists = []
-        for j in self.pairs:
-            if(j.hasCZ):
-                lists.append(j.num)
-        return lists
-    
-    def getCPlist(self,indicatorThreshold=99999):
-        lists = []
-        for j in self.pairs:
-            if(j.hasCP):
-                for k in j.CP:
-                    if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold):
-                        lists.append([k,j.CP[k][0]])
-        return lists
-     
-    def getCZlist(self,indicatorThreshold=99999):
-        lists = []
-        for j in self.pairs:
-            if(j.hasCZ):
-                for k in j.CZ:
-                    if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold):
-                        lists.append([k,j.CZ[k][0]])
-        return lists
-        
-    def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)):
-        if(not CPlist):
-            CPlist = self.getCPlist()
-        if(not CPlist):
-            return False
-        TTC_list = []
-        for i in CPlist:
-            TTC_list.append(i[1].indicator)
-        histo = np.histogram(TTC_list,bins=bins)
-        histo += (histo[0].astype(float)/np.sum(histo[0]),)
-        return histo
-
-class Crossing(moving.STObject):
-    '''Class for the event of a street crossing
-
-    TODO: detecter passage sur la chaussee
-    identifier origines et destination (ou uniquement chaussee dans FOV)
-    carac traversee
-    detecter proximite veh (retirer si trop similaire simultanement
-    carac interaction'''
-    
-    def __init__(self, roaduserNum = None, num = None, timeInterval = None):
-        moving.STObject.__init__(self, num, timeInterval)
-        self.roaduserNum = roaduserNum
-
-    
-
-if __name__ == "__main__":
-    import doctest
-    import unittest
-    #suite = doctest.DocFileSuite('tests/moving.txt')
-    suite = doctest.DocTestSuite()
-    unittest.TextTestRunner().run(suite)
-    
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/python/events.py	Tue Feb 12 18:08:00 2013 -0500
@@ -0,0 +1,226 @@
+#! /usr/bin/env python
+'''Libraries for events
+Interactions, pedestrian crossing...'''
+
+import numpy as np
+
+import multiprocessing
+import itertools
+
+import moving
+import prediction
+import indicators
+
+__metaclass__ = type
+
+class Interaction(moving.STObject):
+    '''Class for an interaction between two road users 
+    or a road user and an obstacle
+    
+    link to the moving objects
+    contains the indicators in a dictionary with the names as keys
+    '''
+
+    categories = {'headon': 0,
+                  'rearend': 1,
+                  'side': 2,
+                  'parallel': 3}
+
+    def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None):
+        moving.STObject.__init__(self, num, timeInterval)
+        self.roaduserNumbers = set([roaduserNum1, roaduserNum2])
+        self.movingObject1 = movingObject1
+        self.movingObject2 = movingObject2
+        self.categoryNum = categoryNum
+        self.indicators = {}
+
+    def getIndicator(self, indicatorName):
+        return self.indicators[indicatorName]
+
+    def addIndicator(self, indicator):
+        self.indicators[indicator.name] = indicator
+
+    def computeIndicators(self):
+        '''Computes the collision course cosine only if the cosine is positive'''
+        collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length())
+        collisionCourseCosines = {}
+        distances = {}#[0]*int(self.timeInterval.length())
+        speedDifferentials = {}
+        for instant in self.timeInterval:
+            deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant)
+            deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant)
+            collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav)
+            distances[instant] = deltap.norm2()
+            speedDifferentials[instant] = deltav.norm2()
+            if collisionCourseDotProducts[instant] > 0:
+                collisionCourseCosines[instant] = collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])
+        # todo shorten the time intervals based on the interaction definition
+        # todos change cosines to angles
+        self.addIndicator(indicators.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProducts))
+        self.addIndicator(indicators.SeverityIndicator('Distance', distances))
+        self.addIndicator(indicators.SeverityIndicator('Speed Differential', speedDifferentials))
+        self.addIndicator(indicators.SeverityIndicator('Collision Course Cosine', collisionCourseCosines))
+
+
+def createInteractions(objects):
+    '''Create all interactions of two co-existing road users
+
+    todo add test to compute categories?'''
+    interactions = []
+    num = 0
+    for i in xrange(len(objects)):
+        for j in xrange(i):
+            commonTimeInterval = objects[i].commonTimeInterval(objects[j])
+            if not commonTimeInterval.empty():
+                interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j]))
+                num += 1
+    return interactions
+
+
+# TODO:
+#http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class
+#http://www.rueckstiess.net/research/snippets/show/ca1d7d90
+def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8):  
+    collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon)      
+    #print pairs.num    
+    # Ignore empty collision points
+    empty = 1
+    for i in collisionPoints:
+        if(collisionPoints[i] != []):
+            empty = 0
+    if(empty == 1):
+        pairs.hasCP = 0
+    else:
+        pairs.hasCP = 1
+    pairs.CP = collisionPoints
+    
+    # Ignore empty crossing zones
+    empty = 1
+    for i in crossingZones:
+        if(crossingZones[i] != []):
+            empty = 0
+    if(empty == 1):
+        pairs.hasCZ = 0
+    else:
+        pairs.hasCZ = 1
+    pairs.CZ = crossingZones
+    return pairs
+
+def calculateIndicatorPipe_star(a_b):
+    """Convert `f([1,2])` to `f(1,2)` call."""
+    return calculateIndicatorPipe(*a_b)
+
+class VehPairs():
+    '''Create a veh-pairs object from objects list'''
+    def __init__(self,objects):
+        self.pairs = createInteractions(objects)
+        self.interactionCount = 0
+        self.CPcount = 0
+        self.CZcount = 0
+    
+    # Process indicator calculation with support for multi-threading
+    def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8):       
+        if(threads > 1):
+            pool = multiprocessing.Pool(threads)
+            self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam)))
+            pool.close()
+        else:
+            #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port
+            for j in xrange(len(self.pairs)):
+                #prog.updateAmount(j) #Removed in traffic-intelligenc port
+                collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon)      
+                
+                # Ignore empty collision points
+                empty = 1
+                for i in collisionPoints:
+                    if(collisionPoints[i] != []):
+                        empty = 0
+                if(empty == 1):
+                    self.pairs[j].hasCP = 0
+                else:
+                    self.pairs[j].hasCP = 1
+                self.pairs[j].CP = collisionPoints
+                
+                # Ignore empty crossing zones
+                empty = 1
+                for i in crossingZones:
+                    if(crossingZones[i] != []):
+                        empty = 0
+                if(empty == 1):
+                    self.pairs[j].hasCZ = 0
+                else:
+                    self.pairs[j].hasCZ = 1
+                self.pairs[j].CZ = crossingZones       
+                
+        for j in self.pairs:
+            self.interactionCount = self.interactionCount + len(j.CP)
+        self.CPcount = len(self.getCPlist())
+        self.Czcount = len(self.getCZlist())
+    
+    
+    def getPairsWCP(self):
+        lists = []
+        for j in self.pairs:
+            if(j.hasCP):
+                lists.append(j.num)
+        return lists
+        
+    def getPairsWCZ(self):
+        lists = []
+        for j in self.pairs:
+            if(j.hasCZ):
+                lists.append(j.num)
+        return lists
+    
+    def getCPlist(self,indicatorThreshold=99999):
+        lists = []
+        for j in self.pairs:
+            if(j.hasCP):
+                for k in j.CP:
+                    if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold):
+                        lists.append([k,j.CP[k][0]])
+        return lists
+     
+    def getCZlist(self,indicatorThreshold=99999):
+        lists = []
+        for j in self.pairs:
+            if(j.hasCZ):
+                for k in j.CZ:
+                    if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold):
+                        lists.append([k,j.CZ[k][0]])
+        return lists
+        
+    def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)):
+        if(not CPlist):
+            CPlist = self.getCPlist()
+        if(not CPlist):
+            return False
+        TTC_list = []
+        for i in CPlist:
+            TTC_list.append(i[1].indicator)
+        histo = np.histogram(TTC_list,bins=bins)
+        histo += (histo[0].astype(float)/np.sum(histo[0]),)
+        return histo
+
+class Crossing(moving.STObject):
+    '''Class for the event of a street crossing
+
+    TODO: detecter passage sur la chaussee
+    identifier origines et destination (ou uniquement chaussee dans FOV)
+    carac traversee
+    detecter proximite veh (retirer si trop similaire simultanement
+    carac interaction'''
+    
+    def __init__(self, roaduserNum = None, num = None, timeInterval = None):
+        moving.STObject.__init__(self, num, timeInterval)
+        self.roaduserNum = roaduserNum
+
+    
+
+if __name__ == "__main__":
+    import doctest
+    import unittest
+    #suite = doctest.DocFileSuite('tests/moving.txt')
+    suite = doctest.DocTestSuite()
+    unittest.TextTestRunner().run(suite)
+