Mercurial Hosting > traffic-intelligence
changeset 269:a9988971aac8
removed legacy code + tweaks
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Sun, 29 Jul 2012 04:09:43 -0400 |
parents | 0c0b92f621f6 |
children | 05c9b0cb8202 |
files | python/extrapolation.py python/indicators.py |
diffstat | 2 files changed, 44 insertions(+), 191 deletions(-) [+] |
line wrap: on
line diff
--- a/python/extrapolation.py Sat Jul 28 02:58:47 2012 -0400 +++ b/python/extrapolation.py Sun Jul 29 04:09:43 2012 -0400 @@ -106,6 +106,7 @@ return extrapolatedTrajectories class PointSetExtrapolationParameters(ExtrapolationParameters): + # todo generate several trajectories with normal adaptatoins from each position (feature) def __init__(self, maxSpeed): ExtrapolationParameters.__init__(self, 'point set', maxSpeed) @@ -185,8 +186,10 @@ t += 1 return t, p1, p2 -def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon): - '''returns the lists of collision points and crossing zones ''' +def computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, debug = False): + '''returns the lists of collision points and crossing zones + + Check: Extrapolating all the points together, as if they represent the whole vehicle''' extrapolatedTrajectories1 = extrapolationParameters.generateExtrapolatedTrajectories(obj1, i) extrapolatedTrajectories2 = extrapolationParameters.generateExtrapolatedTrajectories(obj2, i) @@ -214,6 +217,22 @@ crossingZones.append(SafetyPoint(cz, et1.probability*et2.probability, abs(t1-t2))) t2 += 1 t1 += 1 + + if debug: + from matplotlib.pyplot import figure, axis, title + figure() + for et in extrapolatedTrajectories1: + et.predictPosition(timeHorizon) + et.draw('rx') + + for et in extrapolatedTrajectories2: + et.predictPosition(timeHorizon) + et.draw('bx') + obj1.draw('r') + obj2.draw('b') + title('instant {0}'.format(i)) + axis('equal') + return collisionPoints, crossingZones def computeCrossingsCollisions(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, outCP, outCZ, debug = False, timeInterval = None): @@ -225,25 +244,10 @@ commonTimeInterval = obj1.commonTimeInterval(obj2) for i in list(commonTimeInterval)[:-1]: # do not look at the 1 last position/velocities, often with errors print(obj1.num, obj2.num, i) - collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon) + collisionPoints[i], crossingZones[i] = computeCrossingsCollisionsAtInstant(i, obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, debug) SafetyPoint.save(outCP, collisionPoints[i], obj1.num, obj2.num, i) SafetyPoint.save(outCZ, crossingZones[i], obj1.num, obj2.num, i) - if debug: - from matplotlib.pyplot import figure, axis, title - figure() - obj1.draw('r') - obj2.draw('b') - for et in extrapolatedTrajectories1: - et.predictPosition(timeHorizon) - et.draw('rx') - - for et in extrapolatedTrajectories2: - et.predictPosition(timeHorizon) - et.draw('bx') - title('instant {0}'.format(i)) - axis('equal') - return collisionPoints, crossingZones def computeCollisionProbability(obj1, obj2, extrapolationParameters, collisionDistanceThreshold, timeHorizon, out, debug = False, timeInterval = None): @@ -270,8 +274,6 @@ if debug: from matplotlib.pyplot import figure, axis, title figure() - obj1.draw('r') - obj2.draw('b') for et in extrapolatedTrajectories1: et.predictPosition(timeHorizon) et.draw('rx') @@ -279,171 +281,13 @@ for et in extrapolatedTrajectories2: et.predictPosition(timeHorizon) et.draw('bx') + obj1.draw('r') + obj2.draw('b') title('instant {0}'.format(i)) axis('equal') return collisionProbabilities -############### - -# Default values: to remove because we cannot tweak that from a script where the value may be different -FPS= 25 # No. of frame per second (FPS) -vLimit= 25/FPS #assume limit speed is 90km/hr = 25 m/sec -deltaT= FPS*5 # extrapolatation time Horizon = 5 second - -def motion (position, velocity, acceleration): - ''' extrapolation hypothesis: constant acceleration''' - from math import atan2,cos,sin - vInit= velocity - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - vFinal= vInitial+acceleration - - if acceleration<= 0: - v= max(0,vFinal) - velocity= moving.Point(v* cos(theta),v* sin(theta)) - position= position+ (velocity+vInit). multiply(0.5) - else: - v= min(vLimit,vFinal) - velocity= moving.Point(v* cos(theta),v* sin(theta)) - position= position+ (velocity+vInit). multiply(0.5) - return(position,velocity) - -def motionPET (position, velocity, acceleration, deltaT): - ''' extrapolation hypothesis: constant acceleration for calculating pPET ''' - from math import atan2,cos,sin,fabs - vInit= velocity - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - vFinal= vInitial+acceleration * deltaT - if acceleration< 0: - if vFinal> 0: - velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) - position= position+ (vInit+ velocity). multiply(0.5*deltaT) - else: - T= fabs(vInitial/acceleration) - position= position + vInit. multiply(0.5*T) - elif acceleration> 0 : - if vFinal<= vLimit: - velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) - position= position+ (vInit+ velocity). multiply(0.5*deltaT) - else: - time1= fabs((vLimit-vInitial)/acceleration) - velocity= moving.Point(vLimit* cos(theta),vLimit* sin(theta)) - position= (position+ (velocity+vInit). multiply(0.5*time1)) + (velocity.multiply (deltaT-time1)) - elif acceleration == 0: - position= position + velocity. multiply(deltaT) - - return position - -def timePET (position, velocity, acceleration, intersectedPoint ): - ''' extrapolation hypothesis: constant acceleration for calculating pPET ''' - from math import atan2,cos,sin,fabs - vInit= velocity - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - vFinal= vInitial+acceleration * deltaT - if acceleration< 0: - if vFinal> 0: - velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) - time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x))) - else: - time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x))) - elif acceleration> 0 : - if vFinal<= vLimit: - velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) - time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x))) - else: - time1= fabs((vLimit-vInitial)/acceleration) - velocity= moving.Point(vLimit* cos(theta),vLimit* sin(theta)) - time2= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x))) - if time2<=time1: - time= time2 - else: - position2= (position+ (velocity+vInit). multiply(0.5*time1)) - time= time1+fabs((intersectedPoint.x-position2.x)/( velocity.x)) - elif acceleration == 0: - time= fabs((intersectedPoint.x-position.x)/(velocity.x)) - - return time - -def motionSteering (position, velocity, deltaTheta, deltaT ): - ''' extrapolation hypothesis: steering with deltaTheta''' - from math import atan2,cos,sin - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - newTheta= theta + deltaTheta - velocity= moving.Point(vInitial* cos(newTheta),vInitial* sin(newTheta)) - position= position+ (velocity). multiply(deltaT) - return position - -def MonteCarlo(movingObject1,movingObject2, instant): - ''' Monte Carlo Simulation : estimate the probability of collision''' - from random import uniform - from math import pow, sqrt, sin, cos,atan2 - N=1000 - ProbOfCollision = 0 - for n in range (1, N): - # acceleration limit - acc1 = uniform(-0.040444,0) - acc2 = uniform(-0.040444,0) - p1= movingObject1.getPositionAtInstant(instant) - p2= movingObject2.getPositionAtInstant(instant) - v1= movingObject1.getVelocityAtInstant(instant) - v2= movingObject2.getVelocityAtInstant(instant) - distance= (p1-p2).norm2() - distanceThreshold= 1.8 - t=1 - while distance > distanceThreshold and t <= deltaT: - # Extrapolation position - (p1,v1) = motion(p1,v1,acc1) - (p2,v2) = motion(p2,v2,acc2) - distance= (p1-p2).norm2() - if distance <=distanceThreshold: - ProbOfCollision= ProbOfCollision+1 - t+=1 - POC= float(ProbOfCollision)/N - return POC - -def velocitySteering(velocity,steering): - from math import atan2,cos,sin - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - newTheta= theta + steering - v= moving.Point(vInitial* cos(newTheta),vInitial* sin(newTheta)) - return v - -def MonteCarloSteering(movingObject1,movingObject2, instant,steering1,steering2): - ''' Monte Carlo Simulation : estimate the probability of collision in case of steering''' - from random import uniform - from math import pow, sqrt, sin, cos,atan2 - N=1000 - L= 2.4 - ProbOfCollision = 0 - for n in range (1, N): - # acceleration limit - acc1 = uniform(-0.040444,0) - acc2 = uniform(-0.040444,0) - p1= movingObject1.getPositionAtInstant(instant) - p2= movingObject2.getPositionAtInstant(instant) - vInit1= movingObject1.getVelocityAtInstant(instant) - v1= velocitySteering (vInit1,steering1) - vInit2= movingObject2.getVelocityAtInstant(instant) - v2= velocitySteering (vInit2,steering2) - distance= (p1-p2).norm2() - distanceThreshold= 1.8 - t=1 - while distance > distanceThreshold and t <= deltaT: - # Extrapolation position - (p1,v1) = motion(p1,v1,acc1) - (p2,v2) = motion(p2,v2,acc2) - distance= (p1-p2).norm2() - if distance <=distanceThreshold: - ProbOfCollision= ProbOfCollision+1 - t+=1 - POC= float(ProbOfCollision)/N - return POC - if __name__ == "__main__": import doctest
--- a/python/indicators.py Sat Jul 28 02:58:47 2012 -0400 +++ b/python/indicators.py Sun Jul 29 04:09:43 2012 -0400 @@ -3,6 +3,8 @@ __metaclass__ = type +import moving + # need for a class representing the indicators, their units, how to print them in graphs... class TemporalIndicator: '''Class for temporal indicators @@ -14,13 +16,14 @@ it should have more information like name, unit''' - def __init__(self, name, values, timeInterval=None): + def __init__(self, name, values, timeInterval=None, maxValue = None): self.name = name self.isCosine = name.find('Cosine') self.values = values self.timeInterval = timeInterval if timeInterval: assert len(values) == timeInterval.length() + self.maxValue = maxValue def empty(self): return len(self.values) == 0 @@ -53,9 +56,9 @@ if not self.timeInterval and type(self.values)==dict: instants = self.values.keys() if instants: - self.timeInterval = TimeInterval(instants[0], instants[-1]) + self.timeInterval = moving.TimeInterval(instants[0], instants[-1]) else: - self.timeInterval = TimeInterval() + self.timeInterval = moving.TimeInterval() return self.timeInterval def getValues(self): @@ -75,21 +78,27 @@ else: return values - def plot(self, options = '', **kwargs): - from matplotlib.pylab import plot - if not self.timeInterval and type(self.values)==dict: + def plot(self, options = '', xfactor = 1., **kwargs): + from matplotlib.pylab import plot,ylim + if self.getTimeInterval().length() == 1: + marker = 'o' + else: + marker = '' + if not self.timeInterval or type(self.values)==dict: time = sorted(self.values.keys()) - plot(time, [self.values[i] for i in time], options, **kwargs) + plot([x/xfactor for x in time], [self.values[i] for i in time], options+marker, **kwargs) else: - plot(list(getTimeInterval()), self.values, options, **kwargs) - + plot([x/xfactor for x in list(self.getTimeInterval())], self.values, options+marker, **kwargs) + if self.maxValue: + ylim(ymax = self.maxValue) + class SeverityIndicator(TemporalIndicator): '''Class for severity indicators field mostSevereIsMax is True if the most severe value taken by the indicator is the maximum''' - def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, ignoredValue = None): - TemporalIndicator.__init__(self, name, values, timeInterval) + def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, ignoredValue = None, maxValue = None): + TemporalIndicator.__init__(self, name, values, timeInterval, maxValue) self.mostSevereIsMax = mostSevereIsMax self.ignoredValue = ignoredValue