view python/events.py @ 601:e1f3b789c632

add a definition of interaction and collision course intervals
author Mohamed Gomaa
date Thu, 02 May 2013 11:35:45 -0400
parents 414b2e7cd873
children
line wrap: on
line source

#! /usr/bin/env python
'''Libraries for events
Interactions, pedestrian crossing...'''

import numpy as np
from numpy import arccos

import multiprocessing
import itertools

import moving
import prediction
import indicators

__metaclass__ = type

class Interaction(moving.STObject):
    '''Class for an interaction between two road users 
    or a road user and an obstacle
    
    link to the moving objects
    contains the indicators in a dictionary with the names as keys
    '''

    categories = {'headon': 0,
                  'rearend': 1,
                  'side': 2,
                  'parallel': 3}

    def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None):
        moving.STObject.__init__(self, num, timeInterval)
        self.roaduserNumbers = set([roaduserNum1, roaduserNum2])
        self.movingObject1 = movingObject1
        self.movingObject2 = movingObject2
        self.categoryNum = categoryNum
        self.indicators = {}

    def getIndicator(self, indicatorName):
        return self.indicators[indicatorName]

    def addIndicator(self, indicator):
        self.indicators[indicator.name] = indicator

    def computeIndicators(self):
        '''Computes the collision course cosine only if the cosine is positive'''
        collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length())
        collisionCourseAngles = {}
        distances = {}#[0]*int(self.timeInterval.length())
        speedDifferentials = {}
        velocityAngle= {}
        for instant in self.timeInterval:
            deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant)
            deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant)
            collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav)
            velocityDotProduct= moving.Point.dot(self.movingObject1.getVelocityAtInstant(instant),self.movingObject2.getVelocityAtInstant(instant))
            velocityAngle[instant]= np.arccos(velocityDotProduct/ (self.movingObject1.getVelocityAtInstant(instant).norm2() * self.movingObject2.getVelocityAtInstant(instant).norm2()))
            distances[instant] = deltap.norm2()
            speedDifferentials[instant] = deltav.norm2()
            if collisionCourseDotProducts[instant] > 0:
                collisionCourseAngles[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant]))

        # todo shorten the time intervals based on the interaction definition
        self.addIndicator(indicators.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProducts))
        self.addIndicator(indicators.SeverityIndicator('Distance', distances))
        self.addIndicator(indicators.SeverityIndicator('Speed Differential', speedDifferentials))
        self.addIndicator(indicators.SeverityIndicator('Velocity Angle', velocityAngle))
        self.addIndicator(indicators.SeverityIndicator('Collision Course Angle', collisionCourseAngles))

        # todo test for interaction instants and interval, compute indicators

        # if we have features, compute other indicators
        if self.movingObject1.features and self.movingObject2.features:
            from scipy.spatial.distance import cdist
            minDistance={}
            for instant in self.timeInterval:
                positions1 = [f.getPositionAtInstant(instant).astuple() for f in self.movingObject1.features if f.existsAtInstant(instant)]
                positions2 = [f.getPositionAtInstant(instant).astuple() for f in self.movingObject2.features if f.existsAtInstant(instant)]
                distance = cdist(positions1, positions2, metric = 'euclidean')
                minDistance[instant] = distance.min()
            self.addIndicator(indicators.SeverityIndicator('Minimum Distance', minDistance))

    def computeCollisionPoints(self, predictionParameters, collisionDistanceThreshold, timeHorizon):
        if self.movingObject1.features and self.movingObject2.features:
            collisionPoints,crossingZones = prediction.computeCrossingsCollisions(self.movingObject1, self.movingObject2, predictionParameters, collisionDistanceThreshold, timeHorizon,asWholeVehicle=True)
            self.addIndicator(indicators.SeverityIndicator('collisionPoints', collisionPoints))	
        else:
            print('Features not associated with objects')

    def defineInteractionInterval(self):
        ''' interaction defined as the first and last time that the collion course is positive'''
        keys=sorted(self.getIndicator('Collision Course Angle').values.keys())
        if keys!=[]:
            self.interactionInterval= moving.TimeInterval(keys[0],keys[-1])
        else:
            print('no interaction')
            self.interactionInterval=None
			
    def defineCollisionPointInterval(self):
        ''' collision point defined as the first and last time that the collion point exist'''
        values= self.getIndicator('TTC').values
        keys=sorted(values.keys())
		#keysReverse=sorted(values.keys(),reverse=True)		
        keysWithoutinf= [key for key in keys if values[key]!=np.inf]
        if keysWithoutinf!=[]:
            self.collisionPointInterval= moving.TimeInterval(min(keysWithoutinf),max(keysWithoutinf))
        else:
            print('no collision points')
            self.collisionPointInterval=None	
			
    def removeIndicatorExtraValues(self,indicatorName):
        ''' refine the indicators value wrt interaction definition'''
        self.defineInteractionInterval()
        refinedIndicator = {}
        if self.interactionInterval!= None:
            for i in xrange(self.interactionInterval.first,self.interactionInterval.last+1):
                if self.getIndicator(indicatorName).values.get(i)== None:
                    refinedIndicator[i]= None
                else:
                    refinedIndicator[i]= self.getIndicator(indicatorName).values[i]
        self.addIndicator(indicators.SeverityIndicator(str(indicatorName)+' New', refinedIndicator))

    def removeIndicatorExtraValuesCP(self,indicatorName):
        ''' refine the indicators value wrt interaction definition'''
        self.defineCollisionPointInterval()
        refinedIndicator = {}
        if self.collisionPointInterval!= None:
            for i in xrange(self.collisionPointInterval.first,self.collisionPointInterval.last+1):
                if self.getIndicator(indicatorName).values.get(i)== None:
                    refinedIndicator[i]= None
                else:
                    refinedIndicator[i]= self.getIndicator(indicatorName).values[i]
        self.addIndicator(indicators.SeverityIndicator(str(indicatorName)+'2', refinedIndicator))
		
    def addVideoFilename(self,videoFilename):
        self.videoFilename= videoFilename	

    def addInteractionType(self,interactionType):
	''' interaction types: conflict or collision if they are known'''
        self.interactionType= interactionType			

def createInteractions(objects):
    '''Create all interactions of two co-existing road users

    todo add test to compute categories?'''
    interactions = []
    num = 0
    for i in xrange(len(objects)):
        for j in xrange(i):
            commonTimeInterval = objects[i].commonTimeInterval(objects[j])
            if not commonTimeInterval.empty():
                interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j]))
                num += 1
    return interactions


# TODO:
#http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class
#http://www.rueckstiess.net/research/snippets/show/ca1d7d90
def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8):  
    collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon)      
    #print pairs.num    
    # Ignore empty collision points
    empty = 1
    for i in collisionPoints:
        if(collisionPoints[i] != []):
            empty = 0
    if(empty == 1):
        pairs.hasCP = 0
    else:
        pairs.hasCP = 1
    pairs.CP = collisionPoints
    
    # Ignore empty crossing zones
    empty = 1
    for i in crossingZones:
        if(crossingZones[i] != []):
            empty = 0
    if(empty == 1):
        pairs.hasCZ = 0
    else:
        pairs.hasCZ = 1
    pairs.CZ = crossingZones
    return pairs

def calculateIndicatorPipe_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return calculateIndicatorPipe(*a_b)

class VehPairs():
    '''Create a veh-pairs object from objects list'''
    def __init__(self,objects):
        self.pairs = createInteractions(objects)
        self.interactionCount = 0
        self.CPcount = 0
        self.CZcount = 0
    
    # Process indicator calculation with support for multi-threading
    def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8):       
        if(threads > 1):
            pool = multiprocessing.Pool(threads)
            self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam)))
            pool.close()
        else:
            #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port
            for j in xrange(len(self.pairs)):
                #prog.updateAmount(j) #Removed in traffic-intelligenc port
                collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon)      
                
                # Ignore empty collision points
                empty = 1
                for i in collisionPoints:
                    if(collisionPoints[i] != []):
                        empty = 0
                if(empty == 1):
                    self.pairs[j].hasCP = 0
                else:
                    self.pairs[j].hasCP = 1
                self.pairs[j].CP = collisionPoints
                
                # Ignore empty crossing zones
                empty = 1
                for i in crossingZones:
                    if(crossingZones[i] != []):
                        empty = 0
                if(empty == 1):
                    self.pairs[j].hasCZ = 0
                else:
                    self.pairs[j].hasCZ = 1
                self.pairs[j].CZ = crossingZones       
                
        for j in self.pairs:
            self.interactionCount = self.interactionCount + len(j.CP)
        self.CPcount = len(self.getCPlist())
        self.Czcount = len(self.getCZlist())
    
    
    def getPairsWCP(self):
        lists = []
        for j in self.pairs:
            if(j.hasCP):
                lists.append(j.num)
        return lists
        
    def getPairsWCZ(self):
        lists = []
        for j in self.pairs:
            if(j.hasCZ):
                lists.append(j.num)
        return lists
    
    def getCPlist(self,indicatorThreshold=99999):
        lists = []
        for j in self.pairs:
            if(j.hasCP):
                for k in j.CP:
                    if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold):
                        lists.append([k,j.CP[k][0]])
        return lists
     
    def getCZlist(self,indicatorThreshold=99999):
        lists = []
        for j in self.pairs:
            if(j.hasCZ):
                for k in j.CZ:
                    if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold):
                        lists.append([k,j.CZ[k][0]])
        return lists
        
    def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)):
        if(not CPlist):
            CPlist = self.getCPlist()
        if(not CPlist):
            return False
        TTC_list = []
        for i in CPlist:
            TTC_list.append(i[1].indicator)
        histo = np.histogram(TTC_list,bins=bins)
        histo += (histo[0].astype(float)/np.sum(histo[0]),)
        return histo

class Crossing(moving.STObject):
    '''Class for the event of a street crossing

    TODO: detecter passage sur la chaussee
    identifier origines et destination (ou uniquement chaussee dans FOV)
    carac traversee
    detecter proximite veh (retirer si trop similaire simultanement
    carac interaction'''
    
    def __init__(self, roaduserNum = None, num = None, timeInterval = None):
        moving.STObject.__init__(self, num, timeInterval)
        self.roaduserNum = roaduserNum

    

if __name__ == "__main__":
    import doctest
    import unittest
    #suite = doctest.DocFileSuite('tests/moving.txt')
    suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)