view python/event.py @ 291:9f81218e497a

class VehPairs subsumes createInteractions(objects); legacy code remains
author Paul@BEAST-III
date Tue, 05 Feb 2013 15:45:33 -0500
parents dbe7e53334d7
children 8b2c8a4015f1
line wrap: on
line source

#! /usr/bin/env python
'''Libraries for events
Interactions, pedestrian crossing...'''

#import utils;
import moving
import prediction

__metaclass__ = type

## Subsumes createInteractions(objects) with a VehPair Object
class VehPairs():
    def __init__(self,objects):
        '''
        Create all pairs of two co-existing road users
        TODO: add test to compute categories?
        '''
        #print('    -------------') #Disabled for traffic-intelligence trunk
        #print('    Generating vehicle pairs...') #Disabled for traffic-intelligence trunk
        
        self.pairs = []
        num = 0
        for i in xrange(len(objects)):
            for j in xrange(i):
                commonTimeInterval = objects[i].commonTimeInterval(objects[j])
                if not commonTimeInterval.empty():
                    self.pairs.append(event.Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j]))
                    num += 1
    
    def calculateIndicators(self,predParam,frameRate=15,collisionDistanceThreshold=1.8,timeHorizonMultiplier=5):
        #print('    -------------') #Disabled for traffic-intelligence trunk
        #print('    Calculating time-to-collision...') #Disabled for traffic-intelligence trunk
        
        timeHorizon = frameRate*timeHorizonMultiplier # prediction time Horizon = 1.5 s (reaction time) (5 second)
        
        for params in [predParam]:
            
            #prog = Tools.ProgressBar(0, len(self.pairs), 77)  #Disabled for traffic-intelligence trunk (PVA Tools dependancy)
            for j in xrange(len(self.pairs)):
                #prog.updateAmount(j)  #Disabled for traffic-intelligence trunk (PVA Tools dependancy)
                
                collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon)      
                
                # Ignore empty collision points
                empty = 1
                for i in collisionPoints:
                    if(collisionPoints[i] != []):
                        empty = 0
                if(empty == 1):
                    self.pairs[j].hasCP = 0
                else:
                    self.pairs[j].hasCP = 1
                self.pairs[j].CP = collisionPoints
                
                # Ignore empty crossing zones
                empty = 1
                for i in crossingZones:
                    if(crossingZones[i] != []):
                        empty = 0
                if(empty == 1):
                    self.pairs[j].hasCZ = 0
                else:
                    self.pairs[j].hasCZ = 1
                self.pairs[j].CZ = crossingZones
        return
    
    def getPairsWCP(self):
        lists = []
        for j in self.pairs:
            if(j.hasCP):
                lists.append(j.num)
        return lists
        
    def getPairsWCZ(self):
        lists = []
        for j in self.pairs:
            if(j.hasCZ):
                lists.append(j.num)
        return lists
    
    def getCPlist(self):
        lists = []
        for j in self.pairs:
            if(j.hasCP):
                for k in j.CP:
                    if(j.CP[k] != []):
                        lists.append(j.CP[k])
        return lists
        
    def getCZlist(self):
        lists = []
        for j in self.pairs:
            if(j.hasCZ):
                for k in j.CZ:
                    if(j.CZ[k] != []):
                        lists.append(j.CZ[k])
        return lists

class Interaction(moving.STObject):
    '''Class for an interaction between two road users 
    or a road user and an obstacle
    
    link to the moving objects
    '''

    categories = {'headon': 0,
                  'rearend': 1,
                  'side': 2,
                  'parallel': 3}

    def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None):
        moving.STObject.__init__(self, num, timeInterval)
        self.roaduserNumbers = set([roaduserNum1, roaduserNum2])
        self.movingObject1 = movingObject1
        self.movingObject2 = movingObject2
        self.categoryNum = categoryNum

    def getIndicator(self, indicatorName):
        if hasattr(self, 'indicators'):
            for i in self.indicators:
                if i.name == indicatorName:
                    return i
        else:
            return None

    def computeIndicators(self):
        '''Computes the collision course cosine only if the cosine is positive'''
        collisionCourseDotProduct = [0]*int(self.timeInterval.length())
        collisionCourseCosine = {}
        distances = [0]*int(self.timeInterval.length())
        for i,instant in enumerate(self.timeInterval):
            deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant)
            deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant)
            collisionCourseDotProduct[i] = moving.Point.dot(deltap, deltav)
            distances[i] = deltap.norm2()
            if collisionCourseDotProduct[i] > 0:
                collisionCourseCosine[instant] = collisionCourseDotProduct[i]/(distances[i]*deltav.norm2())
        self.indicators = [moving.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProduct, self.timeInterval),
                           moving.SeverityIndicator('Distances', distances, self.timeInterval),
                           moving.SeverityIndicator('Collision Course Cosine', collisionCourseCosine)]


######====>BEGIN LEGACY CODE
def createInteractions(objects):
    '''Create all interactions of two co-existing road users

    todo add test to compute categories?'''
    interactions = []
    num = 0
    for i in xrange(len(objects)):
        for j in xrange(i):
            commonTimeInterval = objects[i].commonTimeInterval(objects[j])
            if not commonTimeInterval.empty():
                interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j]))
                num += 1
    return interactions
#<====END LEGACY CODE#####



class Crossing(moving.STObject):
    '''Class for the event of a street crossing

    TODO: detecter passage sur la chaussee
    identifier origines et destination (ou uniquement chaussee dans FOV)
    carac traversee
    detecter proximite veh (retirer si trop similaire simultanement
    carac interaction'''
    
    def __init__(self, roaduserNum = None, num = None, timeInterval = None):
        moving.STObject.__init__(self, num, timeInterval)
        self.roaduserNum = roaduserNum

    

if __name__ == "__main__":
    import doctest
    import unittest
    #suite = doctest.DocFileSuite('tests/moving.txt')
    suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)