Mercurial Hosting > traffic-intelligence
view python/storage.py @ 78:99e807c29753
added loading other information from NGSIM
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 25 Feb 2011 14:25:34 -0500 |
parents | 575340e6fce3 |
children | 436b87d4b992 |
line wrap: on
line source
#! /usr/bin/env python
'''Various utilities to save and load data'''

import utils
import moving

__metaclass__ = type

# user type codes; loadNgsimFile reads the code from field 10 of each record
# and convertNgsimFile uses (code - 1) as an index in its per-type counters
ngsimUserTypes = {'twowheels':1,
                  'car':2,
                  'truck':3}


def _createNgsimObject(numbers):
    '''Creates a moving.MovingObject from the first record of a vehicle.

    numbers is the list of whitespace-separated string fields of one line
    of the NGSIM trajectory file. The time interval starts at the frame
    number in field 1 and spans the number of frames given in field 2.

    NOTE(review): the meaning of the fields follows the comments of the
    original code and the attribute names; confirm against the NGSIM data
    dictionary.'''
    firstFrameNum = int(numbers[1])
    lastFrameNum = firstFrameNum+int(numbers[2])-1
    obj = moving.MovingObject(num = int(numbers[0]),
                              timeInterval = moving.TimeInterval(firstFrameNum, lastFrameNum),
                              positions = moving.Trajectory([[float(numbers[6])],[float(numbers[7])]]),
                              userType = int(numbers[10]))
    # kept from the original code, although userType is also passed to the constructor
    obj.userType = int(numbers[10])
    obj.laneNums = [int(numbers[13])]
    obj.precedingVehicles = [int(numbers[14])] # lead vehicle (before)
    obj.followingVehicles = [int(numbers[15])] # following vehicle (after)
    obj.spaceHeadways = [float(numbers[16])] # feet
    obj.timeHeadways = [float(numbers[17])] # seconds
    obj.curvilinearPositions = moving.Trajectory([[float(numbers[5])],[float(numbers[4])]]) # X is the longitudinal coordinate
    obj.speeds = [float(numbers[11])]
    obj.size = [float(numbers[8]), float(numbers[9])] # 8 length, 9 width # TODO: temporary, should use a geometry object
    return obj


def _appendNgsimRecord(obj, numbers):
    '''Appends the measurements of one more record (same vehicle) to obj.

    Prints a warning if the length or width in this record differs from
    the size stored when the object was created.'''
    obj.positions.addPositionXY(float(numbers[6]), float(numbers[7]))
    obj.curvilinearPositions.addPositionXY(float(numbers[5]), float(numbers[4]))
    obj.speeds.append(float(numbers[11]))
    obj.laneNums.append(int(numbers[13]))
    obj.precedingVehicles.append(int(numbers[14]))
    obj.followingVehicles.append(int(numbers[15]))
    obj.spaceHeadways.append(float(numbers[16]))
    obj.timeHeadways.append(float(numbers[17]))

    # the original code printed f.num, an undefined name (NameError)
    if (obj.size[0] != float(numbers[8])):
        print('changed length obj %d' % (obj.num))
    if (obj.size[1] != float(numbers[9])):
        print('changed width obj %d' % (obj.num))


def _adaptNgsimLength(obj):
    '''Checks that the number of positions matches the declared length of obj
    and adapts obj.last otherwise, to deal with issues in NGSIM data.'''
    if (obj.length() != obj.positions.length()):
        print('length pb with object %s (%d,%d)' % (obj.num,obj.length(),obj.positions.length()))
        obj.last = obj.getFirstInstant()+obj.positions.length()-1
        # TODO: compute the velocities from the positions and compare their norm to the speeds?


def loadNgsimFile(filename, nObjects = -1, sequenceNum = -1):
    '''Reads data from the trajectory data provided by NGSIM project
    and returns the list of moving.MovingObject that were read.

    filename: path of the NGSIM trajectory file, opened with utils.openCheck
    (the program exits through sys.exit() if it returns a false value)
    nObjects: maximum number of objects to load, no limit if not positive
    sequenceNum: currently unused, kept for compatibility with callers

    Consecutive lines with the same number in field 0 are grouped in one
    object. Blank lines are skipped and the last object of the file is
    returned as well.'''
    inputFile = utils.openCheck(filename)
    if not inputFile:
        import sys
        sys.exit()

    objects = []
    obj = None # object being built, None before the first record and once the limit is reached
    try:
        for line in inputFile:
            numbers = line.strip().split()
            if len(numbers) == 0: # blank line, nothing to parse
                continue

            if obj is None:
                obj = _createNgsimObject(numbers)
            elif obj.num != int(numbers[0]):
                # a new vehicle starts: the current object is complete
                _adaptNgsimLength(obj)
                objects.append(obj)
                obj = None
                if (nObjects>0) and (len(objects)>=nObjects):
                    break
                obj = _createNgsimObject(numbers)
            else:
                _appendNgsimRecord(obj, numbers)

        # the last object is not followed by another vehicle: store it here
        # (obj is None if the file had no record or if the limit was reached)
        if obj is not None:
            _adaptNgsimLength(obj)
            objects.append(obj)
    finally:
        inputFile.close()

    return objects


def convertNgsimFile(inFile, outFile, append = False, nObjects = -1, sequenceNum = 0):
    '''Reads data from the trajectory data provided by NGSIM project
    and converts to our current format.

    inFile: path of the NGSIM trajectory file
    outFile: path of the file to write, using the write method of each object
    append: if True, the objects are appended to outFile instead of overwriting it
    nObjects: maximum number of objects to convert, no limit if not positive
    sequenceNum: passed to loadNgsimFile (which does not use it for now)

    Prints the number of converted objects per user type (indexed by user type code - 1).'''
    # load first so that outFile is not truncated if inFile cannot be read;
    # the original call passed sequenceNum in place of nObjects
    features = loadNgsimFile(inFile, nObjects, sequenceNum)

    if append:
        out = open(outFile,'a')
    else:
        out = open(outFile,'w')

    nObjectsPerType = [0]*len(ngsimUserTypes)
    try:
        for f in features:
            nObjectsPerType[f.userType-1] += 1
            f.write(out)
    finally:
        out.close()

    print(nObjectsPerType)


# if __name__ == "__main__":
#     import doctest
#     import unittest
#     suite = doctest.DocFileSuite('tests/ubc_utils.txt')
#     unittest.TextTestRunner().run(suite)
#     #doctest.testmod()
#     #doctest.testfile("example.txt")