diff trafficintelligence/events.py @ 1028:cc5cb04b04b0

major update using the trafficintelligence package name and install through pip
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Fri, 15 Jun 2018 11:19:10 -0400
parents python/events.py@933670761a57
children 6a8fe3ed3bc6
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/trafficintelligence/events.py	Fri Jun 15 11:19:10 2018 -0400
@@ -0,0 +1,341 @@
+#! /usr/bin/env python
+'''Libraries for events
+Interactions, pedestrian crossing...'''
+
+from trafficintelligence import moving, prediction, indicators, utils, cvutils, ml
+from trafficintelligence.base import VideoFilenameAddable
+
+import numpy as np
+
+import multiprocessing
+import itertools, logging
+
+
+def findRoute(prototypes,objects,i,j,noiseEntryNums,noiseExitNums,minSimilarity= 0.3, spatialThreshold=1.0, delta=180):
+    if i[0] not in noiseEntryNums: 
+        prototypesRoutes= [ x for x in sorted(prototypes.keys()) if i[0]==x[0]]
+    elif i[1] not in noiseExitNums:
+        prototypesRoutes=[ x for x in sorted(prototypes.keys()) if i[1]==x[1]]
+    else:
+        prototypesRoutes=[x for x in sorted(prototypes.keys())]
+    routeSim={}
+    lcss = utils.LCSS(similarityFunc=lambda x,y: (distanceForLCSS(x,y) <= spatialThreshold),delta=delta)
+    for y in prototypesRoutes: 
+        if y in prototypes:
+            prototypesIDs=prototypes[y]
+            similarity=[]
+            for x in prototypesIDs:
+                s=lcss.computeNormalized(objects[j].positions, objects[x].positions)
+                similarity.append(s)
+            routeSim[y]=max(similarity)
+    route=max(routeSim, key=routeSim.get)
+    if routeSim[route]>=minSimilarity:
+        return route
+    else:
+        return i
+
+def getRoute(obj,prototypes,objects,noiseEntryNums,noiseExitNums,useDestination=True):
+    route=(obj.startRouteID,obj.endRouteID)
+    if useDestination:
+        if route not in prototypes:
+            route= findRoute(prototypes,objects,route,obj.getNum(),noiseEntryNums,noiseExitNums)
+    return route
+
+class Interaction(moving.STObject, VideoFilenameAddable):
+    '''Class for an interaction between two road users 
+    or a road user and an obstacle
+    
+    link to the moving objects
+    contains the indicators in a dictionary with the names as keys
+    '''
+
+    categories = {'Head On': 0,
+                  'rearend': 1,
+                  'side': 2,
+                  'parallel': 3}
+
+    indicatorNames = ['Collision Course Dot Product',
+                      'Collision Course Angle',
+                      'Distance',
+                      'Minimum Distance',
+                      'Velocity Angle',
+                      'Speed Differential',
+                      'Collision Probability',
+                      'Time to Collision', # 7
+                      'Probability of Successful Evasive Action',
+                      'predicted Post Encroachment Time',
+                      'Post Encroachment Time']
+
+    indicatorNameToIndices = utils.inverseEnumeration(indicatorNames)
+
+    indicatorShortNames = ['CCDP',
+                           'CCA',
+                           'Dist',
+                           'MinDist',
+                           'VA',
+                           'SD',
+                           'PoC',
+                           'TTC',
+                           'P(SEA)',
+                           'pPET',
+                           'PET']
+
+    indicatorUnits = ['',
+                      'rad',
+                      'm',
+                      'm',
+                      'rad',
+                      'm/s',
+                      '',
+                      's',
+                      '',
+                      's',
+                      's']
+
+    timeIndicators = ['Time to Collision', 'predicted Post Encroachment Time']
+
+    def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None):
+        moving.STObject.__init__(self, num, timeInterval)
+        if timeInterval is None and roadUser1 is not None and roadUser2 is not None:
+            self.timeInterval = roadUser1.commonTimeInterval(roadUser2)
+        self.roadUser1 = roadUser1
+        self.roadUser2 = roadUser2
+        if roaduserNum1 is not None and roaduserNum2 is not None:
+            self.roadUserNumbers = set([roaduserNum1, roaduserNum2])
+        elif roadUser1 is not None and roadUser2 is not None:
+            self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()])
+        else:
+            self.roadUserNumbers = None
+        self.categoryNum = categoryNum
+        self.indicators = {}
+        self.interactionInterval = None
+         # list for collison points and crossing zones
+        self.collisionPoints = None
+        self.crossingZones = None
+
+    def getRoadUserNumbers(self):
+        return self.roadUserNumbers
+
+    def setRoadUsers(self, objects):
+        nums = sorted(list(self.getRoadUserNumbers()))
+        if nums[0]<len(objects) and objects[nums[0]].getNum() == nums[0]:
+            self.roadUser1 = objects[nums[0]]
+        if nums[1]<len(objects) and objects[nums[1]].getNum() == nums[1]:
+            self.roadUser2 = objects[nums[1]]
+
+        if self.roadUser1 is None or self.roadUser2 is None:
+            self.roadUser1 = None
+            self.roadUser2 = None
+            i = 0
+            while i < len(objects) and self.roadUser2 is None:
+                if objects[i].getNum() in nums:
+                    if self.roadUser1 is None:
+                        self.roadUser1 = objects[i]
+                    else:
+                        self.roadUser2 = objects[i]
+                i += 1
+
+    def getIndicator(self, indicatorName):
+        return self.indicators.get(indicatorName, None)
+
+    def addIndicator(self, indicator):
+        if indicator is not None:
+            self.indicators[indicator.name] = indicator
+
+    def getIndicatorValueAtInstant(self, indicatorName, instant):
+        indicator = self.getIndicator(indicatorName)
+        if indicator is not None:
+            return indicator[instant]
+        else:
+            return None
+
+    def getIndicatorValuesAtInstant(self, instant):
+        '''Returns list of indicator values at instant
+        as dict (with keys from indicators dict)'''
+        values = {}
+        for k, indicator in self.indicators.items():
+            values[k] = indicator[instant]
+        return values
+        
+    def plot(self, options = '', withOrigin = False, timeStep = 1, withFeatures = False, **kwargs):
+        self.roadUser1.plot(options, withOrigin, timeStep, withFeatures, **kwargs)
+        self.roadUser2.plot(options, withOrigin, timeStep, withFeatures, **kwargs)
+
+    def plotOnWorldImage(self, nPixelsPerUnitDistance, options = '', withOrigin = False, timeStep = 1, **kwargs):
+        self.roadUser1.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs)
+        self.roadUser2.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs)
+
+    def play(self, videoFilename, homography = None, undistort = False, intrinsicCameraMatrix = None, distortionCoefficients = None, undistortedImageMultiplication = 1., allUserInstants = False):
+        if self.roadUser1 is not None and self.roadUser2 is not None:
+            if allUserInstants:
+                firstFrameNum = min(self.roadUser1.getFirstInstant(), self.roadUser2.getFirstInstant())
+                lastFrameNum = max(self.roadUser1.getLastInstant(), self.roadUser2.getLastInstant())
+            else:
+                firstFrameNum = self.getFirstInstant()
+                lastFrameNum = self.getLastInstant()
+            cvutils.displayTrajectories(videoFilename, [self.roadUser1, self.roadUser2], homography = homography, firstFrameNum = firstFrameNum, lastFrameNumArg = lastFrameNum, undistort = undistort, intrinsicCameraMatrix = intrinsicCameraMatrix, distortionCoefficients = distortionCoefficients, undistortedImageMultiplication = undistortedImageMultiplication)
+        else:
+            print('Please set the interaction road user attributes roadUser1 and roadUser1 through the method setRoadUsers')
+
+    def computeIndicators(self):
+        '''Computes the collision course cosine only if the cosine is positive'''
+        collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length())
+        collisionCourseAngles = {}
+        velocityAngles = {}
+        distances = {}#[0]*int(self.timeInterval.length())
+        speedDifferentials = {}
+        interactionInstants = []
+        for instant in self.timeInterval:
+            deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant)
+            v1 = self.roadUser1.getVelocityAtInstant(instant)
+            v2 = self.roadUser2.getVelocityAtInstant(instant)
+            deltav = v2-v1
+            velocityAngles[instant] = np.arccos(moving.Point.dot(v1, v2)/(v1.norm2()*v2.norm2()))
+            collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav)
+            distances[instant] = deltap.norm2()
+            speedDifferentials[instant] = deltav.norm2()
+            if collisionCourseDotProducts[instant] > 0:
+                interactionInstants.append(instant)
+            if distances[instant] != 0 and speedDifferentials[instant] != 0:
+                collisionCourseAngles[instant] = np.arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant]))
+
+        if len(interactionInstants) >= 2:
+            self.interactionInterval = moving.TimeInterval(interactionInstants[0], interactionInstants[-1])
+        else:
+            self.interactionInterval = moving.TimeInterval()
+        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts))
+        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles))
+        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances, mostSevereIsMax = False))
+        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles))
+        self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials))
+
+        # if we have features, compute other indicators
+        if self.roadUser1.hasFeatures() and self.roadUser2.hasFeatures():
+            minDistances={}
+            for instant in self.timeInterval:
+                minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant)
+            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False))
+
+    def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
+        '''Computes all crossing and collision points at each common instant for two road users. '''
+        TTCs = {}
+        collisionProbabilities = {}
+        if timeInterval is not None:
+            commonTimeInterval = timeInterval
+        else:
+            commonTimeInterval = self.timeInterval
+        self.collisionPoints, crossingZones = predictionParameters.computeCrossingsCollisions(self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug, commonTimeInterval)
+        for i, cps in self.collisionPoints.items():
+            TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(cps)
+            collisionProbabilities[i] = sum([p.probability for p in cps])
+        if len(TTCs) > 0:
+            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs, mostSevereIsMax=False))
+            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[6], collisionProbabilities))
+        
+        # crossing zones and pPET
+        if computeCZ:
+            self.crossingZones = crossingZones
+            pPETs = {}
+            for i, cz in self.crossingZones.items():
+                pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(cz)
+            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs, mostSevereIsMax=False))
+        # TODO add probability of collision, and probability of successful evasive action
+
+    def computePET(self, collisionDistanceThreshold):
+        pet, t1, t2=  moving.MovingObject.computePET(self.roadUser1, self.roadUser2, collisionDistanceThreshold)
+        if pet is not None:
+            self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[10], {min(t1, t2): pet}, mostSevereIsMax = False))
+
+    def setCollision(self, collision):
+        '''indicates if it is a collision: argument should be boolean'''
+        self.collision = collision
+
+    def isCollision(self):
+        if hasattr(self, 'collision'):
+            return self.collision
+        else:
+            return None
+
+    def getCollisionPoints(self):
+        return self.collisionPoints
+
+    def getCrossingZones(self):
+        return self.crossingZones
+
+def createInteractions(objects, _others = None):
+    '''Create all interactions of two co-existing road users'''
+    if _others is not None:
+        others = _others
+
+    interactions = []
+    num = 0
+    for i in range(len(objects)):
+        if _others is None:
+            others = objects[:i]
+        for j in range(len(others)):
+            commonTimeInterval = objects[i].commonTimeInterval(others[j])
+            if not commonTimeInterval.empty():
+                interactions.append(Interaction(num, commonTimeInterval, objects[i].num, others[j].num, objects[i], others[j]))
+                num += 1
+    return interactions
+
+def findInteraction(interactions, roadUserNum1, roadUserNum2):
+    'Returns the right interaction in the set'
+    i=0
+    while i<len(interactions) and set([roadUserNum1, roadUserNum2]) != interactions[i].getRoadUserNumbers():
+        i+=1
+    if i<len(interactions):
+        return interactions[i]
+    else:
+        return None
+
+def computeIndicators(interactions, computeMotionPrediction, computePET, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None):
+    for inter in interactions:
+        print('processing interaction {}'.format(inter.getNum())) # logging.debug('processing interaction {}'.format(inter.getNum()))
+        inter.computeIndicators()
+        if computeMotionPrediction:
+            inter.computeCrossingsCollisions(predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ, debug, timeInterval)
+        if computePET:
+            inter.computePET(collisionDistanceThreshold)
+    return interactions
+    
+def aggregateSafetyPoints(interactions, pointType = 'collision'):
+    '''Put all collision points or crossing zones in a list for display'''
+    allPoints = []
+    if pointType == 'collision':
+        for i in interactions:
+            for points in i.collisionPoints.values():
+                allPoints += points
+    elif pointType == 'crossing':
+        for i in interactions:
+            for points in i.crossingZones.values():
+                allPoints += points
+    else:
+        print('unknown type of point: '+pointType)
+    return allPoints
+
+def prototypeCluster(interactions, similarities, indicatorName, minSimilarity, similarityFunc = None, minClusterSize = None, randomInitialization = False):
+    return ml.prototypeCluster([inter.getIndicator(indicatorName) for inter in interactions], similarities, minSimilarity, similarityFunc, minClusterSize, randomInitialization)
+
+class Crossing(moving.STObject):
+    '''Class for the event of a street crossing
+
+    TODO: detecter passage sur la chaussee
+    identifier origines et destination (ou uniquement chaussee dans FOV)
+    carac traversee
+    detecter proximite veh (retirer si trop similaire simultanement
+    carac interaction'''
+    
+    def __init__(self, roaduserNum = None, num = None, timeInterval = None):
+        moving.STObject.__init__(self, num, timeInterval)
+        self.roaduserNum = roaduserNum
+
+    
+
+if __name__ == "__main__":
+    import doctest
+    import unittest
+    suite = doctest.DocFileSuite('tests/events.txt')
+    #suite = doctest.DocTestSuite()
+    unittest.TextTestRunner().run(suite)
+