Mercurial Hosting > traffic-intelligence
changeset 1146:b219d5a1bb55
added code to categorize interactions
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Wed, 29 Apr 2020 01:09:55 -0400 |
parents | 66f063ca2d24 |
children | 8c0ec7e1eb8e |
files | trafficintelligence/events.py trafficintelligence/moving.py |
diffstat | 2 files changed, 38 insertions(+), 11 deletions(-) [+] |
line wrap: on
line diff
--- a/trafficintelligence/events.py Tue Apr 28 10:17:49 2020 -0400 +++ b/trafficintelligence/events.py Wed Apr 29 01:09:55 2020 -0400 @@ -94,7 +94,7 @@ timeIndicators = ['Time to Collision', 'predicted Post Encroachment Time'] - def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None, categoryNum = None): + def __init__(self, num = None, timeInterval = None, roaduserNum1 = None, roaduserNum2 = None, roadUser1 = None, roadUser2 = None): moving.STObject.__init__(self, num, timeInterval) if timeInterval is None and roadUser1 is not None and roadUser2 is not None: self.timeInterval = roadUser1.commonTimeInterval(roadUser2) @@ -106,7 +106,6 @@ self.roadUserNumbers = set([roadUser1.getNum(), roadUser2.getNum()]) else: self.roadUserNumbers = None - self.categoryNum = categoryNum self.indicators = {} self.interactionInterval = None # list for collison points and crossing zones @@ -157,9 +156,13 @@ values[k] = indicator[instant] return values - def plot(self, options = '', withOrigin = False, timeStep = 1, withFeatures = False, **kwargs): - self.roadUser1.plot(options, withOrigin, timeStep, withFeatures, **kwargs) - self.roadUser2.plot(options, withOrigin, timeStep, withFeatures, **kwargs) + def plot(self, options = '', withOrigin = False, timeStep = 1, withFeatures = False, restricted = True, **kwargs): + if restricted: + self.roadUser1.getObjectInTimeInterval(self.timeInterval).plot(options, withOrigin, timeStep, withFeatures, **kwargs) + self.roadUser2.getObjectInTimeInterval(self.timeInterval).plot(options, withOrigin, timeStep, withFeatures, **kwargs) + else: + self.roadUser1.plot(options, withOrigin, timeStep, withFeatures, **kwargs) + self.roadUser2.plot(options, withOrigin, timeStep, withFeatures, **kwargs) def plotOnWorldImage(self, nPixelsPerUnitDistance, options = '', withOrigin = False, timeStep = 1, **kwargs): self.roadUser1.plotOnWorldImage(nPixelsPerUnitDistance, options, withOrigin, timeStep, **kwargs) @@ -179,10 +182,10 @@ def computeIndicators(self): '''Computes the collision course cosine only if the cosine is positive''' - collisionCourseDotProducts = {}#[0]*int(self.timeInterval.length()) + collisionCourseDotProducts = {} collisionCourseAngles = {} velocityAngles = {} - distances = {}#[0]*int(self.timeInterval.length()) + distances = {} speedDifferentials = {} interactionInstants = [] for instant in self.timeInterval: @@ -216,8 +219,32 @@ minDistances[instant] = moving.MovingObject.minDistance(self.roadUser1, self.roadUser2, instant) self.addIndicator(indicators.SeverityIndicator(Interaction.indicatorNames[3], minDistances, mostSevereIsMax = False)) - def categorize(self): - '''Computes the interaction category by instant ''' + # categories = {'headon': 0, + # 'rearend': 1, + # 'side': 2, + # 'parallel': 3} + + def categorize(self, velocityAngleTolerance, parallelAngleTolerance): + '''Computes the interaction category by instant + velocityAngleTolerance and parallelAngleTolerance in radian''' + parallelAngleToleranceCosine = np.cos(parallelAngleTolerance) + self.categories = {} + collisionCourseDotProducts = self.getIndicator(Interaction.indicatorNames[0]) + velocityAngles = self.getIndicator(Interaction.indicatorNames[4]) + for instant in self.timeInterval: + if velocityAngles[instant] < velocityAngleTolerance: # parallel or rear end + midVelocity = self.roadUser1.getVelocityAtInstant(instant) + self.roadUser2.getVelocityAtInstant(instant) + deltap = self.roadUser1.getPositionAtInstant(instant)-self.roadUser2.getPositionAtInstant(instant) + if moving.Point.dot(midVelocity, deltap) < parallelAngleToleranceCosine: + self.categories[instant] = Interaction.categories["parallel"] + else: + self.categories[instant] = Interaction.categories["rearend"] + elif velocityAngles[instant] > np.pi - velocityAngleTolerance: # head on + if collisionCourseDotProducts[instant] > 0: + self.categories[instant] = Interaction.categories["headon"] + else: + if collisionCourseDotProducts[instant] > 0: + self.categories[instant] = Interaction.categories["side"] def computeCrossingsCollisions(self, predictionParameters, collisionDistanceThreshold, timeHorizon, computeCZ = False, debug = False, timeInterval = None): '''Computes all crossing and collision points at each common instant for two road users. '''
--- a/trafficintelligence/moving.py Tue Apr 28 10:17:49 2020 -0400 +++ b/trafficintelligence/moving.py Wed Apr 29 01:09:55 2020 -0400 @@ -1371,9 +1371,9 @@ intersection = TimeInterval.intersection(inter, self.getTimeInterval()) if not intersection.empty(): trajectoryInterval = TimeInterval(intersection.first-self.getFirstInstant(), intersection.last-self.getFirstInstant()) - obj = MovingObject(self.num, intersection, self.positions.getTrajectoryInInterval(trajectoryInterval), self.geometry, self.userType, self.nObjects) + obj = MovingObject(self.num, intersection, self.positions.subTrajectoryInInterval(trajectoryInterval), self.geometry, self.userType, self.nObjects) if self.velocities is not None: - obj.velocities = self.velocities.getTrajectoryInInterval(trajectoryInterval) + obj.velocities = self.velocities.subTrajectoryInInterval(trajectoryInterval) return obj else: print('The object does not exist at {}'.format(inter))