view python/events.py @ 558:a80ef6931fd8

updated safety-analysis to test multiprocessing
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Mon, 14 Jul 2014 17:44:09 -0400
parents 21bdeb29f855
children 806df5f61c03
line wrap: on
line source

#! /usr/bin/env python
'''Libraries for events
Interactions, pedestrian crossing...'''

import numpy as np
from numpy import arccos

import multiprocessing
import itertools

import moving, prediction, indicators, utils

__metaclass__ = type

class Interaction(moving.STObject):
    '''Class for an interaction between two road users 
    or a road user and an obstacle
    
    link to the moving objects
    contains the indicators in a dictionary with the names as keys
    '''

    categories = {'Head On': 0,
                  'rearend': 1,
                  'side': 2,
                  'parallel': 3}

    indicatorNames = ['Collision Course Dot Product',
                      'Collision Course Angle',
                      'Distance',
                      'Minimum Distance',
                      'Velocity Angle',
                      'Speed Differential',
                      'Collision Probability',
                      'Time to Collision',
                      'Probability of Successful Evasive Action',
                      'predicted Post Encroachment Time']

    indicatorNameToIndices = utils.inverseEnumeration(indicatorNames)

    indicatorShortNames = ['CCDP',
                           'CCA',
                           'Dist',
                           'MinDist',
                           'VA',
                           'SD',
                           'PoC',
                           'TTC',
                           'P(SEA)',
                           'pPET']

    indicatorUnits = ['',
                      'rad',
                      'm',
                      'm',
                      'rad',
                      'm/s',
                      '',
                      's',
                      '',
                      '']

    def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None):
        moving.STObject.__init__(self, num, timeInterval)
        self.roadUserNumbers = set([roaduserNum1, roaduserNum2])
        self.roadUser1 = roadUser1
        self.roadUser2 = roadUser2
        self.categoryNum = categoryNum
        self.indicators = {}
        self.interactionInterval = None

    def getRoadUserNumbers(self):
        return self.roadUserNumbers

    def getIndicator(self, indicatorName):
        return self.indicators.get(indicatorName, None)

    def addIndicator(self, indicator):
        if indicator:
            self.indicators[indicator.name] = indicator

    def computeIndicators(self):
        '''Computes the collision course cosine only if the cosine is positive'''
        collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length())
        collisionCourseAngles = {}
        velocityAngles = {}
        distances = {}#[0]*int(self.timeInterval.length())
        speedDifferentials = {}
        interactionInstants = []
        for instant in self.timeInterval:
            deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant)
            v1 = self.roadUser1.getVelocityAtInstant(instant)
            v2 = self.roadUser2.getVelocityAtInstant(instant)
            deltav = v2-v1
            velocityAngles[instant] = arccos(moving.Point.dot(v1, v2)/(v1.norm2()*v2.norm2()))
            collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav)
            distances[instant] = deltap.norm2()
            speedDifferentials[instant] = deltav.norm2()
            if collisionCourseDotProducts[instant] > 0:
                interactionInstants.append(instant)
            collisionCourseAngles[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant]))

        if len(interactionInstants) >= 2:
            self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1])
        else:
            self.interactionInterval = moving.TimeInterval()
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials))

        # if we have features, compute other indicators
        if self.roadUser1.features != None and self.roadUser2.features != None:
            minDistance={}
            for instant in self.timeInterval:
                minDistance[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant)
            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistance))

    def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1):
        '''Computes all crossing and collision points at each common instant for two road users. '''
        self.collisionPoints={}
        self.crossingZones={}
        TTCs = {}

        if timeInterval:
            commonTimeInterval = timeInterval
        else:
            commonTimeInterval = self.timeInterval
        self.collisionPoints, self.crossingZones = prediction.computeCrossingsCollisions(normalAdaptationPredictionParameters, obj1, obj2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, nProcesses)
        # for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors
        #     i, self.collisionPoints[i], self.crossingZones[i] = predictionParameters.computeCrossingsCollisionsAtInstant(i, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug)
        #     if len(self.collisionPoints[i]) > 0:
        for i, cp in self.collisionPoints.iteritems():
            TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cp)
        # add probability of collision, and probability of successful evasive action
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs))
        
        if computeCZ:
            pPETs = {}
            for i in list(commonTimeInterval)[:-1]:
                if len(self.crossingZones[i]) > 0:
                    pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i])
            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs))

    def addVideoFilename(self,videoFilename):
        self.videoFilename= videoFilename	

    def addInteractionType(self,interactionType):
        ''' interaction types: conflict or collision if they are known'''
        self.interactionType= interactionType

def createInteractions(objects, _others = None):
    '''Create all interactions of two co-existing road users'''
    if _others != None:
        others = _others

    interactions = []
    num = 0
    for i in xrange(len(objects)):
        if _others == None:
            others = objects[:i]
        for j in xrange(len(others)):
            commonTimeInterval = objects[i].commonTimeInterval(others[j])
            if not commonTimeInterval.empty():
                interactions.append(Interaction(num, commonTimeInterval, objects[i].num, others[j].num, objects[i], others[j]))
                num += 1
    return interactions

def prototypeCluster(interactions, similarityMatrix, alignmentMatrix, indicatorName, minSimilarity):
    '''Finds exemplar indicator time series for all interactions
    Returns the prototype indices (in the interaction list) and the label of each indicator (interaction)

    if an indicator profile (time series) is different enough (<minSimilarity), 
    it will become a new prototype. 
    Non-prototype interactions will be assigned to an existing prototype'''

    # sort indicators based on length
    indices = range(similarityMatrix.shape[0])
    def compare(i, j):
        if len(interactions[i].getIndicator(indicatorName)) > len(interactions[j].getIndicator(indicatorName)):
            return -1
        elif len(interactions[i].getIndicator(indicatorName)) == len(interactions[j].getIndicator(indicatorName)):
            return 0
        else:
            return 1
    indices.sort(compare)
    # go through all indicators
    prototypeIndices = [indices[0]]
    for i in indices[1:]:
        if similarityMatrix[i][prototypeIndices].max() < minSimilarity:
             prototypeIndices.append(i)

    # assignment
    labels = [-1]*similarityMatrix.shape[0]
    indices = [i for i in range(similarityMatrix.shape[0]) if i not in prototypeIndices]
    for i in prototypeIndices:
        labels[i] = i
    for i in indices:
        prototypeIndex = similarityMatrix[i][prototypeIndices].argmax()
        labels[i] = prototypeIndices[prototypeIndex]

    return prototypeIndices, labels

def prototypeMultivariateCluster(interactions, similarityMatrics, indicatorNames, minSimilarities, minClusterSize):
    '''Finds exmaple indicator time series (several indicators) for all interactions

    if any interaction indicator time series is different enough (<minSimilarity),
    it will become a new prototype. 
    Non-prototype interactions will be assigned to an existing prototype if all indicators are similar enough'''
    pass

# TODO:
#http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class
#http://www.rueckstiess.net/research/snippets/show/ca1d7d90
def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8):  
    collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.roadUser1, pairs.roadUser2, predParam, collisionDistanceThreshold, timeHorizon)      
    #print pairs.num    
    # Ignore empty collision points
    empty = 1
    for i in collisionPoints:
        if(collisionPoints[i] != []):
            empty = 0
    if(empty == 1):
        pairs.hasCP = 0
    else:
        pairs.hasCP = 1
    pairs.CP = collisionPoints
    
    # Ignore empty crossing zones
    empty = 1
    for i in crossingZones:
        if(crossingZones[i] != []):
            empty = 0
    if(empty == 1):
        pairs.hasCZ = 0
    else:
        pairs.hasCZ = 1
    pairs.CZ = crossingZones
    return pairs

def calculateIndicatorPipe_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return calculateIndicatorPipe(*a_b)

class VehPairs():
    '''Create a veh-pairs object from objects list'''
    def __init__(self,objects):
        self.pairs = createInteractions(objects)
        self.interactionCount = 0
        self.CPcount = 0
        self.CZcount = 0
    
    # Process indicator calculation with support for multi-threading
    def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8):       
        if(threads > 1):
            pool = multiprocessing.Pool(threads)
            self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam)))
            pool.close()
        else:
            #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port
            for j in xrange(len(self.pairs)):
                #prog.updateAmount(j) #Removed in traffic-intelligenc port
                collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].roadUser1, self.pairs[j].roadUser2, predParam, collisionDistanceThreshold, timeHorizon)      
                
                # Ignore empty collision points
                empty = 1
                for i in collisionPoints:
                    if(collisionPoints[i] != []):
                        empty = 0
                if(empty == 1):
                    self.pairs[j].hasCP = 0
                else:
                    self.pairs[j].hasCP = 1
                self.pairs[j].CP = collisionPoints
                
                # Ignore empty crossing zones
                empty = 1
                for i in crossingZones:
                    if(crossingZones[i] != []):
                        empty = 0
                if(empty == 1):
                    self.pairs[j].hasCZ = 0
                else:
                    self.pairs[j].hasCZ = 1
                self.pairs[j].CZ = crossingZones       
                
        for j in self.pairs:
            self.interactionCount = self.interactionCount + len(j.CP)
        self.CPcount = len(self.getCPlist())
        self.Czcount = len(self.getCZlist())
    
    
    def getPairsWCP(self):
        lists = []
        for j in self.pairs:
            if(j.hasCP):
                lists.append(j.num)
        return lists
        
    def getPairsWCZ(self):
        lists = []
        for j in self.pairs:
            if(j.hasCZ):
                lists.append(j.num)
        return lists
    
    def getCPlist(self,indicatorThreshold=float('Inf')):
        lists = []
        for j in self.pairs:
            if(j.hasCP):
                for k in j.CP:
                    if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold):
                        lists.append([k,j.CP[k][0]])
        return lists
     
    def getCZlist(self,indicatorThreshold=float('Inf')):
        lists = []
        for j in self.pairs:
            if(j.hasCZ):
                for k in j.CZ:
                    if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold):
                        lists.append([k,j.CZ[k][0]])
        return lists
        
    def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)):
        if(not CPlist):
            CPlist = self.getCPlist()
        if(not CPlist):
            return False
        TTC_list = []
        for i in CPlist:
            TTC_list.append(i[1].indicator)
        histo = np.histogram(TTC_list,bins=bins)
        histo += (histo[0].astype(float)/np.sum(histo[0]),)
        return histo

class Crossing(moving.STObject):
    '''Class for the event of a street crossing

    TODO: detecter passage sur la chaussee
    identifier origines et destination (ou uniquement chaussee dans FOV)
    carac traversee
    detecter proximite veh (retirer si trop similaire simultanement
    carac interaction'''
    
    def __init__(self, roaduserNum = None, num = None, timeInterval = None):
        moving.STObject.__init__(self, num, timeInterval)
        self.roaduserNum = roaduserNum

    

if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/events.txt')
    #suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)