Mercurial Hosting > traffic-intelligence
view python/utils.py @ 72:575340e6fce3
corrected most of the method to load NGSIM data (adapted to the current MovingObject class
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Fri, 14 Jan 2011 19:49:20 -0500 |
parents | 75cf537b8d88 |
children | 46ec876ce90e |
line wrap: on
line source
#! /usr/bin/env python ''' Generic utilities.''' #from numpy import * #from pylab import * import moving __metaclass__ = type commentChar = '#' delimiterChar = '%'; ######################### # simple statistics ######################### def computeChi2(expected, observed): '''Returns the Chi2 statistics''' result = 0. for e, o in zip(expected, observed): result += ((e-o)*(e-o))/e return result class histogram: '''Class to represent a sample of a distribution for a continuous random variable with the number of observations for each interval''' def __init__(self, categories, counts): self.categories = categories self.counts = counts def mean(self): result = 0. for i,c in zip(self.categories, self.counts): result += c*(i[1]+i[0])/2 return result/sum(self.counts) def var(self, mean = None): if not mean: m = self.mean() else: m = mean result = 0. for i,c in zip(self.categories, self.counts): mid = (i[1]+i[0])/2 result += c*(mid - m)*(mid - m) return result/(self.nSamples()-1) def nSamples(self): return sum(self.counts) def referenceCounts(self, cdf): '''cdf is a cumulative distribution function returning the probability of the variable being less that x''' # refCumulativeCounts = [0]#[cdf(self.categories[0][0])] # for inter in self.categories: # refCumulativeCounts.append(cdf(inter[1])) refCumulativeCounts = [cdf(inter[1]) for inter in self.categories[:-1]] refProba = [refCumulativeCounts[0]] for i in xrange(1,len(refCumulativeCounts)): refProba.append(refCumulativeCounts[i]-refCumulativeCounts[i-1]) refProba.append(1-refCumulativeCounts[-1]) refCounts = [p*self.nSamples() for p in refProba] return refCounts, refProba ######################### # maths section ######################### def ceilDecimals(v, nDecimals): '''Rounds the number at the nth decimal eg 1.23 at 0 decimal is 2, at 1 decimal is 1.3''' from math import ceil,pow tens = pow(10,nDecimals) return ceil(v*tens)/tens def segmentIntersection(p1, p2, p3, p4): '''Returns the intersecting point of the segments [p1, p2] and [p3, p4], None otherwise''' from numpy import matrix from numpy.linalg import linalg, det dp1 = p2-p1#[s1[0][1]-s1[0][0], s1[1][1]-s1[1][0]] dp2 = p4-p3#[s2[0][1]-s2[0][0], s2[1][1]-s2[1][0]] A = matrix([[dp1.y, -dp1.x], [dp2.y, -dp2.x]]) B = matrix([[dp1.y*p1.x-dp1.x*p1.y], [dp2.y*p3.x-dp2.x*p3.y]]) if linalg.det(A) == 0:#crossProduct(ds1, ds2) == 0: return None else: intersection = linalg.solve(A,B) if (moving.Interval(p1.x, p2.x, True).contains(intersection[0,0]) and moving.Interval(p3.x, p4.x, True).contains(intersection[0,0]) and moving.Interval(p1.y, p2.y, True).contains(intersection[1,0]) and moving.Interval(p3.y, p4.y, True).contains(intersection[1,0])): return moving.Point(intersection[0,0], intersection[1,0]) else: return None def crossProduct(l1, l2): return l1[0]*l2[1]-l1[1]*l2[0] def filterMovingWindow(input, halfWidth): '''Returns an array obtained after the smoothing of the input by a moving average The first and last points are copied from the original.''' width = float(halfWidth*2+1) win = ones(width,'d') result = convolve(win/width,array(inputSignal),'same') result[:halfWidth] = inputSignal[:halfWidth] result[-halfWidth:] = inputSignal[-halfWidth:] return result ######################### # plotting section ######################### class PlottingPropertyValues: def __init__(self, values): self.values = values def get(self,i): return self.values[i%len(self.values)] markers = PlottingPropertyValues(['+', '*', ',', '.', 'x', 'D', 's', 'o']) linestyles = PlottingPropertyValues(['-', '--', '-.', ':']) colors = PlottingPropertyValues('brgmyck') # 'w' def plotIndicatorMap(indicatorMap, squareSize): from numpy import array, arange, ones, ma from matplotlib.pyplot import pcolor coords = array(indicatorMap.keys()) minX = min(coords[:,0]) minY = min(coords[:,1]) X = arange(minX, max(coords[:,0])+1.1)*squareSize Y = arange(minY, max(coords[:,1])+1.1)*squareSize C = -ones((len(Y), len(X))) for k,v in indicatorMap.iteritems(): C[k[1]-minY,k[0]-minX] = v masked = ma.masked_where(C<0,C) pcolor(X, Y, masked) ######################### # file I/O section ######################### def openCheck(filename, option = 'r', quit = False): '''Open file filename in read mode by default and checks it is open''' try: return open(filename, option) except IOError: print 'File %s could not be opened.' % filename if quit: from sys import exit exit() return None def readline(f, commentCharacter = commentChar): '''Modified readline function to skip comments.''' s = f.readline() while (len(s) > 0) and s.startswith(commentCharacter): s = f.readline() return s.strip() def getLines(f, delimiterCharacter = delimiterChar): '''Gets a complete entry (all the lines) in between delimiterChar.''' dataStrings = [] s = readline(f) while (len(s) > 0) and (not s.startswith(delimiterCharacter)): dataStrings += [s.strip()] s = readline(f) return dataStrings def removeExtension(filename, delimiter = '.'): '''Returns the filename minus the extension (all characters after last .)''' i = filename.rfind(delimiter) if i>0: return filename[:i] else: return filename def cleanFilename(s): 'cleans filenames obtained when contatenating figure characteristics' return s.replace(' ','-').replace('.','') def listfiles(dirname, extension, remove = False): '''Returns the list of files with the extension in the directory dirname If remove is True, the filenames are stripped from the extension''' from os import listdir tmp = [f for f in listdir(dirname) if f.endswith(extension)] tmp.sort() if remove: return [removeExtension(f, extension) for f in tmp] else: return tmp def removeFile(filename): '''Deletes the file while avoiding raising an error if the file does not exist''' if (os.path.exists(filename)): os.remove(filename) def plotPolygon(poly, options = ''): from numpy.core.multiarray import array from matplotlib.pyplot import plot from shapely.geometry import Polygon tmp = array(poly.exterior) plot(tmp[:,0], tmp[:,1], options) def line2Floats(l, separator=' '): '''Returns the list of floats corresponding to the string''' return [float(x) for x in l.split(separator)] def line2Ints(l, separator=' '): '''Returns the list of ints corresponding to the string''' return [int(x) for x in l.split(separator)] ######################### # running tests ######################### if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/utils.txt') #suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite) #doctest.testmod() #doctest.testfile("example.txt")