Mercurial Hosting > traffic-intelligence
comparison trafficintelligence/moving.py @ 1098:469e36eea158
work in progress
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 19 Feb 2019 17:22:48 -0500 |
parents | b3f8b26ee838 |
children | 4ab5c63c13a3 |
comparison
equal
deleted
inserted
replaced
1097:b3f8b26ee838 | 1098:469e36eea158 |
---|---|
1121 | 1121 |
1122 def getYCoordAt(self, i): | 1122 def getYCoordAt(self, i): |
1123 return self.positions[1][i] | 1123 return self.positions[1][i] |
1124 | 1124 |
1125 def getLaneAt(self, i): | 1125 def getLaneAt(self, i): |
1126 return self.positions[2][i] | 1126 return self.lanes[i] |
1127 | 1127 |
1128 def addPositionSYL(self, s, y, lane = None): | 1128 def addPositionSYL(self, s, y, lane = None): |
1129 self.addPositionXY(s,y) | 1129 self.addPositionXY(s,y) |
1130 self.lanes.append(lane) | 1130 self.lanes.append(lane) |
1131 | 1131 |
1141 def differentiate(self, doubleLastPosition = False): | 1141 def differentiate(self, doubleLastPosition = False): |
1142 diff = CurvilinearTrajectory() | 1142 diff = CurvilinearTrajectory() |
1143 p1 = self[0] | 1143 p1 = self[0] |
1144 for i in range(1, self.length()): | 1144 for i in range(1, self.length()): |
1145 p2 = self[i] | 1145 p2 = self[i] |
1146 diff.addPositionSYL(p2[0]-p1[0], p2[1]-p1[1]) | 1146 if p2[2] == p1[2]: |
1147 laneChange = None | |
1148 else: | |
1149 laneChange = (p1[2], p2[2]) | |
1150 diff.addPositionSYL(p2[0]-p1[0], p2[1]-p1[1], laneChange) | |
1147 p1=p2 | 1151 p1=p2 |
1148 if doubleLastPosition and self.length() > 1: | 1152 if doubleLastPosition and self.length() > 1: |
1149 diff.addPosition(diff[-1]) | 1153 diff.addPosition(diff[-1]) |
1150 return diff | 1154 return diff |
1151 | 1155 |
1189 | 1193 |
1190 def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = userType2Num['unknown'], nObjects = None, initCurvilinear = False): | 1194 def __init__(self, num = None, timeInterval = None, positions = None, velocities = None, geometry = None, userType = userType2Num['unknown'], nObjects = None, initCurvilinear = False): |
1191 super(MovingObject, self).__init__(num, timeInterval) | 1195 super(MovingObject, self).__init__(num, timeInterval) |
1192 if initCurvilinear: | 1196 if initCurvilinear: |
1193 self.curvilinearPositions = positions | 1197 self.curvilinearPositions = positions |
1194 self.curvilinearVelocities = velocities | 1198 self.curvilinearVelocities = velocities # third component is (previousAlignmentIdx, newAlignmentIdx) or None if no change |
1195 else: | 1199 else: |
1196 self.positions = positions | 1200 self.positions = positions |
1197 self.velocities = velocities | 1201 self.velocities = velocities |
1198 self.geometry = geometry | 1202 self.geometry = geometry |
1199 self.userType = userType | 1203 self.userType = userType |
1251 return MovingObject(num = num, timeInterval = timeInterval, positions = positions, velocities = Trajectory([[v.x]*nPoints, [v.y]*nPoints])) | 1255 return MovingObject(num = num, timeInterval = timeInterval, positions = positions, velocities = Trajectory([[v.x]*nPoints, [v.y]*nPoints])) |
1252 | 1256 |
1253 def updatePositions(self): | 1257 def updatePositions(self): |
1254 inter, self.positions, self.velocities = MovingObject.aggregateTrajectories(self.features, self.getTimeInterval()) | 1258 inter, self.positions, self.velocities = MovingObject.aggregateTrajectories(self.features, self.getTimeInterval()) |
1255 | 1259 |
1256 def updateCurvilinearPositions(self, method, changeOfAlignment, nextAlignment_idx, timeStep = None, instant = None, previousAlignmentId = None, maxSpeed = None, acceleration = None): | 1260 def updateCurvilinearPositions(self, method, instant, timeStep, _nextAlignmentIdx = None, maxSpeed = None, acceleration = None): |
1261 '''Update curvilinear position of user at new instant''' | |
1257 | 1262 |
1258 if method == 'newell': | 1263 if method == 'newell': |
1259 if self.curvilinearPositions is None: # vehicle without positions, all vehicles should have leader (?) | 1264 if self.curvilinearPositions is None: # vehicle without positions, all vehicles should have leader (?) |
1260 if self.leader.curvilinearPositions is not None and self.leader.curvilinearPositions.getSCoordAt(-1) > self.dn and len(self.leader.curvilinearPositions) >=2: | 1265 if self.leader is None: |
1261 leaderSpeed = self.leader.curvilinearVelocities.getSCoordAt(-1) | 1266 s = (timeStep*instant-self.initialHeadway)*self.desiredSpeed |
1262 instantAtX0 = self.tau + instant*timeStep - (self.leader.curvilinearPositions.getSCoordAt(-1)-self.d)/leaderSpeed | 1267 self.timeInterval = TimeInterval(instant, instant) |
1263 if instantAtX0 < obj.initialHeadway: #obj appears at instant initialHeadway at x=0 with desiredSpeed | 1268 self.curvilinearPositions = CurvilinearTrajectory([s], [0.], [self.initialAlignmentIdx]) |
1264 instantAtX0 = obj.initialHeadway | 1269 self.curvilinearVelocities = CurvilinearTrajectory() |
1265 | 1270 self.instantAtX0 = self.initialHeadway |
1266 firstInstant = int(np.ceil(instantAtX0/timeStep)) | 1271 elif self.leader.curvilinearPositions is not None and self.leader.curvilinearPositions.getSCoordAt(-1) > self.d and len(self.leader.curvilinearPositions) >=2: |
1272 firstInstantAfterD = self.leader.getLastInstant() | |
1273 while self.leader.existsAtInstant(firstInstantAfterD) and self.leader.getCurvilinearPositionAtInstant(firstInstantAfterD-1)[0] > self.d:# find first instant after d | |
1274 firstInstantAfterD -= 1 # if not recorded position before self.d, we extrapolate linearly from first known position | |
1275 leaderSpeed = self.leader.getCurvilinearVelocityAtInstant(firstInstantAfterD)[0] | |
1276 self.instantAtX0 = self.tau + instant*timeStep - (self.leader.getCurvilinearPositionAtInstant(firstInstantAfterD)[0]-self.d)/leaderSpeed # second part is the time at which leader is at self.d | |
1277 if self.instantAtX0 < self.initialHeadway: #obj appears at instant initialHeadway at x=0 with desiredSpeed | |
1278 self.instantAtX0 = self.initialHeadway | |
1279 if self.instantAtX0 is not None and instant >= self.instantAtX0: | |
1280 firstInstant = int(ceil(self.instantAtX0/timeStep)) | |
1267 self.timeInterval = TimeInterval(firstInstant, firstInstant) | 1281 self.timeInterval = TimeInterval(firstInstant, firstInstant) |
1268 freeFlowCoord = (firstInstant*timeStep - instantAtX0)*self.desiredSpeed | 1282 freeFlowCoord = (firstInstant*timeStep - self.instantAtX0)*self.desiredSpeed |
1269 # constrainedCoord at firstInstant = xn-1(t = firstInstant*timeStep-self.tn)-self.dn | 1283 # constrainedCoord at firstInstant = xn-1(t = firstInstant*timeStep-self.tau)-self.d |
1270 t = firstInstant*timeStep-self.tn | 1284 t = firstInstant*timeStep-self.tau |
1271 i = int(floor(t/timeStep)) | 1285 i = int(floor(t/timeStep)) |
1272 leaderSpeed = self.leader.getCurvilinearVelocityAtInstant(i)[0] | 1286 leaderSpeed = self.leader.getCurvilinearVelocityAtInstant(i)[0] |
1273 constrainedCoord = self.leader.getCurvilinearPositionAtInstant(i)[0]+leaderSpeed*(t-i*timeStep)-self.dn | 1287 constrainedCoord = self.leader.getCurvilinearPositionAtInstant(i)[0]+leaderSpeed*(t-i*timeStep)-self.d |
1274 obj.curvilinearPositions = moving.CurvilinearTrajectory([min(freeFlowCoord, constrainedCoord)], [0.], [nextAlignment_idx])# TODO verify initial alignment index | 1288 self.curvilinearPositions = CurvilinearTrajectory([min(freeFlowCoord, constrainedCoord)], [0.], [self.initialAlignmentIdx]) |
1275 obj.curvilinearVelocities = moving.CurvilinearTrajectory() | 1289 self.curvilinearVelocities = CurvilinearTrajectory() |
1290 print(firstInstant+1, instant+1) | |
1276 for i in range(firstInstant+1, instant+1): | 1291 for i in range(firstInstant+1, instant+1): |
1277 freeFlowCoord = timeStep*self.desiredSpeed + self.curvilinearPositions.getSCoordAt(-1) | 1292 s1 = self.curvilinearPositions.getSCoordAt(-1) |
1278 #constrainedCoord = | 1293 freeFlowCoord = s1 + timeStep*self.desiredSpeed |
1279 | 1294 constrainedCoord = self.leader.interpolateCurvilinearPositions(i-self.tau/timeStep)[0]-self.d |
1280 # if instant <= self.timeInterval[0] < instant + timeStep: | 1295 s2 = min(freeFlowCoord, constrainedCoord) |
1281 # # #si t < instant de creation du vehicule, la position vaut l'espacement dn entre les deux vehicules | 1296 self.curvilinearPositions.addPositionSYL(s2, 0., self.initialAlignmentIdx) |
1282 # if leaderVehicle is None: | 1297 self.setLastInstant(i) |
1283 # self.curvilinearPositions.addPositionSYL( | 1298 print(i, self.timeInterval) |
1284 # -self.dn, | 1299 self.curvilinearVelocities.addPositionSYL(s2-s1, 0., None) |
1285 # 0, | |
1286 # nextAlignment_idx) | |
1287 # else: | |
1288 # self.curvilinearPositions.addPositionSYL( | |
1289 # leaderVehicle.curvilinearPositions[0][0] - self.dn, | |
1290 # 0, | |
1291 # nextAlignment_idx) | |
1292 else: | 1300 else: |
1293 freeFlowCoord = [self.curvilinearPositions[-1][0]+self.desiredSpeed*timeStep, 0, nextAlignment_idx] | 1301 if _nextAlignmentIdx is not None: |
1302 laneChange = (self.curvilinearPositions.getLaneAt(-1), _nextAlignmentIdx) | |
1303 nextAlignmentIdx = _nextAlignmentIdx | |
1304 else: | |
1305 laneChange = None | |
1306 nextAlignmentIdx = self.curvilinearPositions.getLaneAt(-1) | |
1307 s1 = self.curvilinearPositions.getSCoordAt(-1) | |
1308 freeFlowCoord = s1 + self.desiredSpeed*timeStep | |
1294 if self.leader is None: | 1309 if self.leader is None: |
1295 self.curvilinearPositions.addPositionSYL(*freeFlowCoord) | 1310 if self.getLastInstant() < instant: |
1311 s2 = freeFlowCoord | |
1312 self.curvilinearPositions.addPositionSYL(freeFlowCoord, 0., nextAlignmentIdx) | |
1296 else: | 1313 else: |
1297 # if leader coord unknown at t-dn, no movement | 1314 constrainedCoord = self.leader.interpolateCurvilinearPositions(instant-self.tau/timeStep)[0]-self.d |
1298 if instant > self.reactionTime: | 1315 s2 = min(freeFlowCoord, constrainedCoord) |
1299 previousVehicleCurvilinearPositionAtPrecedentInstant = leaderVehicle.curvilinearPositions[-int(round(self.reactionTime))][0] # t-v.reactionTime | 1316 self.curvilinearPositions.addPositionSYL(s2, 0., nextAlignmentIdx) |
1300 else: | 1317 self.setLastInstant(instant) |
1301 previousVehicleCurvilinearPositionAtPrecedentInstant = \ | 1318 self.curvilinearVelocities.addPositionSYL(s2-s1, 0., laneChange) |
1302 leaderVehicle.curvilinearPositions[0][0] | |
1303 | |
1304 self.curvilinearPositions.addPositionSYL(previousVehicleCurvilinearPositionAtPrecedentInstant - self.dn, 0, nextAlignment_idx) | |
1305 | |
1306 #mise ajour des vitesses | |
1307 if changeOfAlignment: | |
1308 self.curvilinearVelocities.addPositionSYL((self.curvilinearPositions[-1][0]-self.curvilinearPositions[-2][0])/timeStep, | |
1309 (self.curvilinearPositions[-1][1]-self.curvilinearPositions[-2][1])/timeStep, | |
1310 (previousAlignmentId, nextAlignment_idx)) | |
1311 else: | |
1312 self.curvilinearVelocities.addPositionSYL((self.curvilinearPositions[-1][0]-self.curvilinearPositions[-2][0])/timeStep, | |
1313 (self.curvilinearPositions[-1][1]-self.curvilinearPositions[-2][1])/timeStep, | |
1314 None) | |
1315 | 1319 |
1316 @staticmethod | 1320 @staticmethod |
1317 def concatenate(obj1, obj2, num = None, newFeatureNum = None, computePositions = False): | 1321 def concatenate(obj1, obj2, num = None, newFeatureNum = None, computePositions = False): |
1318 '''Concatenates two objects, whether overlapping temporally or not | 1322 '''Concatenates two objects, whether overlapping temporally or not |
1319 | 1323 |