Mercurial Hosting > traffic-intelligence
view python/utils.py @ 116:2bf5b76320c0
moved intersection plotting and added markers for scatter plots
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Mon, 08 Aug 2011 14:47:30 -0400 |
parents | 550556378466 |
children | 74b1fc68d4df |
line wrap: on
line source
#! /usr/bin/env python ''' Generic utilities.''' #from numpy import * #from pylab import * import moving __metaclass__ = type commentChar = '#' delimiterChar = '%'; ######################### # simple statistics ######################### def computeChi2(expected, observed): '''Returns the Chi2 statistics''' result = 0. for e, o in zip(expected, observed): result += ((e-o)*(e-o))/e return result class EmpiricalDistribution: def nSamples(self): return sum(self.counts) class EmpiricalDiscreteDistribution(EmpiricalDistribution): '''Class to represent a sample of a distribution for a discrete random variable ''' from numpy.core.fromnumeric import sum def __init__(self, categories, counts): self.categories = categories self.counts = counts def mean(self): result = [float(x*y) for x,y in zip(self.categories, self.counts)] return sum(result)/self.nSamples() def var(self, mean = None): if not mean: m = self.mean() else: m = mean result = 0. squares = [float((x-m)*(x-m)*y) for x,y in zip(self.categories, self.counts)] return sum(squares)/(self.nSamples()-1) def referenceCounts(self, probability): '''probability is a function that returns the probability of the random variable for the category values''' refProba = [probability(c) for c in self.categories] refProba[-1] = 1-sum(refProba[:-1]) refCounts = [r*self.nSamples() for r in refProba] return refCounts, refProba class EmpiricalContinuousDistribution(EmpiricalDistribution): '''Class to represent a sample of a distribution for a continuous random variable with the number of observations for each interval intervals (categories variable) are defined by their left limits, the last one being the right limit categories contain therefore one more element than the counts''' def __init__(self, categories, counts): self.categories = categories self.counts = counts def mean(self): result = 0. for i in range(len(self.counts)-1): result += self.counts[i]*(self.categories[i]+self.categories[i+1])/2 return result/self.nSamples() def var(self, mean = None): if not mean: m = self.mean() else: m = mean result = 0. for i in range(len(self.counts)-1): mid = (self.categories[i]+self.categories[i+1])/2 result += self.counts[i]*(mid - m)*(mid - m) return result/(self.nSamples()-1) def referenceCounts(self, cdf): '''cdf is a cumulative distribution function returning the probability of the variable being less that x''' # refCumulativeCounts = [0]#[cdf(self.categories[0][0])] # for inter in self.categories: # refCumulativeCounts.append(cdf(inter[1])) refCumulativeCounts = [cdf(x) for x in self.categories[1:-1]] refProba = [refCumulativeCounts[0]] for i in xrange(1,len(refCumulativeCounts)): refProba.append(refCumulativeCounts[i]-refCumulativeCounts[i-1]) refProba.append(1-refCumulativeCounts[-1]) refCounts = [p*self.nSamples() for p in refProba] return refCounts, refProba def printReferenceCounts(self, refCounts=None): if refCounts: ref = refCounts else: ref = self.referenceCounts for i in xrange(len(ref[0])): print('{0}-{1} & {2:0.3} & {3:0.3} \\\\'.format(self.categories[i],self.categories[i+1],ref[1][i], ref[0][i])) ######################### # maths section ######################### def ceilDecimals(v, nDecimals): '''Rounds the number at the nth decimal eg 1.23 at 0 decimal is 2, at 1 decimal is 1.3''' from math import ceil,pow tens = pow(10,nDecimals) return ceil(v*tens)/tens def segmentIntersection(p1, p2, p3, p4): '''Returns the intersecting point of the segments [p1, p2] and [p3, p4], None otherwise''' from numpy import matrix from numpy.linalg import linalg, det dp1 = p2-p1#[s1[0][1]-s1[0][0], s1[1][1]-s1[1][0]] dp2 = p4-p3#[s2[0][1]-s2[0][0], s2[1][1]-s2[1][0]] A = matrix([[dp1.y, -dp1.x], [dp2.y, -dp2.x]]) B = matrix([[dp1.y*p1.x-dp1.x*p1.y], [dp2.y*p3.x-dp2.x*p3.y]]) if linalg.det(A) == 0:#crossProduct(ds1, ds2) == 0: return None else: intersection = linalg.solve(A,B) if (moving.Interval(p1.x, p2.x, True).contains(intersection[0,0]) and moving.Interval(p3.x, p4.x, True).contains(intersection[0,0]) and moving.Interval(p1.y, p2.y, True).contains(intersection[1,0]) and moving.Interval(p3.y, p4.y, True).contains(intersection[1,0])): return moving.Point(intersection[0,0], intersection[1,0]) else: return None def crossProduct(l1, l2): return l1[0]*l2[1]-l1[1]*l2[0] def filterMovingWindow(input, halfWidth): '''Returns an array obtained after the smoothing of the input by a moving average The first and last points are copied from the original.''' width = float(halfWidth*2+1) win = ones(width,'d') result = convolve(win/width,array(inputSignal),'same') result[:halfWidth] = inputSignal[:halfWidth] result[-halfWidth:] = inputSignal[-halfWidth:] return result ######################### # plotting section ######################### class PlottingPropertyValues: def __init__(self, values): self.values = values def __getitem__(self, i): return self.values[i%len(self.values)] markers = PlottingPropertyValues(['+', '*', ',', '.', 'x', 'D', 's', 'o']) scatterMarkers = PlottingPropertyValues(['s','o','^','>','v','<','d','p','h','8','+','x']) linestyles = PlottingPropertyValues(['-', '--', '-.', ':']) colors = PlottingPropertyValues('brgmyck') # 'w' def plotIndicatorMap(indicatorMap, squareSize, masked = True, defaultValue=-1): from numpy import array, arange, ones, ma from matplotlib.pyplot import pcolor coords = array(indicatorMap.keys()) minX = min(coords[:,0]) minY = min(coords[:,1]) X = arange(minX, max(coords[:,0])+1.1)*squareSize Y = arange(minY, max(coords[:,1])+1.1)*squareSize C = defaultValue*ones((len(Y), len(X))) for k,v in indicatorMap.iteritems(): C[k[1]-minY,k[0]-minX] = v if masked: pcolor(X, Y, ma.masked_where(C==defaultValue,C)) else: pcolor(X, Y, C) ######################### # file I/O section ######################### def openCheck(filename, option = 'r', quit = False): '''Open file filename in read mode by default and checks it is open''' try: return open(filename, option) except IOError: print 'File %s could not be opened.' % filename if quit: from sys import exit exit() return None def readline(f, commentCharacter = commentChar): '''Modified readline function to skip comments.''' s = f.readline() while (len(s) > 0) and s.startswith(commentCharacter): s = f.readline() return s.strip() def getLines(f, delimiterCharacter = delimiterChar): '''Gets a complete entry (all the lines) in between delimiterChar.''' dataStrings = [] s = readline(f) while (len(s) > 0) and (not s.startswith(delimiterCharacter)): dataStrings += [s.strip()] s = readline(f) return dataStrings def removeExtension(filename, delimiter = '.'): '''Returns the filename minus the extension (all characters after last .)''' i = filename.rfind(delimiter) if i>0: return filename[:i] else: return filename def cleanFilename(s): 'cleans filenames obtained when contatenating figure characteristics' return s.replace(' ','-').replace('.','') def listfiles(dirname, extension, remove = False): '''Returns the list of files with the extension in the directory dirname If remove is True, the filenames are stripped from the extension''' from os import listdir tmp = [f for f in listdir(dirname) if f.endswith(extension)] tmp.sort() if remove: return [removeExtension(f, extension) for f in tmp] else: return tmp def removeFile(filename): '''Deletes the file while avoiding raising an error if the file does not exist''' if (os.path.exists(filename)): os.remove(filename) def plotPolygon(poly, options = ''): from numpy.core.multiarray import array from matplotlib.pyplot import plot from shapely.geometry import Polygon tmp = array(poly.exterior) plot(tmp[:,0], tmp[:,1], options) def line2Floats(l, separator=' '): '''Returns the list of floats corresponding to the string''' return [float(x) for x in l.split(separator)] def line2Ints(l, separator=' '): '''Returns the list of ints corresponding to the string''' return [int(x) for x in l.split(separator)] ######################### # running tests ######################### if __name__ == "__main__": import doctest import unittest suite = doctest.DocFileSuite('tests/utils.txt') #suite = doctest.DocTestSuite() unittest.TextTestRunner().run(suite) #doctest.testmod() #doctest.testfile("example.txt")