Mercurial Hosting > traffic-intelligence
view python/event.py @ 291:9f81218e497a
class VehPairs subsumes createInteractions(objects); legacy code remains
author | Paul@BEAST-III |
---|---|
date | Tue, 05 Feb 2013 15:45:33 -0500 |
parents | dbe7e53334d7 |
children | 8b2c8a4015f1 |
line wrap: on
line source
#! /usr/bin/env python '''Libraries for events Interactions, pedestrian crossing...''' #import utils; import moving import prediction __metaclass__ = type ## Subsumes createInteractions(objects) with a VehPair Object class VehPairs(): def __init__(self,objects): ''' Create all pairs of two co-existing road users TODO: add test to compute categories? ''' #print(' -------------') #Disabled for traffic-intelligence trunk #print(' Generating vehicle pairs...') #Disabled for traffic-intelligence trunk self.pairs = [] num = 0 for i in xrange(len(objects)): for j in xrange(i): commonTimeInterval = objects[i].commonTimeInterval(objects[j]) if not commonTimeInterval.empty(): self.pairs.append(event.Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) num += 1 def calculateIndicators(self,predParam,frameRate=15,collisionDistanceThreshold=1.8,timeHorizonMultiplier=5): #print(' -------------') #Disabled for traffic-intelligence trunk #print(' Calculating time-to-collision...') #Disabled for traffic-intelligence trunk timeHorizon = frameRate*timeHorizonMultiplier # prediction time Horizon = 1.5 s (reaction time) (5 second) for params in [predParam]: #prog = Tools.ProgressBar(0, len(self.pairs), 77) #Disabled for traffic-intelligence trunk (PVA Tools dependancy) for j in xrange(len(self.pairs)): #prog.updateAmount(j) #Disabled for traffic-intelligence trunk (PVA Tools dependancy) collisionPoints, crossingZones = prediction.computeCrossingsCollisions(self.pairs[j].movingObject1, self.pairs[j].movingObject2, predParam, collisionDistanceThreshold, timeHorizon) # Ignore empty collision points empty = 1 for i in collisionPoints: if(collisionPoints[i] != []): empty = 0 if(empty == 1): self.pairs[j].hasCP = 0 else: self.pairs[j].hasCP = 1 self.pairs[j].CP = collisionPoints # Ignore empty crossing zones empty = 1 for i in crossingZones: if(crossingZones[i] != []): empty = 0 if(empty == 1): self.pairs[j].hasCZ = 0 else: self.pairs[j].hasCZ = 1 self.pairs[j].CZ = crossingZones return def getPairsWCP(self): lists = [] for j in self.pairs: if(j.hasCP): lists.append(j.num) return lists def getPairsWCZ(self): lists = [] for j in self.pairs: if(j.hasCZ): lists.append(j.num) return lists def getCPlist(self): lists = [] for j in self.pairs: if(j.hasCP): for k in j.CP: if(j.CP[k] != []): lists.append(j.CP[k]) return lists def getCZlist(self): lists = [] for j in self.pairs: if(j.hasCZ): for k in j.CZ: if(j.CZ[k] != []): lists.append(j.CZ[k]) return lists class Interaction(moving.STObject): '''Class for an interaction between two road users or a road user and an obstacle link to the moving objects ''' categories = {'headon': 0, 'rearend': 1, 'side': 2, 'parallel': 3} def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, movingObject1 = None, movingObject2 = None, categoryNum = None): moving.STObject.__init__(self, num, timeInterval) self.roaduserNumbers = set([roaduserNum1, roaduserNum2]) self.movingObject1 = movingObject1 self.movingObject2 = movingObject2 self.categoryNum = categoryNum def getIndicator(self, indicatorName): if hasattr(self, 'indicators'): for i in self.indicators: if i.name == indicatorName: return i else: return None def computeIndicators(self): '''Computes the collision course cosine only if the cosine is positive''' collisionCourseDotProduct = [0]*int(self.timeInterval.length()) collisionCourseCosine = {} distances = [0]*int(self.timeInterval.length()) for i,instant in enumerate(self.timeInterval): deltap = self.movingObject1.getPositionAtInstant(instant)-self.movingObject2.getPositionAtInstant(instant) deltav = self.movingObject2.getVelocityAtInstant(instant)-self.movingObject1.getVelocityAtInstant(instant) collisionCourseDotProduct[i] = moving.Point.dot(deltap, deltav) distances[i] = deltap.norm2() if collisionCourseDotProduct[i] > 0: collisionCourseCosine[instant] = collisionCourseDotProduct[i]/(distances[i]*deltav.norm2()) self.indicators = [moving.SeverityIndicator('Collision Course Dot Product', collisionCourseDotProduct, self.timeInterval), moving.SeverityIndicator('Distances', distances, self.timeInterval), moving.SeverityIndicator('Collision Course Cosine', collisionCourseCosine)] ######====>BEGIN LEGACY CODE def createInteractions(objects): '''Create all interactions of two co-existing road users todo add test to compute categories?''' interactions = [] num = 0 for i in xrange(len(objects)): for j in xrange(i): commonTimeInterval = objects[i].commonTimeInterval(objects[j]) if not commonTimeInterval.empty(): interactions.append(Interaction(num, commonTimeInterval, objects[i].num, objects[j].num, objects[i], objects[j])) num += 1 return interactions #<====END LEGACY CODE##### class Crossing(moving.STObject): '''Class for the event of a street crossing TODO: detecter passage sur la chaussee identifier origines et destination (ou uniquement chaussee dans FOV) carac traversee detecter proximite veh (retirer si trop similaire simultanement carac interaction''' def __init__(self, roaduserNum = None, num = None, timeInterval = None): moving.STObject.__init__(self, num, timeInterval) self.roaduserNum = roaduserNum if __name__ == "__main__": import doctest import unittest #suite = doctest.DocFileSuite('tests/moving.txt') suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite)