Mercurial Hosting > traffic-intelligence
view python/events.py @ 601:e1f3b789c632
add a definition of interaction and collision course intervals
author | Mohamed Gomaa |
---|---|
date | Thu, 02 May 2013 11:35:45 -0400 |
parents | 414b2e7cd873 |
children |
line wrap: on
line source
#! /usr/bin/env python '''Libraries for events Interactions, pedestrian crossing...''' import numpy as np from numpy import arccos import multiprocessing import itertools import moving import prediction import indicators __metaclass__ = type class Interaction(moving.STObject): '''Class for an interaction between two road users or a road user and an obstacle link to the moving objects contains the indicators in a dictionary with the names as keys ''' categories = {'headon': 0, 'rearend': 1, 'side': 2, 'parallel': 3} def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None): moving.STObject.__init__(self, num, timeInterval) self.roaduserNumbers = set([roaduserNum1, roaduserNum2]) self.movingObject1 = movingObject1 self.movingObject2 = movingObject2 self.categoryNum = categoryNum self.indicators = {} def getIndicator(self, indicatorName): return self.indicators[indicatorName] def addIndicator(self, indicator): self.indicators[indicator.name] = indicator def computeIndicators(self): '''Computes the collision course cosine only if the cosine is positive''' collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length()) collisionCourseAngles = {} distances = {}#[0]*int(self.timeInterval.length()) speedDifferentials = {} velocityAngle= {} for instant in self.timeInterval: deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant) deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant) collisionCourseDotProducts[instant] = moving.Point.dot(deltap, deltav) velocityDotProduct= moving.Point.dot(self.movingObject1.getVelocityAtInstant(instant),self.movingObject2.getVelocityAtInstant(instant)) velocityAngle[instant]= np.arccos(velocityDotProduct/ (self.movingObject1.getVelocityAtInstant(instant).norm2() * self.movingObject2.getVelocityAtInstant(instant).norm2())) distances[instant] = deltap.norm2() speedDifferentials[instant] = deltav.norm2() if collisionCourseDotProducts[instant] > 0: collisionCourseAngles[instant] = arccos(collisionCourseDotProducts[instant]/(distances[instant]*speedDifferentials[instant])) # todo shorten the time intervals based on the interaction definition self.addIndicator(indicators.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProducts)) self.addIndicator(indicators.SeverityIndicator('Distance', distances)) self.addIndicator(indicators.SeverityIndicator('Speed Differential', speedDifferentials)) self.addIndicator(indicators.SeverityIndicator('Velocity Angle', velocityAngle)) self.addIndicator(indicators.SeverityIndicator('Collision Course Angle', collisionCourseAngles)) # todo test for interaction instants and interval, compute indicators # if we have features, compute other indicators if self.movingObject1.features and self.movingObject2.features: from scipy.spatial.distance import cdist minDistance={} for instant in self.timeInterval: positions1 = [f.getPositionAtInstant(instant).astuple() for f in self.movingObject1.features if f.existsAtInstant(instant)] positions2 = [f.getPositionAtInstant(instant).astuple() for f in self.movingObject2.features if f.existsAtInstant(instant)] distance = cdist(positions1, positions2, metric = 'euclidean') minDistance[instant] = distance.min() self.addIndicator(indicators.SeverityIndicator('Minimum Distance', minDistance)) def computeCollisionPoints(self, predictionParameters, collisionDistanceThreshold, timeHorizon): if self.movingObject1.features and self.movingObject2.features: collisionPoints,crossingZones = prediction.computeCrossingsCollisions(self.movingObject1, self.movingObject2, predictionParameters, collisionDistanceThreshold, timeHorizon,asWholeVehicle=True) self.addIndicator(indicators.SeverityIndicator('collisionPoints', collisionPoints)) else: print('Features not associated with objects') def defineInteractionInterval(self): ''' interaction defined as the first and last time that the collion course is positive''' keys=sorted(self.getIndicator('Collision Course Angle').values.keys()) if keys!=[]: self.interactionInterval= moving.TimeInterval(keys[0],keys[-1]) else: print('no interaction') self.interactionInterval=None def defineCollisionPointInterval(self): ''' collision point defined as the first and last time that the collion point exist''' values= self.getIndicator('TTC').values keys=sorted(values.keys()) #keysReverse=sorted(values.keys(),reverse=True) keysWithoutinf= [key for key in keys if values[key]!=np.inf] if keysWithoutinf!=[]: self.collisionPointInterval= moving.TimeInterval(min(keysWithoutinf),max(keysWithoutinf)) else: print('no collision points') self.collisionPointInterval=None def removeIndicatorExtraValues(self,indicatorName): ''' refine the indicators value wrt interaction definition''' self.defineInteractionInterval() refinedIndicator = {} if self.interactionInterval!= None: for i in xrange(self.interactionInterval.first,self.interactionInterval.last+1): if self.getIndicator(indicatorName).values.get(i)== None: refinedIndicator[i]= None else: refinedIndicator[i]= self.getIndicator(indicatorName).values[i] self.addIndicator(indicators.SeverityIndicator(str(indicatorName)+' New', refinedIndicator)) def removeIndicatorExtraValuesCP(self,indicatorName): ''' refine the indicators value wrt interaction definition''' self.defineCollisionPointInterval() refinedIndicator = {} if self.collisionPointInterval!= None: for i in xrange(self.collisionPointInterval.first,self.collisionPointInterval.last+1): if self.getIndicator(indicatorName).values.get(i)== None: refinedIndicator[i]= None else: refinedIndicator[i]= self.getIndicator(indicatorName).values[i] self.addIndicator(indicators.SeverityIndicator(str(indicatorName)+'2', refinedIndicator)) def addVideoFilename(self,videoFilename): self.videoFilename= videoFilename def addInteractionType(self,interactionType): ''' interaction types: conflict or collision if they are known''' self.interactionType= interactionType def createInteractions(objects): '''Create all interactions of two co-existing road users todo add test to compute categories?''' interactions = [] num = 0 for i in xrange(len(objects)): for j in xrange(i): commonTimeInterval = objects[i].commonTimeInterval(objects[j]) if not commonTimeInterval.empty(): interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) num += 1 return interactions # TODO: #http://stackoverflow.com/questions/3288595/multiprocessing-using-pool-map-on-a-function-defined-in-a-class #http://www.rueckstiess.net/research/snippets/show/ca1d7d90 def calculateIndicatorPipe(pairs, predParam, timeHorizon=75,collisionDistanceThreshold=1.8): collisionPoints, crossingZones = prediction.computeCrossingsCollisions(pairs.movingObject1, pairs.movingObject2, predParam, collisionDistanceThreshold, timeHorizon) #print pairs.num # Ignore empty collision points empty = 1 for i in collisionPoints: if(collisionPoints[i] != []): empty = 0 if(empty == 1): pairs.hasCP = 0 else: pairs.hasCP = 1 pairs.CP = collisionPoints # Ignore empty crossing zones empty = 1 for i in crossingZones: if(crossingZones[i] != []): empty = 0 if(empty == 1): pairs.hasCZ = 0 else: pairs.hasCZ = 1 pairs.CZ = crossingZones return pairs def calculateIndicatorPipe_star(a_b): """Convert `f([1,2])` to `f(1,2)` call.""" return calculateIndicatorPipe(*a_b) class VehPairs(): '''Create a veh-pairs object from objects list''' def __init__(self,objects): self.pairs = createInteractions(objects) self.interactionCount = 0 self.CPcount = 0 self.CZcount = 0 # Process indicator calculation with support for multi-threading def calculateIndicators(self,predParam,threads=1,timeHorizon=75,collisionDistanceThreshold=1.8): if(threads > 1): pool = multiprocessing.Pool(threads) self.pairs = pool.map(calculateIndicatorPipe_star, itertools.izip(self.pairs, itertools.repeat(predParam))) pool.close() else: #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Removed in traffic-intelligenc port for j in xrange(len(self.pairs)): #prog.updateAmount(j) #Removed in traffic-intelligenc port collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon) # Ignore empty collision points empty = 1 for i in collisionPoints: if(collisionPoints[i] != []): empty = 0 if(empty == 1): self.pairs[j].hasCP = 0 else: self.pairs[j].hasCP = 1 self.pairs[j].CP = collisionPoints # Ignore empty crossing zones empty = 1 for i in crossingZones: if(crossingZones[i] != []): empty = 0 if(empty == 1): self.pairs[j].hasCZ = 0 else: self.pairs[j].hasCZ = 1 self.pairs[j].CZ = crossingZones for j in self.pairs: self.interactionCount = self.interactionCount + len(j.CP) self.CPcount = len(self.getCPlist()) self.Czcount = len(self.getCZlist()) def getPairsWCP(self): lists = [] for j in self.pairs: if(j.hasCP): lists.append(j.num) return lists def getPairsWCZ(self): lists = [] for j in self.pairs: if(j.hasCZ): lists.append(j.num) return lists def getCPlist(self,indicatorThreshold=99999): lists = [] for j in self.pairs: if(j.hasCP): for k in j.CP: if(j.CP[k] != [] and j.CP[k][0].indicator < indicatorThreshold): lists.append([k,j.CP[k][0]]) return lists def getCZlist(self,indicatorThreshold=99999): lists = [] for j in self.pairs: if(j.hasCZ): for k in j.CZ: if(j.CZ[k] != [] and j.CZ[k][0].indicator < indicatorThreshold): lists.append([k,j.CZ[k][0]]) return lists def genIndicatorHistogram(self, CPlist=False, bins=range(0,100,1)): if(not CPlist): CPlist = self.getCPlist() if(not CPlist): return False TTC_list = [] for i in CPlist: TTC_list.append(i[1].indicator) histo = np.histogram(TTC_list,bins=bins) histo += (histo[0].astype(float)/np.sum(histo[0]),) return histo class Crossing(moving.STObject): '''Class for the event of a street crossing TODO: detecter passage sur la chaussee identifier origines et destination (ou uniquement chaussee dans FOV) carac traversee detecter proximite veh (retirer si trop similaire simultanement carac interaction''' def __init__(self, roaduserNum = None, num = None, timeInterval = None): moving.STObject.__init__(self, num, timeInterval) self.roaduserNum = roaduserNum if __name__ == "__main__": import doctest import unittest #suite = doctest.DocFileSuite('tests/moving.txt') suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite)