Mercurial Hosting > traffic-intelligence
changeset 1097:b3f8b26ee838
modification for simulation
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Mon, 18 Feb 2019 17:23:26 -0500 |
parents | 9a32d63bae3f |
children | 469e36eea158 |
files | trafficintelligence/moving.py trafficintelligence/tests/moving.txt |
diffstat | 2 files changed, 107 insertions(+), 35 deletions(-) [+] |
line wrap: on
line diff
--- a/trafficintelligence/moving.py Fri Feb 15 14:35:56 2019 -0500 +++ b/trafficintelligence/moving.py Mon Feb 18 17:23:26 2019 -0500 @@ -4,7 +4,7 @@ import copy from math import sqrt, atan2, cos, sin -from numpy import median, mean, array, arange, zeros, ones, hypot, NaN, std, floor, float32, argwhere, minimum +from numpy import median, mean, array, arange, zeros, ones, hypot, NaN, std, floor, ceil, float32, argwhere, minimum from matplotlib.pyplot import plot, text from scipy.stats import scoreatpercentile from scipy.spatial.distance import cdist @@ -1116,6 +1116,15 @@ def getLanes(self): return self.lanes + def getSCoordAt(self, i): + return self.positions[0][i] + + def getYCoordAt(self, i): + return self.positions[1][i] + + def getLaneAt(self, i): + return self.positions[2][i] + def addPositionSYL(self, s, y, lane = None): self.addPositionXY(s,y) self.lanes.append(lane) @@ -1178,10 +1187,14 @@ and a usertype (e.g. road user) coded as a number (see userTypeNames) ''' - def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = userType2Num['unknown'], nObjects = None): + def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = userType2Num['unknown'], nObjects = None, initCurvilinear = False): super(MovingObject, self).__init__(num, timeInterval) - self.positions = positions - self.velocities = velocities + if initCurvilinear: + self.curvilinearPositions = positions + self.curvilinearVelocities = velocities + else: + self.positions = positions + self.velocities = velocities self.geometry = geometry self.userType = userType self.setNObjects(nObjects) # a feature has None for nObjects @@ -1240,45 +1253,66 @@ def updatePositions(self): inter, self.positions, self.velocities = MovingObject.aggregateTrajectories(self.features, self.getTimeInterval()) - def updateCurvilinearPositions(self, method, changeOfAlignment, nextAlignment_idx, timeStep = None, time = None, - leaderVehicle = None, previousAlignmentId = None, - maxSpeed = None, acceleration = None): + def updateCurvilinearPositions(self, method, changeOfAlignment, nextAlignment_idx, timeStep = None, instant = None, previousAlignmentId = None, maxSpeed = None, acceleration = None): if method == 'newell': - if time <= self.timeInterval[0] < time + timeStep: - # #si t < instant de creation du vehicule, la position vaut l'espacement dn entre les deux vehicules - if leaderVehicle is None: - self.curvilinearPositions.addPositionSYL( - -self.dn, - 0, - nextAlignment_idx) + if self.curvilinearPositions is None: # vehicle without positions, all vehicles should have leader (?) + if self.leader.curvilinearPositions is not None and self.leader.curvilinearPositions.getSCoordAt(-1) > self.dn and len(self.leader.curvilinearPositions) >=2: + leaderSpeed = self.leader.curvilinearVelocities.getSCoordAt(-1) + instantAtX0 = self.tau + instant*timeStep - (self.leader.curvilinearPositions.getSCoordAt(-1)-self.d)/leaderSpeed + if instantAtX0 < obj.initialHeadway: #obj appears at instant initialHeadway at x=0 with desiredSpeed + instantAtX0 = obj.initialHeadway + + firstInstant = int(np.ceil(instantAtX0/timeStep)) + self.timeInterval = TimeInterval(firstInstant, firstInstant) + freeFlowCoord = (firstInstant*timeStep - instantAtX0)*self.desiredSpeed + # constrainedCoord at firstInstant = xn-1(t = firstInstant*timeStep-self.tn)-self.dn + t = firstInstant*timeStep-self.tn + i = int(floor(t/timeStep)) + leaderSpeed = self.leader.getCurvilinearVelocityAtInstant(i)[0] + constrainedCoord = self.leader.getCurvilinearPositionAtInstant(i)[0]+leaderSpeed*(t-i*timeStep)-self.dn + obj.curvilinearPositions = moving.CurvilinearTrajectory([min(freeFlowCoord, constrainedCoord)], [0.], [nextAlignment_idx])# TODO verify initial alignment index + obj.curvilinearVelocities = moving.CurvilinearTrajectory() + for i in range(firstInstant+1, instant+1): + freeFlowCoord = timeStep*self.desiredSpeed + self.curvilinearPositions.getSCoordAt(-1) + #constrainedCoord = + + # if instant <= self.timeInterval[0] < instant + timeStep: + # # #si t < instant de creation du vehicule, la position vaut l'espacement dn entre les deux vehicules + # if leaderVehicle is None: + # self.curvilinearPositions.addPositionSYL( + # -self.dn, + # 0, + # nextAlignment_idx) + # else: + # self.curvilinearPositions.addPositionSYL( + # leaderVehicle.curvilinearPositions[0][0] - self.dn, + # 0, + # nextAlignment_idx) + else: + freeFlowCoord = [self.curvilinearPositions[-1][0]+self.desiredSpeed*timeStep, 0, nextAlignment_idx] + if self.leader is None: + self.curvilinearPositions.addPositionSYL(*freeFlowCoord) else: - self.curvilinearPositions.addPositionSYL( - leaderVehicle.curvilinearPositions[0][0] - self.dn, - 0, - nextAlignment_idx) - else: - if leaderVehicle is None: - self.curvilinearPositions.addPositionSYL(self.curvilinearPositions[-1][0]+self.desiredSpeed*timeStep, 0, nextAlignment_idx) - else: - if time > self.reactionTime: - previousVehicleCurvilinearPositionAtPrecedentTime = \ - leaderVehicle.curvilinearPositions[-int(round(self.reactionTime))][0] # t-v.reactionTime + # if leader coord unknown at t-dn, no movement + if instant > self.reactionTime: + previousVehicleCurvilinearPositionAtPrecedentInstant = leaderVehicle.curvilinearPositions[-int(round(self.reactionTime))][0] # t-v.reactionTime else: - previousVehicleCurvilinearPositionAtPrecedentTime = \ + previousVehicleCurvilinearPositionAtPrecedentInstant = \ leaderVehicle.curvilinearPositions[0][0] - self.curvilinearPositions.addPositionSYL(previousVehicleCurvilinearPositionAtPrecedentTime - self.dn, 0, nextAlignment_idx) + self.curvilinearPositions.addPositionSYL(previousVehicleCurvilinearPositionAtPrecedentInstant - self.dn, 0, nextAlignment_idx) #mise ajour des vitesses if changeOfAlignment: - self.velocities.addPositionSYL((self.curvilinearPositions[-1][0]-self.curvilinearPositions[-2][0])/timeStep, - (self.curvilinearPositions[-1][1]-self.curvilinearPositions[-2][1])/timeStep, - (previousAlignmentId, nextAlignment_idx)) + self.curvilinearVelocities.addPositionSYL((self.curvilinearPositions[-1][0]-self.curvilinearPositions[-2][0])/timeStep, + (self.curvilinearPositions[-1][1]-self.curvilinearPositions[-2][1])/timeStep, + (previousAlignmentId, nextAlignment_idx)) else: - self.velocities.addPositionSYL((self.curvilinearPositions[-1][0]-self.curvilinearPositions[-2][0])/timeStep, - (self.curvilinearPositions[-1][1]-self.curvilinearPositions[-2][1])/timeStep, - None) + self.curvilinearVelocities.addPositionSYL((self.curvilinearPositions[-1][0]-self.curvilinearPositions[-2][0])/timeStep, + (self.curvilinearPositions[-1][1]-self.curvilinearPositions[-2][1])/timeStep, + None) + @staticmethod def concatenate(obj1, obj2, num = None, newFeatureNum = None, computePositions = False): '''Concatenates two objects, whether overlapping temporally or not @@ -1429,6 +1463,24 @@ else: print('Object {} has no curvilinear positions'.format(self.getNum())) + def interpolateCurvilinearPositions(self, t): + '''Linear interpolation of curvilinear positions, t being a float''' + if hasattr(self, 'curvilinearPositions'): + if self.existsAtInstant(t): + i = int(floor(t)) + p1 = self.getCurvilinearPositionAtInstant(i) + p2 = self.getCurvilinearPositionAtInstant(i+1) + alpha = t-float(i) + if alpha < 0.5: + lane = p1[2] + else: + lane = p2[2] + return [(1-alpha)*p1[0]+alpha*p2[0], (1-alpha)*p1[1]+alpha*p2[1], lane] + else: + print('Object {} does not exist at {}'.format(self.getNum(), t)) + else: + print('Object {} has no curvilinear positions'.format(self.getNum())) + def setUserType(self, userType): self.userType = userType @@ -1517,6 +1569,18 @@ def getVelocityAtInstant(self, i): return self.velocities[i-self.getFirstInstant()] + def getCurvilinearPositionAt(self, i): + return self.curvilinearPositions[i] + + def getCurvilinearVelocityAt(self, i): + return self.curvilinearVelocities[i] + + def getCurvilinearPositionAtInstant(self, i): + return self.curvilinearPositions[i-self.getFirstInstant()] + + def getCurvilinearVelocityAtInstant(self, i): + return self.curvilinearVelocities[i-self.getFirstInstant()] + def getXCoordinates(self): return self.positions.getXCoordinates()
--- a/trafficintelligence/tests/moving.txt Fri Feb 15 14:35:56 2019 -0500 +++ b/trafficintelligence/tests/moving.txt Mon Feb 18 17:23:26 2019 -0500 @@ -211,12 +211,20 @@ 10 >>> t1[3] [6.0, 0, 'b'] ->>> t2 = CurvilinearTrajectory.generate(3, 1., 10, 'a', 1.) +>>> t2 = CurvilinearTrajectory.generate(15, 1., 10, 'a', 1.) >>> t2[4] -[7.0, 1.0, 'a'] +[19.0, 1.0, 'a'] >>> t1.append(t2) >>> t1.length() 20 +>>> t1[9] +[12.0, 0, 'b'] +>>> o = MovingObject(0, TimeInterval(1,21)) +>>> o.curvilinearPositions = t1 +>>> o.interpolateCurvilinearPositions(2.3) +[4.3, 0.0, 'b'] +>>> o.interpolateCurvilinearPositions(10.7) # doctest:+ELLIPSIS +[14.09999..., 0.69999..., 'a'] >>> a = Trajectory.generate(Point(0.,0.), Point(10.,0.), 4) >>> t = Trajectory.generate(Point(0.1,-1.), Point(1.,0.), 22)