Mercurial Hosting > traffic-intelligence
view python/storage.py @ 203:e2f31813ade6
added code to display trajectories on videa
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 06 Mar 2012 18:10:19 -0500 |
parents | 319a04ba9033 |
children | 966c2cd2bd9f |
line wrap: on
line source
#! /usr/bin/env python '''Various utilities to save and load data''' import utils import moving __metaclass__ = type ngsimUserTypes = {'twowheels':1, 'car':2, 'truck':3} def loadTrajectoriesFromSqlite(filename, objectNumbers = -1): '''Loads nObjects or the indices in objectNumbers from the database TODO: load velocities''' import sqlite3 connection = sqlite3.connect(filename) # add test if it open cursor = connection.cursor() if type(objectNumbers) == int: if objectNumbers == -1: cursor.execute('SELECT * from positions order by trajectory_id, frame_number') else: cursor.execute('SELECT * from positions where trajectory_id between 0 and {0} order by trajectory_id, frame_number'.format(objectNumbers)) elif type(objectNumbers) == list: cursor.execute('SELECT * from positions where trajectory_id in (' +', '.join([str(n) for n in objectNumbers]) +') order by trajectory_id, frame_number') objId = -1 obj = None objects = [] for row in cursor: if row[0] != objId: objId = row[0] if obj: objects.append(obj) obj = moving.MovingObject(row[0], timeInterval = moving.TimeInterval(row[1], row[1]), positions = moving.Trajectory([[row[2]],[row[3]]])) else: obj.timeInterval.last = row[1] obj.positions.addPositionXY(row[2],row[3]) if obj: objects.append(obj) connection.close() return objects def loadObjectsFromSqlite(filename, objectNumbers = -1): '''Loads objects as averages of feature trajectories TODO: load features as well, other ways of averaging trajectories need to provide table name(s) ?''' # elect frame_number, avg(x_coordinate), avg(y_coordinate) from positions where trajectory_id in (select trajectory_id from objects_features where object_id=12) group by frame_number; def loadTrajectoriesFromNgsimFile(filename, nObjects = -1, sequenceNum = -1): '''Reads data from the trajectory data provided by NGSIM project and returns the list of Feature objects''' objects = [] input = utils.openCheck(filename) if not input: import sys sys.exit() def createObject(numbers): firstFrameNum = int(numbers[1]) # do the geometry and usertype firstFrameNum = int(numbers[1]) lastFrameNum = firstFrameNum+int(numbers[2])-1 #time = moving.TimeInterval(firstFrameNum, firstFrameNum+int(numbers[2])-1) obj = moving.MovingObject(num = int(numbers[0]), timeInterval = moving.TimeInterval(firstFrameNum, lastFrameNum), positions = moving.Trajectory([[float(numbers[6])],[float(numbers[7])]]), userType = int(numbers[10])) obj.userType = int(numbers[10]) obj.laneNums = [int(numbers[13])] obj.precedingVehicles = [int(numbers[14])] # lead vehicle (before) obj.followingVehicles = [int(numbers[15])] # following vehicle (after) obj.spaceHeadways = [float(numbers[16])] # feet obj.timeHeadways = [float(numbers[17])] # seconds obj.curvilinearPositions = moving.Trajectory([[float(numbers[5])],[float(numbers[4])]]) # X is the longitudinal coordinate obj.speeds = [float(numbers[11])] obj.size = [float(numbers[8]), float(numbers[9])] # 8 lengh, 9 width # TODO: temporary, should use a geometry object return obj numbers = input.readline().strip().split() if (len(numbers) > 0): obj = createObject(numbers) for line in input: numbers = line.strip().split() if obj.num != int(numbers[0]): # check and adapt the length to deal with issues in NGSIM data if (obj.length() != obj.positions.length()): print 'length pb with object %s (%d,%d)' % (obj.num,obj.length(),obj.positions.length()) obj.last = obj.getFirstInstant()+obj.positions.length()-1 #obj.velocities = utils.computeVelocities(f.positions) # compare norm to speeds ? objects.append(obj) if (nObjects>0) and (len(objects)>=nObjects): break obj = createObject(numbers) else: obj.positions.addPositionXY(float(numbers[6]), float(numbers[7])) obj.curvilinearPositions.addPositionXY(float(numbers[5]), float(numbers[4])) obj.speeds.append(float(numbers[11])) obj.laneNums.append(int(numbers[13])) obj.precedingVehicles.append(int(numbers[14])) obj.followingVehicles.append(int(numbers[15])) obj.spaceHeadways.append(float(numbers[16])) obj.timeHeadways.append(float(numbers[17])) if (obj.size[0] != float(numbers[8])): print 'changed length obj %d' % (f.num) if (obj.size[1] != float(numbers[9])): print 'changed width obj %d' % (f.num) input.close() return objects def convertNgsimFile(inFile, outFile, append = False, nObjects = -1, sequenceNum = 0): '''Reads data from the trajectory data provided by NGSIM project and converts to our current format.''' if append: out = open(outFile,'a') else: out = open(outFile,'w') nObjectsPerType = [0,0,0] features = loadNgsimFile(inFile, sequenceNum) for f in features: nObjectsPerType[f.userType-1] += 1 f.write(out) print nObjectsPerType out.close() # if __name__ == "__main__": # import doctest # import unittest # suite = doctest.DocFileSuite('tests/ubc_utils.txt') # unittest.TextTestRunner().run(suite) # #doctest.testmod() # #doctest.testfile("example.txt")