Mercurial Hosting > traffic-intelligence
view python/events.py @ 398:3399bd48cb40
Ajout d'une méthode pour obtenir le nombre de FPS
Méthode de capture des trames vidéos plus résistante aux erreur
Utilisation d'un dictionnaire pour les fichier de configuration afin de garder le nom des sections
author | Jean-Philippe Jodoin <jpjodoin@gmail.com> |
---|---|
date | Mon, 29 Jul 2013 13:46:07 -0400 |
parents | 6e0dedd34920 |
children | 365d8dee44f3 |
line wrap: on
line source
#! /usr/bin/env python '''Libraries for events Interactions, pedestrian crossing...''' import numpy as np from numpy import arccos import multiprocessing import itertools import moving, prediction, indicators, utils __metaclass__ = type class Interaction(moving.STObject): '''Class for an interaction between two road users or a road user and an obstacle link to the moving objects contains the indicators in a dictionary with the names as keys ''' categories = {'Head On': 0, 'rearend': 1, 'side': 2, 'parallel': 3} indicatorNames = ['Collision Course Dot Product', 'Collision Course Angle', 'Distance', 'Minimum Distance', 'Velocity Angle', 'Speed Differential', 'Collision Probability', 'Time to Collision', 'Probability of Successful Evasive Action', 'predicted Post Encroachment Time'] indicatorNameToIndices = utils.inverseEnumeration(indicatorNames) indicatorShortNames = ['CCDP', 'CCAng', 'Dist', 'MinDist', 'VA', 'SD', 'PoC', 'TTC', 'P(SEA)', 'pPET'] def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None): moving.STObject.__init__(self, num, timeInterval) self.roadUserNumbers = set([roaduserNum1, roaduserNum2]) self.roadUser1 = roadUser1 self.roadUser2 = roadUser2 self.categoryNum = categoryNum self.indicators = {} def getRoadUserNumbers(self): return self.roadUserNumbers def getIndicator(self, indicatorName): return self.indicators.get(indicatorName, None) def addIndicator(self, indicator): if indicator: self.indicators[indicator.name] = indicator def computeIndicators(self): '''Computes the collision course cosine only if the cosine is positive''' collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length()) collisionCourseAngles = {} velocityAngles = {} distances = {}#[0]*int(self.timeInterval.length()) speedDifferentials = {} for instant in self.timeInterval: deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) v1 = self.roadUser1.getVelocityAtInstant(instant) v2 = self.roadUser2.getVelocityAtInstant(instant) deltav = v2-v1 velocityAngles[instant] = arccos(moving.Point.dot(v1, v2)/(v1.norm2()*v2.norm2())) collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) distances[instant] = deltap.norm2() speedDifferentials[instant] = deltav.norm2() #if collisionCourseDotProducts[instant] > 0: collisionCourseAngles[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])) # todo shorten the time intervals based on the interaction definition self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[0], collisionCourseDotProducts)) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[1], collisionCourseAngles)) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[2], distances)) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[4], velocityAngles)) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[5], speedDifferentials)) # todo test for interaction instants and interval, compute indicators # if we have features, compute other indicators if self.roadUser1.features != None and self.roadUser2.features != None: minDistance={} for instant in self.timeInterval: minDistance[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistance)) def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): '''Computes all crossing and collision points at each common instant for two road users. ''' self.collisionPoints={} self.crossingZones={} TTCs = {} if timeInterval: commonTimeInterval = timeInterval else: commonTimeInterval = self.timeInterval for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors self.collisionPoints[i], self.crossingZones[i] = predictionParameters.computeCrossingsCollisionsAtInstant(i, self.roadUser1, self.roadUser2, collisionDistanceThreshold, timeHorizon, computeCZ, debug) if len(self.collisionPoints[i]) > 0: TTCs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.collisionPoints[i]) # add probability of collision, and probability of successful evasive action self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[7], TTCs)) if computeCZ: pPETs = {} for i in list(commonTimeInterval)[:-1]: if len(self.crossingZones[i]) > 0: pPETs[i] = prediction.SafetyPoint.computeExpectedIndicator(self.crossingZones[i]) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[9], pPETs)) def addVideoFilename(self,videoFilename): self.videoFilename= videoFilename def addInteractionType(self,interactionType): ''' interaction types: conflict or collision if they are known''' self.interactionType= interactionType def createInteractions(objects): '''Create all interactions of two co-existing road users todo add test to compute categories?''' interactions = [] num = 0 for i in xrange(len(objects)): for j in xrange(i): commonTimeInterval = objects[i].commonTimeInterval(objects[j]) if not commonTimeInterval.empty(): interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) num += 1 return interactions # TODO: #http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class #http://www.rueckstiess.net/research/snippets/show/ca1d7d90 def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8): collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.roadUser1, pairs.roadUser2, predParam, collisionDistanceThreshold, timeHorizon) #print pairs.num # Ignore empty collision points empty = 1 for i in collisionPoints: if(collisionPoints[i] != []): empty = 0 if(empty == 1): pairs.hasCP = 0 else: pairs.hasCP = 1 pairs.CP = collisionPoints # Ignore empty crossing zones empty = 1 for i in crossingZones: if(crossingZones[i] != []): empty = 0 if(empty == 1): pairs.hasCZ = 0 else: pairs.hasCZ = 1 pairs.CZ = crossingZones return pairs def calculateIndicatorPipe_star(a_b): """Convert `f([1,2])` to `f(1,2)` call.""" return calculateIndicatorPipe(*a_b) class VehPairs(): '''Create a veh-pairs object from objects list''' def __init__(self,objects): self.pairs = createInteractions(objects) self.interactionCount = 0 self.CPcount = 0 self.CZcount = 0 # Process indicator calculation with support for multi-threading def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8): if(threads > 1): pool = multiprocessing.Pool(threads) self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam))) pool.close() else: #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port for j in xrange(len(self.pairs)): #prog.updateAmount(j) #Removed in traffic-intelligenc port collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].roadUser1, self.pairs[j].roadUser2, predParam, collisionDistanceThreshold, timeHorizon) # Ignore empty collision points empty = 1 for i in collisionPoints: if(collisionPoints[i] != []): empty = 0 if(empty == 1): self.pairs[j].hasCP = 0 else: self.pairs[j].hasCP = 1 self.pairs[j].CP = collisionPoints # Ignore empty crossing zones empty = 1 for i in crossingZones: if(crossingZones[i] != []): empty = 0 if(empty == 1): self.pairs[j].hasCZ = 0 else: self.pairs[j].hasCZ = 1 self.pairs[j].CZ = crossingZones for j in self.pairs: self.interactionCount = self.interactionCount + len(j.CP) self.CPcount = len(self.getCPlist()) self.Czcount = len(self.getCZlist()) def getPairsWCP(self): lists = [] for j in self.pairs: if(j.hasCP): lists.append(j.num) return lists def getPairsWCZ(self): lists = [] for j in self.pairs: if(j.hasCZ): lists.append(j.num) return lists def getCPlist(self,indicatorThreshold=99999): lists = [] for j in self.pairs: if(j.hasCP): for k in j.CP: if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold): lists.append([k,j.CP[k][0]]) return lists def getCZlist(self,indicatorThreshold=99999): lists = [] for j in self.pairs: if(j.hasCZ): for k in j.CZ: if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold): lists.append([k,j.CZ[k][0]]) return lists def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)): if(not CPlist): CPlist = self.getCPlist() if(not CPlist): return False TTC_list = [] for i in CPlist: TTC_list.append(i[1].indicator) histo = np.histogram(TTC_list,bins=bins) histo += (histo[0].astype(float)/np.sum(histo[0]),) return histo class Crossing(moving.STObject): '''Class for the event of a street crossing TODO: detecter passage sur la chaussee identifier origines et destination (ou uniquement chaussee dans FOV) carac traversee detecter proximite veh (retirer si trop similaire simultanement carac interaction''' def __init__(self, roaduserNum = None, num = None, timeInterval = None): moving.STObject.__init__(self, num, timeInterval) self.roaduserNum = roaduserNum if __name__ == "__main__": import doctest import unittest #suite = doctest.DocFileSuite('tests/moving.txt') suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite)