view python/events.py @ 451:cd342a774806

Point/CurvilinearTrajectory/Interaction utiles
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Thu, 13 Feb 2014 01:24:27 -0500
parents b64ff7fe7b45
children c59a47ce0209
line wrap: on
line source

#! /usr/bin/env python
'''Libraries for events
Interactions, pedestrian crossing...'''

import numpy as np
from numpy import arccos

import multiprocessing
import itertools

import moving, prediction, indicators, utils

# Python 2 only: classes defined in this module without an explicit base
# (e.g. VehPairs) become new-style classes; Python 3 ignores this name
__metaclass__ = type

class Interaction(moving.STObject):
    '''Class for an interaction between two road users
    or a road user and an obstacle

    link to the moving objects
    contains the indicators in a dictionary with the names as keys

    The class attributes indicatorNames, indicatorShortNames and
    indicatorUnits are parallel lists: the same index refers to the same
    indicator in each of them
    '''

    categories = {'Head On': 0,
                  'rearend': 1,
                  'side': 2,
                  'parallel': 3}

    indicatorNames = ['Collision Course Dot Product',
                      'Collision Course Angle',
                      'Distance',
                      'Minimum Distance',
                      'Velocity Angle',
                      'Speed Differential',
                      'Collision Probability',
                      'Time to Collision',
                      'Probability of Successful Evasive Action',
                      'predicted Post Encroachment Time']

    indicatorNameToIndices = utils.inverseEnumeration(indicatorNames)

    indicatorShortNames = ['CCDP',
                           'CCA',
                           'Dist',
                           'MinDist',
                           'VA',
                           'SD',
                           'PoC',
                           'TTC',
                           'P(SEA)',
                           'pPET']

    indicatorUnits = ['',
                      'rad',
                      'm',
                      'm',
                      'rad',
                      'm/s',
                      '',
                      's',
                      '',
                      '']

    def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None):
        '''Stores the two road users (numbers and objects) and the category

        num and timeInterval are passed to moving.STObject
        The indicators dictionary starts empty and interactionInterval is None
        until computeInteractionInterval is called'''
        moving.STObject.__init__(self, num, timeInterval)
        self.roadUserNumbers = set([roaduserNum1, roaduserNum2])
        self.roadUser1 = roadUser1
        self.roadUser2 = roadUser2
        self.categoryNum = categoryNum
        self.indicators = {}
        self.interactionInterval = None

    def getRoadUserNumbers(self):
        '''Returns the set of the numbers of the two road users'''
        return self.roadUserNumbers

    def getIndicator(self, indicatorName):
        '''Returns the indicator stored under indicatorName, None if there is none'''
        return self.indicators.get(indicatorName, None)

    def addIndicator(self, indicator):
        '''Stores the indicator under its name (replaces an indicator with the same name)

        Nothing is stored if the indicator evaluates to False (e.g. None)'''
        if indicator:
            self.indicators[indicator.name] = indicator

    def computeIndicators(self):
        '''Computes the instantaneous indicators at each instant of self.timeInterval

        The collision course dot product and angle, the distance,
        the velocity angle and the speed differential are computed
        at every instant (the angle is computed whatever the sign of the dot product)
        and added as indicators.SeverityIndicator
        If both road users have features, the minimum distance is added too

        Note: the angles are obtained through divisions by the norms
        of the velocities, of the position difference and of the velocity difference,
        which are not checked to be non-zero'''
        collisionCourseDotProducts = {}
        collisionCourseAngles = {}
        velocityAngles = {}
        distances = {}
        speedDifferentials = {}
        for instant in self.timeInterval:
            deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant)
            v1 = self.roadUser1.getVelocityAtInstant(instant)
            v2 = self.roadUser2.getVelocityAtInstant(instant)
            deltav = v2-v1
            velocityAngles[instant] = arccos(moving.Point.dot(v1, v2)/(v1.norm2()*v2.norm2()))
            collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav)
            distances[instant] = deltap.norm2()
            speedDifferentials[instant] = deltav.norm2()
            collisionCourseAngles[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant]))

        # todo shorten the time intervals based on the interaction definition
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials))

        # todo test for interaction instants and interval, compute indicators

        # if we have features, compute other indicators
        # (identity test: features may be a container overloading comparisons)
        if self.roadUser1.features is not None and self.roadUser2.features is not None:
            minDistance = {}
            for instant in self.timeInterval:
                minDistance[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant)
            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistance))

    def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
        '''Computes all crossing and collision points at each common instant for two road users.

        The instants are those of timeInterval if provided, of self.timeInterval otherwise,
        except the last one
        The results of predictionParameters.computeCrossingsCollisionsAtInstant
        are stored per instant in self.collisionPoints and self.crossingZones
        The 'Time to Collision' indicator is added for the instants with collision points,
        and the 'predicted Post Encroachment Time' indicator for the instants
        with crossing zones if computeCZ is True'''
        self.collisionPoints={}
        self.crossingZones={}
        TTCs = {}

        if timeInterval:
            commonTimeInterval = timeInterval
        else:
            commonTimeInterval = self.timeInterval
        # do not look at the 1 last position/velocities, often with errors
        instants = list(commonTimeInterval)[:-1]
        for i in instants:
            self.collisionPoints[i], self.crossingZones[i] = predictionParameters.computeCrossingsCollisionsAtInstant(i, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
            if len(self.collisionPoints[i]) > 0:
                TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.collisionPoints[i])
        # todo add probability of collision, and probability of successful evasive action
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs))

        if computeCZ:
            pPETs = {}
            for i in instants:
                if len(self.crossingZones[i]) > 0:
                    pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i])
            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs))

    def addVideoFilename(self,videoFilename):
        '''Stores the name of the video file in self.videoFilename'''
        self.videoFilename= videoFilename

    def addInteractionType(self,interactionType):
        ''' interaction types: conflict or collision if they are known'''
        self.interactionType= interactionType

    def computeInteractionInterval(self):
        ''' Computes the times during which the road users are getting closer,
        ie the collision course dot product is positive or zero
        (Other thresholds/indicators could be tried)

        The interaction interval goes from the first to the last such instant
        if there are at least two, and is a default moving.TimeInterval() otherwise
        computeIndicators must have been called before
        (the 'Collision Course Dot Product' indicator is read)'''
        collisionCourseDotProducts = self.getIndicator(Interaction.indicatorNames[0])
        inter = collisionCourseDotProducts.getTimeInterval()
        interactionInstants = [t for t in inter if collisionCourseDotProducts[t] >= 0]
        if len(interactionInstants) >= 2:
            self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1])
        else:
            self.interactionInterval = moving.TimeInterval()

def createInteractions(objects):
    '''Create all interactions of two co-existing road users

    An Interaction is created for each pair of objects
    whose common time interval is not empty
    The interactions are numbered from 0 in the order of their creation;
    in each pair, roadUser1 is the object that comes later in objects

    todo add test to compute categories?'''
    interactions = []
    for i, obj1 in enumerate(objects):
        # pair each object with all the objects that precede it in the list
        for obj2 in itertools.islice(objects, i):
            commonTimeInterval = obj1.commonTimeInterval(obj2)
            if not commonTimeInterval.empty():
                # the number of interactions created so far is the next free number
                interactions.append(Interaction(len(interactions), commonTimeInterval, obj1.num, obj2.num, obj1, obj2))
    return interactions

def prototypeCluster(interactions, similarityMatrix, alignmentMatrix, indicatorName, minSimilarity):
    '''Finds exemplar indicator time series for all interactions
    Returns the prototype indices (in the interaction list) and the label of each indicator (interaction)

    if an indicator profile (time series) is different enough (<minSimilarity), 
    it will become a new prototype. Otherwise, it will be assigned to an existing prototypes

    The indicators are considered from the longest to the shortest
    (indicators of the same length are considered in the order of the list)
    The label of an interaction is the index of its most similar prototype,
    a prototype being its own label
    Every interaction must have the indicator indicatorName
    alignmentMatrix is not used
    Returns two empty lists if similarityMatrix is empty'''
    nInteractions = similarityMatrix.shape[0]
    if nInteractions == 0:
        return [], []

    # sort indicators based on length, longest first
    # (sorted is stable, ties stay in index order as with the former cmp function)
    indices = sorted(range(nInteractions), key = lambda i: -len(interactions[i].getIndicator(indicatorName)))

    # go through all indicators
    prototypeIndices = [indices[0]]
    for i in indices[1:]:
        if similarityMatrix[i][prototypeIndices].max() < minSimilarity:
            prototypeIndices.append(i)

    # assignment
    labels = [-1]*nInteractions
    for i in prototypeIndices:
        labels[i] = i
    prototypeSet = set(prototypeIndices) # constant time membership test
    for i in range(nInteractions):
        if i not in prototypeSet:
            prototypeIndex = similarityMatrix[i][prototypeIndices].argmax()
            labels[i] = prototypeIndices[prototypeIndex]

    return prototypeIndices, labels

# TODO: see the following about using multiprocessing Pool.map on a function defined in a class
#http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class
#http://www.rueckstiess.net/research/snippets/show/ca1d7d90
def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8):  
    '''Computes the collision points and crossing zones of one pair of road users

    Calls prediction.computeCrossingsCollisions for pairs.roadUser1 and pairs.roadUser2
    and stores the results in pairs.CP and pairs.CZ
    pairs.hasCP (resp. pairs.hasCZ) is 1 if at least one entry of the collision points
    (resp. crossing zones) is different from the empty list, 0 otherwise
    Returns pairs (so that the function can be used with Pool.map)'''
    collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.roadUser1, pairs.roadUser2, predParam, collisionDistanceThreshold, timeHorizon)

    # flag the pairs for which there is something else than empty collision points
    pairs.hasCP = int(any(collisionPoints[key] != [] for key in collisionPoints))
    pairs.CP = collisionPoints

    # same for the crossing zones
    pairs.hasCZ = int(any(crossingZones[key] != [] for key in crossingZones))
    pairs.CZ = crossingZones

    return pairs

def calculateIndicatorPipe_star(a_b):
    """Call calculateIndicatorPipe with the elements of a_b as positional arguments.

    Pool.map passes a single argument to the function it maps,
    hence this adapter taking the arguments packed in one sequence."""
    arguments = tuple(a_b)
    return calculateIndicatorPipe(*arguments)

class VehPairs():
    '''Create a veh-pairs object from objects list

    pairs is the list returned by createInteractions(objects)
    interactionCount, CPcount and CZcount are 0 until calculateIndicators is called'''
    def __init__(self,objects):
        self.pairs = createInteractions(objects)
        self.interactionCount = 0
        self.CPcount = 0
        self.CZcount = 0

    # Process indicator calculation with support for multi-threading
    def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8):
        '''Computes the collision points and crossing zones of all pairs and updates the counts

        If threads > 1, the pairs are processed by a multiprocessing.Pool
        of that many processes and self.pairs is replaced by the list returned by Pool.map;
        otherwise the pairs are processed one after the other, in place
        timeHorizon and collisionDistanceThreshold are used in both cases

        interactionCount is the sum of the lengths of the CP attributes of the pairs,
        CPcount and CZcount are the lengths of getCPlist() and getCZlist();
        the three are recomputed at each call'''
        if threads > 1:
            # all parameters are sent to the workers,
            # so that they do not fall back on the defaults of calculateIndicatorPipe
            arguments = [(pair, predParam, timeHorizon, collisionDistanceThreshold) for pair in self.pairs]
            pool = multiprocessing.Pool(threads)
            try:
                self.pairs = pool.map(calculateIndicatorPipe_star, arguments)
            finally:
                pool.close()
                pool.join()
        else:
            for pair in self.pairs:
                calculateIndicatorPipe(pair, predParam, timeHorizon, collisionDistanceThreshold)

        self.interactionCount = sum(len(pair.CP) for pair in self.pairs)
        self.CPcount = len(self.getCPlist())
        self.CZcount = len(self.getCZlist())

    def getPairsWCP(self):
        '''Returns the numbers of the pairs flagged as having collision points'''
        return [pair.num for pair in self.pairs if pair.hasCP]

    def getPairsWCZ(self):
        '''Returns the numbers of the pairs flagged as having crossing zones'''
        return [pair.num for pair in self.pairs if pair.hasCZ]

    @staticmethod
    def _firstPointsBelow(pointsPerPair, indicatorThreshold):
        '''Returns the list of [key, first point] for all the non-empty entries
        of the dictionaries in pointsPerPair
        whose first point has an indicator strictly below indicatorThreshold'''
        return [[key, points[key][0]]
                for points in pointsPerPair
                for key in points
                if points[key] != [] and points[key][0].indicator < indicatorThreshold]

    def getCPlist(self,indicatorThreshold=99999):
        '''Returns the list of [key, first collision point] over all pairs with collision points,
        for the first collision points with an indicator strictly below indicatorThreshold'''
        return self._firstPointsBelow([pair.CP for pair in self.pairs if pair.hasCP], indicatorThreshold)

    def getCZlist(self,indicatorThreshold=99999):
        '''Returns the list of [key, first crossing zone] over all pairs with crossing zones,
        for the first crossing zones with an indicator strictly below indicatorThreshold'''
        return self._firstPointsBelow([pair.CZ for pair in self.pairs if pair.hasCZ], indicatorThreshold)

    def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)):
        '''Returns the histogram of the indicators of the points in CPlist

        CPlist is a list in the format returned by getCPlist,
        which is called if CPlist is not provided (or empty)
        Returns False if there is no point, otherwise the tuple
        (counts, bin edges, counts divided by their sum),
        the first two being the result of numpy.histogram
        Note: the sum of the counts is not checked to be non-zero'''
        if not CPlist:
            CPlist = self.getCPlist()
        if not CPlist:
            return False
        TTC_list = [entry[1].indicator for entry in CPlist]
        counts, binEdges = np.histogram(TTC_list, bins=bins)
        return counts, binEdges, counts.astype(float)/np.sum(counts)

class Crossing(moving.STObject):
    '''Class for the event of a street crossing

    TODO: detect when the road user goes onto the roadway
    identify origins and destination (or only the roadway within the field of view)
    characterize the crossing
    detect the proximity of vehicles (remove if too similar at the same time)
    characterize the interaction'''
    
    def __init__(self, roaduserNum = None, num = None, timeInterval = None):
        # num and timeInterval are handled by the base class,
        # only the number of the crossing road user is stored here
        moving.STObject.__init__(self, num, timeInterval)
        self.roaduserNum = roaduserNum

    

if __name__ == "__main__":
    # run the doctests of this module through the unittest text runner
    # (a former alternative was doctest.DocFileSuite('tests/moving.txt'))
    import unittest
    import doctest
    unittest.TextTestRunner().run(doctest.DocTestSuite())