view python/events.py @ 631:2d1d33ae1c69

major work on pPET and pet, issues remain for pPET computed with predicted trajectories
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Tue, 17 Feb 2015 03:55:55 +0100
parents 69a98f84f3eb
children 2f1a583bfd20
line wrap: on
line source

#! /usr/bin/env python
'''Libraries for events
Interactions, pedestrian crossing...'''

import numpy as np
from numpy import arccos

import multiprocessing
import itertools

import moving, prediction, indicators, utils, cvutils
__metaclass__ = type

def findRoute(prototypes,objects,i,j,noiseEntryNums,noiseExitNums,minSimilarity= 0.3, spatialThreshold=1.0, delta=180):
    if i[0] not in noiseEntryNums: 
        prototypesRoutes= [ x for x in sorted(prototypes.keys()) if i[0]==x[0]]
    elif i[1] not in noiseExitNums:
        prototypesRoutes=[ x for x in sorted(prototypes.keys()) if i[1]==x[1]]
    else:
        prototypesRoutes=[x for x in sorted(prototypes.keys())]
    routeSim={}
    lcss = utils.LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta)
    for y in prototypesRoutes: 
        if y in prototypes.keys():
            prototypesIDs=prototypes[y]
            similarity=[]
            for x in prototypesIDs:
                s=lcss.computeNormalized(objects[j].positions, objects[x].positions)
                similarity.append(s)
            routeSim[y]=max(similarity)
    route=max(routeSim, key=routeSim.get)
    if routeSim[route]>=minSimilarity:
        return route
    else:
        return i

def getRoute(obj,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination=True):
    route=(obj.startRouteID,obj.endRouteID)
    if useDestination:
        if route not in prototypes.keys():
            route= findRoute(prototypes,objects,route,obj.getNum(),noiseEntryNums,noiseExitNums)
    return route

class Interaction(moving.STObject):
    '''Class for an interaction between two road users 
    or a road user and an obstacle
    
    link to the moving objects
    contains the indicators in a dictionary with the names as keys
    '''

    categories = {'Head On': 0,
                  'rearend': 1,
                  'side': 2,
                  'parallel': 3}

    indicatorNames = ['Collision Course Dot Product',
                      'Collision Course Angle',
                      'Distance',
                      'Minimum Distance',
                      'Velocity Angle',
                      'Speed Differential',
                      'Collision Probability',
                      'Time to Collision', # 7
                      'Probability of Successful Evasive Action',
                      'predicted Post Encroachment Time']

    indicatorNameToIndices = utils.inverseEnumeration(indicatorNames)

    indicatorShortNames = ['CCDP',
                           'CCA',
                           'Dist',
                           'MinDist',
                           'VA',
                           'SD',
                           'PoC',
                           'TTC',
                           'P(SEA)',
                           'pPET']

    indicatorUnits = ['',
                      'rad',
                      'm',
                      'm',
                      'rad',
                      'm/s',
                      '',
                      's',
                      '',
                      '']

    def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None):
        moving.STObject.__init__(self, num, timeInterval)
        if timeInterval == None and roadUser1 != None and roadUser2 != None:
            self.timeInterval = roadUser1.commonTimeInterval(roadUser2)
        self.roadUser1 = roadUser1
        self.roadUser2 = roadUser2
        if roaduserNum1 != None and roaduserNum2 != None:
            self.roadUserNumbers = set([roaduserNum1, roaduserNum2])
        elif roadUser1 != None and roadUser2 != None:
            self.roadUserNumbers = set(roadUser1.getNum(), roadUser2.getNum())
        else:
            self.roadUserNumbers = None
        self.categoryNum = categoryNum
        self.indicators = {}
        self.interactionInterval = None

    def getRoadUserNumbers(self):
        return self.roadUserNumbers

    def setRoadUsers(self, objects):
        nums = list(self.getRoadUserNumbers())
        if objects[nums[0]].getNum() == nums[0]:
            self.roadUser1 = objects[nums[0]]
        if objects[nums[1]].getNum() == nums[1]:
            self.roadUser2 = objects[nums[1]]

        i = 0
        while i < len(objects) and self.roadUser2 == None:
            if objects[i].getNum() in nums:
                if self.roadUser1 == None:
                    self.roadUser1 = objects[i]
                else:
                    self.roadUser2 = objects[i]
            i += 1

    def getIndicator(self, indicatorName):
        return self.indicators.get(indicatorName, None)

    def addIndicator(self, indicator):
        if indicator:
            self.indicators[indicator.name] = indicator

    def plot(self, options = '', withOrigin = False, timeStep = 1, withFeatures = False, **kwargs):
        self.roadUser1.plot(options, withOrigin, timeStep, withFeatures, **kwargs)
        self.roadUser2.plot(options, withOrigin, timeStep, withFeatures, **kwargs)

    def plotOnWorldImage(self, nPixelsPerUnitDistance, options = '', withOrigin = False, timeStep = 1, **kwargs):
        self.roadUser1.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs)
        self.roadUser2.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs)

    def play(self, videoFilename, homography = None, undistort = False, intrinsicCameraMatrix = None, distortionCoefficients = None, undistortedImageMultiplication = 1.):
        if self.roadUser1 != None and self.roadUser2 != None:
            cvutils.displayTrajectories(videoFilename, [self.roadUser1, self.roadUser2], homography = homography, firstFrameNum = self.getFirstInstant(), lastFrameNumArg = self.getLastInstant(), undistort = undistort, intrinsicCameraMatrix = intrinsicCameraMatrix, distortionCoefficients = distortionCoefficients, undistortedImageMultiplication = undistortedImageMultiplication)
        else:
            print('Please set the interaction road user attributes roadUser1 and roadUser1 through the method setRoadUsers')

    def computeIndicators(self):
        '''Computes the collision course cosine only if the cosine is positive'''
        collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length())
        collisionCourseAngles = {}
        velocityAngles = {}
        distances = {}#[0]*int(self.timeInterval.length())
        speedDifferentials = {}
        interactionInstants = []
        for instant in self.timeInterval:
            deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant)
            v1 = self.roadUser1.getVelocityAtInstant(instant)
            v2 = self.roadUser2.getVelocityAtInstant(instant)
            deltav = v2-v1
            velocityAngles[instant] = arccos(moving.Point.dot(v1, v2)/(v1.norm2()*v2.norm2()))
            collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav)
            distances[instant] = deltap.norm2()
            speedDifferentials[instant] = deltav.norm2()
            if collisionCourseDotProducts[instant] > 0:
                interactionInstants.append(instant)
            collisionCourseAngles[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant]))

        if len(interactionInstants) >= 2:
            self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1])
        else:
            self.interactionInterval = moving.TimeInterval()
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles))
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials))

        # if we have features, compute other indicators
        if len(self.roadUser1.features) != 0 and len(self.roadUser2.features) != 0:
            minDistance={}
            for instant in self.timeInterval:
                minDistance[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant)
            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistance))

    def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None, nProcesses = 1, usePrototypes=False, route1= (-1,-1), route2=(-1,-1), prototypes={}, secondStepPrototypes={}, nMatching={}, objects=[], noiseEntryNums=[], noiseExitNums=[], minSimilarity=0.1, mostMatched=None, useDestination=True, useSpeedPrototype=True, acceptPartialLength=30, step=1):
        '''Computes all crossing and collision points at each common instant for two road users. '''
        self.collisionPoints={}
        self.crossingZones={}
        TTCs = {}
        if usePrototypes:
            route1= getRoute(self.roadUser1,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination)
            route2= getRoute(self.roadUser2,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination)

        if timeInterval != None:
            commonTimeInterval = timeInterval
        else:
            commonTimeInterval = self.timeInterval
        self.collisionPoints, self.crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval, nProcesses,usePrototypes,route1,route2,prototypes,secondStepPrototypes,nMatching,objects,noiseEntryNums,noiseExitNums,minSimilarity,mostMatched,useDestination,useSpeedPrototype,acceptPartialLength, step)
        for i, cp in self.collisionPoints.iteritems():
            TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cp)
        # add probability of collision, and probability of successful evasive action
        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs, mostSevereIsMax=False))
        
        if computeCZ:
            pPETs = {}
            for i, cz in self.crossingZones.iteritems():
                pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(cz)
            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs, mostSevereIsMax=False))

    def computePET(self, collisionDistanceThreshold):
        # TODO add crossing zone
        self.pet = moving.MovingObject.computePET(self.roadUser1, self.roadUser2, collisionDistanceThreshold)

    def addVideoFilename(self,videoFilename):
        self.videoFilename= videoFilename

    def addInteractionType(self,interactionType):
        ''' interaction types: conflict or collision if they are known'''
        self.interactionType= interactionType

def createInteractions(objects, _others = None):
    '''Create all interactions of two co-existing road users'''
    if _others != None:
        others = _others

    interactions = []
    num = 0
    for i in xrange(len(objects)):
        if _others == None:
            others = objects[:i]
        for j in xrange(len(others)):
            commonTimeInterval = objects[i].commonTimeInterval(others[j])
            if not commonTimeInterval.empty():
                interactions.append(Interaction(num, commonTimeInterval, objects[i].num, others[j].num, objects[i], others[j]))
                num += 1
    return interactions

def findInteraction(interactions, roadUserNum1, roadUserNum2):
    'Returns the right interaction in the set'
    i=0
    while i<len(interactions) and set([roadUserNum1, roadUserNum2]) != interactions[i].getRoadUserNumbers():
        i+=1
    if i<len(interactions):
        return interactions[i]
    else:
        return None

def prototypeCluster(interactions, similarityMatrix, alignmentMatrix, indicatorName, minSimilarity):
    '''Finds exemplar indicator time series for all interactions
    Returns the prototype indices (in the interaction list) and the label of each indicator (interaction)

    if an indicator profile (time series) is different enough (<minSimilarity), 
    it will become a new prototype. 
    Non-prototype interactions will be assigned to an existing prototype'''

    # sort indicators based on length
    indices = range(similarityMatrix.shape[0])
    def compare(i, j):
        if len(interactions[i].getIndicator(indicatorName)) > len(interactions[j].getIndicator(indicatorName)):
            return -1
        elif len(interactions[i].getIndicator(indicatorName)) == len(interactions[j].getIndicator(indicatorName)):
            return 0
        else:
            return 1
    indices.sort(compare)
    # go through all indicators
    prototypeIndices = [indices[0]]
    for i in indices[1:]:
        if similarityMatrix[i][prototypeIndices].max() < minSimilarity:
             prototypeIndices.append(i)

    # assignment
    labels = [-1]*similarityMatrix.shape[0]
    indices = [i for i in range(similarityMatrix.shape[0]) if i not in prototypeIndices]
    for i in prototypeIndices:
        labels[i] = i
    for i in indices:
        prototypeIndex = similarityMatrix[i][prototypeIndices].argmax()
        labels[i] = prototypeIndices[prototypeIndex]

    return prototypeIndices, labels

def prototypeMultivariateCluster(interactions, similarityMatrics, indicatorNames, minSimilarities, minClusterSize):
    '''Finds exmaple indicator time series (several indicators) for all interactions

    if any interaction indicator time series is different enough (<minSimilarity),
    it will become a new prototype. 
    Non-prototype interactions will be assigned to an existing prototype if all indicators are similar enough'''
    pass

# TODO:
#http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class
#http://www.rueckstiess.net/research/snippets/show/ca1d7d90
def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8):  
    collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.roadUser1, pairs.roadUser2, predParam, collisionDistanceThreshold, timeHorizon)      
    #print pairs.num    
    # Ignore empty collision points
    empty = 1
    for i in collisionPoints:
        if(collisionPoints[i] != []):
            empty = 0
    if(empty == 1):
        pairs.hasCP = 0
    else:
        pairs.hasCP = 1
    pairs.CP = collisionPoints
    
    # Ignore empty crossing zones
    empty = 1
    for i in crossingZones:
        if(crossingZones[i] != []):
            empty = 0
    if(empty == 1):
        pairs.hasCZ = 0
    else:
        pairs.hasCZ = 1
    pairs.CZ = crossingZones
    return pairs

def calculateIndicatorPipe_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return calculateIndicatorPipe(*a_b)

class VehPairs():
    '''Create a veh-pairs object from objects list'''
    def __init__(self,objects):
        self.pairs = createInteractions(objects)
        self.interactionCount = 0
        self.CPcount = 0
        self.CZcount = 0
    
    # Process indicator calculation with support for multi-threading
    def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8):       
        if(threads > 1):
            pool = multiprocessing.Pool(threads)
            self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam)))
            pool.close()
        else:
            #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port
            for j in xrange(len(self.pairs)):
                #prog.updateAmount(j) #Removed in traffic-intelligenc port
                collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].roadUser1, self.pairs[j].roadUser2, predParam, collisionDistanceThreshold, timeHorizon)      
                
                # Ignore empty collision points
                empty = 1
                for i in collisionPoints:
                    if(collisionPoints[i] != []):
                        empty = 0
                if(empty == 1):
                    self.pairs[j].hasCP = 0
                else:
                    self.pairs[j].hasCP = 1
                self.pairs[j].CP = collisionPoints
                
                # Ignore empty crossing zones
                empty = 1
                for i in crossingZones:
                    if(crossingZones[i] != []):
                        empty = 0
                if(empty == 1):
                    self.pairs[j].hasCZ = 0
                else:
                    self.pairs[j].hasCZ = 1
                self.pairs[j].CZ = crossingZones       
                
        for j in self.pairs:
            self.interactionCount = self.interactionCount + len(j.CP)
        self.CPcount = len(self.getCPlist())
        self.Czcount = len(self.getCZlist())
    
    
    def getPairsWCP(self):
        lists = []
        for j in self.pairs:
            if(j.hasCP):
                lists.append(j.num)
        return lists
        
    def getPairsWCZ(self):
        lists = []
        for j in self.pairs:
            if(j.hasCZ):
                lists.append(j.num)
        return lists
    
    def getCPlist(self,indicatorThreshold=float('Inf')):
        lists = []
        for j in self.pairs:
            if(j.hasCP):
                for k in j.CP:
                    if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold):
                        lists.append([k,j.CP[k][0]])
        return lists
     
    def getCZlist(self,indicatorThreshold=float('Inf')):
        lists = []
        for j in self.pairs:
            if(j.hasCZ):
                for k in j.CZ:
                    if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold):
                        lists.append([k,j.CZ[k][0]])
        return lists
        
    def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)):
        if(not CPlist):
            CPlist = self.getCPlist()
        if(not CPlist):
            return False
        TTC_list = []
        for i in CPlist:
            TTC_list.append(i[1].indicator)
        histo = np.histogram(TTC_list,bins=bins)
        histo += (histo[0].astype(float)/np.sum(histo[0]),)
        return histo

class Crossing(moving.STObject):
    '''Class for the event of a street crossing

    TODO: detecter passage sur la chaussee
    identifier origines et destination (ou uniquement chaussee dans FOV)
    carac traversee
    detecter proximite veh (retirer si trop similaire simultanement
    carac interaction'''
    
    def __init__(self, roaduserNum = None, num = None, timeInterval = None):
        moving.STObject.__init__(self, num, timeInterval)
        self.roaduserNum = roaduserNum

    

if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/events.txt')
    #suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)