Mercurial Hosting > traffic-intelligence
view python/storage.py @ 0:aed8eb63cdde
initial commit with non-functional python code for NGSIM
author | Nicolas Saunier <nico@confins.net> |
---|---|
date | Sun, 18 Oct 2009 22:29:24 -0400 |
parents | |
children | ffddccfab7f9 |
line wrap: on
line source
#! /usr/bin/env python '''Various utilities to save and load data''' import utils; __metaclass__ = type def loadNgsimFile(filename, nObjects = -1, sequenceNum = -1): '''Reads data from the trajectory data provided by NGSIM project and returns the list of Feature objects''' features = [] input = utils.open(filename) if not input: import sys sys.exit() def createFeature(numbers): firstFrameNum = int(numbers[1]) lastFrameNum = firstFrameNum+int(numbers[2])-1 f = Feature(sequenceNum, firstFrameNum, lastFrameNum, int(numbers[0])) f.sizeLength = float(numbers[8]) f.sizeWidth = float(numbers[9]) f.userType = int(numbers[10]) f.positions = [[float(numbers[6])],[float(numbers[7])]] f.speeds = [float(numbers[11])] return f numbers = input.readline().strip().split() if (len(numbers) > 0): f = createFeature(numbers) for line in input: numbers = line.strip().split() if f.num != int(numbers[0]): # check and adapt the length to deal with issues in NGSIM data objLength = f.length() if (objLength != len(f.positions[0])): print 'length pb with object %s (%d,%d)' % (f.num,objLength,len(f.positions[0])) f.lastFrameNum = f.getFirstFrameNum()+len(f.positions[0])-1 f.velocities = utils.computeVelocities(f.positions) # compare norm to speeds ? features.append(f) if (nObjects>0) and (len(features)>=nObjects): break f = createFeature(numbers) else: f.positions[0] += [float(numbers[6])] f.positions[1] += [float(numbers[7])] f.speeds += [float(numbers[11])] # if (f.sizeWidth != float(numbers[9])): # print 'changed width obj %d' % (f.num) # if (f.sizeLength != float(numbers[8])): # print 'changed length obj %d' % (f.num) input.close() return features def convertNgsimFile(inFile, outFile, append = False, nObjects = -1, sequenceNum = 0): '''Reads data from the trajectory data provided by NGSIM project and converts to our current format.''' if append: out = open(outFile,'a') else: out = open(outFile,'w') nObjectsPerType = [0,0,0] features = loadNgsimFile(inFile, sequenceNum) for f in features: nObjectsPerType[f.userType-1] += 1 f.write(out) print nObjectsPerType out.close() # other functions def getLines(f): '''Gets a complete entry (all the lines) in between delimiterChar.''' dataStrings = [] s = utils.myreadline(f) while (len(s) > 0) and (not s.startswith(delimiterChar)): dataStrings += [s.strip()] s = utils.myreadline(f) return dataStrings if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/ubc_utils.txt') unittest.TextTestRunner().run(suite) #doctest.testmod() #doctest.testfile("example.txt")