Mercurial Hosting > traffic-intelligence
diff python/utils.py @ 614:5e09583275a4
Merged Nicolas/trafficintelligence into default
author | Mohamed Gomaa <eng.m.gom3a@gmail.com> |
---|---|
date | Fri, 05 Dec 2014 12:13:53 -0500 |
parents | c5406edbcf12 |
children | 0954aaf28231 |
line wrap: on
line diff
--- a/python/utils.py Thu Apr 18 15:29:33 2013 -0400 +++ b/python/utils.py Fri Dec 05 12:13:53 2014 -0500 @@ -3,12 +3,12 @@ #from numpy import * #from pylab import * +from datetime import time, datetime +from math import sqrt __metaclass__ = type -commentChar = '#' - -delimiterChar = '%'; +datetimeFormat = "%Y-%m-%d %H:%M:%S" ######################### # Enumerations @@ -22,46 +22,29 @@ return result ######################### -# CLI utils -######################### - -def parseCLIOptions(helpMessage, options, cliArgs, optionalOptions=[]): - ''' Simple function to handle similar argument parsing - Returns the dictionary of options and their values - - * cliArgs are most likely directly sys.argv - (only the elements after the first one are considered) - - * options should be a list of strings for getopt options, - eg ['frame=','correspondences=','video='] - A value must be provided for each option, or the program quits''' - import sys, getopt - from numpy.core.fromnumeric import all - optionValues, args = getopt.getopt(cliArgs[1:], 'h', ['help']+options+optionalOptions) - optionValues = dict(optionValues) - - if '--help' in optionValues.keys() or '-h' in optionValues.keys(): - print(helpMessage+ - '\n - Compulsory options: '+' '.join([opt.replace('=','') for opt in options])+ - '\n - Non-compulsory options: '+' '.join([opt.replace('=','') for opt in optionalOptions])) - sys.exit() - - missingArgument = [('--'+opt.replace('=','') in optionValues.keys()) for opt in options] - if not all(missingArgument): - print('Missing argument') - print(optionValues) - sys.exit() - - return optionValues - -######################### # simple statistics ######################### -def confidenceInterval(mean, stdev, nSamples, percentConfidence, printLatex = False): - from math import sqrt +def sampleSize(stdev, tolerance, percentConfidence, printLatex = False): from scipy.stats.distributions import norm k = round(norm.ppf(0.5+percentConfidence/200., 0, 1)*100)/100. # 1.-(100-percentConfidence)/200. + if printLatex: + print('${0}^2\\frac{{{1}^2}}{{{2}^2}}$'.format(k, stdev, tolerance)) + return (k*stdev/tolerance)**2 + +def confidenceInterval(mean, stdev, nSamples, percentConfidence, trueStd = True, printLatex = False): + '''if trueStd, use normal distribution, otherwise, Student + + Use otherwise t.interval or norm.interval + ex: norm.interval(0.95, loc = 0., scale = 2.3/sqrt(11)) + t.interval(0.95, 10, loc=1.2, scale = 2.3/sqrt(nSamples)) + loc is mean, scale is sigma/sqrt(n) (for Student, 10 is df)''' + from math import sqrt + from scipy.stats.distributions import norm, t + if trueStd: + k = round(norm.ppf(0.5+percentConfidence/200., 0, 1)*100)/100. # 1.-(100-percentConfidence)/200. + else: # use Student + k = round(t.ppf(0.5+percentConfidence/200., nSamples-1)*100)/100. e = k*stdev/sqrt(nSamples) if printLatex: print('${0} \pm {1}\\frac{{{2}}}{{\sqrt{{{3}}}}}$'.format(mean, k, stdev, nSamples)) @@ -78,15 +61,13 @@ def nSamples(self): return sum(self.counts) -def cumulativeDensityFunction(sample): +def cumulativeDensityFunction(sample, normalized = False): '''Returns the cumulative density function of the sample of a random variable''' - from numpy.core.multiarray import array - from numpy.lib.function_base import unique - from numpy.core.fromnumeric import sum - a = array(sample) - a.sort() - xaxis = unique(a) - counts = [sum(a <= x) for x in xaxis] + from numpy import arange, cumsum + xaxis = sorted(sample) + counts = arange(1,len(sample)+1) # dtype = float + if normalized: + counts /= float(len(sample)) return xaxis, counts class EmpiricalDiscreteDistribution(EmpiricalDistribution): @@ -174,33 +155,75 @@ # maths section ######################### -def LCSS(l1, l2, threshold, distance, delta = float('inf')): - '''returns the longest common subsequence similarity - based on the threshold on distance between two elements of lists l1, l2 - ''' - from numpy import zeros, int as npint - m = len(l1) - n = len(l2) - similarity = zeros((m+1,n+1), dtype = npint) - for i in xrange(1,m+1): - for j in xrange(max(1,i-delta),min(n+1,i+delta)): - if distance(l1[i-1], l2[j-1])<=threshold: - similarity[i][j] = similarity[i-1][j-1]+1 - else: - similarity[i][j] = max(similarity[i-1][j], similarity[i][j-1]) - return similarity[-1][-1] +# def kernelSmoothing(sampleX, X, Y, weightFunc, halfwidth): +# '''Returns a smoothed weighted version of Y at the predefined values of sampleX +# Sum_x weight(sample_x,x) * y(x)''' +# from numpy import zeros, array +# smoothed = zeros(len(sampleX)) +# for i,x in enumerate(sampleX): +# weights = array([weightFunc(x,xx, halfwidth) for xx in X]) +# if sum(weights)>0: +# smoothed[i] = sum(weights*Y)/sum(weights) +# else: +# smoothed[i] = 0 +# return smoothed + +def kernelSmoothing(x, X, Y, weightFunc, halfwidth): + '''Returns the smoothed estimate of (X,Y) at x + Sum_x weight(sample_x,x) * y(x)''' + from numpy import zeros, array + weights = array([weightFunc(x,observedx, halfwidth) for observedx in X]) + if sum(weights)>0: + return sum(weights*Y)/sum(weights) + else: + return 0 + +def uniform(center, x, halfwidth): + if abs(center-x)<halfwidth: + return 1. + else: + return 0. -def framesToTime(nFrames, frameRate, initialTime = (0.,0.,0.)): - 'returns hour, minutes and seconds' +def gaussian(center, x, halfwidth): + from numpy import exp + return exp(-((center-x)/halfwidth)**2/2) + +def epanechnikov(center, x, halfwidth): + diff = abs(center-x) + if diff<halfwidth: + return 1.-(diff/halfwidth)**2 + else: + return 0. + +def triangular(center, x, halfwidth): + diff = abs(center-x) + if diff<halfwidth: + return 1.-abs(diff/halfwidth) + else: + return 0. + +def medianSmoothing(x, X, Y, halfwidth): + '''Returns the media of Y's corresponding to X's in the interval [x-halfwidth, x+halfwidth]''' + from numpy import median + return median([y for observedx, y in zip(X,Y) if abs(x-observedx)<halfwidth]) + +def argmaxDict(d): + return max(d, key=d.get) + +def framesToTime(nFrames, frameRate, initialTime = time()): + '''returns a datetime.time for the time in hour, minutes and seconds + initialTime is a datetime.time''' from math import floor - from datetime import time - seconds = int(floor(float(nFrames)/float(frameRate))+initialTime[0]*3600+initialTime[1]*60+initialTime[2]) + seconds = int(floor(float(nFrames)/float(frameRate))+initialTime.hour*3600+initialTime.minute*60+initialTime.second) h = int(floor(seconds/3600.)) seconds = seconds - h*3600 m = int(floor(seconds/60)) seconds = seconds - m*60 return time(h, m, seconds) +def timeToFrames(t, frameRate): + return frameRate*(t.hour*3600+t.minute*60+t.second) + def sortXY(X,Y): 'returns the sorted (x, Y(x)) sorted on X' D = {} @@ -217,14 +240,32 @@ return ceil(v*tens)/tens def inBetween(bound1, bound2, x): - return bound1 <= x <= bound2 or bound2 <= x<= bound1 + return bound1 <= x <= bound2 or bound2 <= x <= bound1 + +def pointDistanceL2(x1,y1,x2,y2): + ''' Compute point-to-point distance (L2 norm, ie Euclidean distance)''' + return sqrt((x2-x1)**2+(y2-y1)**2) def crossProduct(l1, l2): return l1[0]*l2[1]-l1[1]*l2[0] -def filterMovingWindow(input, halfWidth): +def cat_mvgavg(cat_list, halfWidth): + ''' Return a list of categories/values smoothed according to a window. + halfWidth is the search radius on either side''' + from copy import deepcopy + catgories = deepcopy(cat_list) + smoothed = catgories + for point in range(len(catgories)): + lower_bound_check = max(0,point-halfWidth) + upper_bound_check = min(len(catgories)-1,point+halfWidth+1) + window_values = catgories[lower_bound_check:upper_bound_check] + smoothed[point] = max(set(window_values), key=window_values.count) + return smoothed + +def filterMovingWindow(inputSignal, halfWidth): '''Returns an array obtained after the smoothing of the input by a moving average The first and last points are copied from the original.''' + from numpy import ones,convolve,array width = float(halfWidth*2+1) win = ones(width,'d') result = convolve(win/width,array(inputSignal),'same') @@ -251,20 +292,162 @@ return coef ######################### +# iterable section +######################### + +def mostCommon(L): + '''Returns the most frequent element in a iterable + + taken from http://stackoverflow.com/questions/1518522/python-most-common-element-in-a-list''' + from itertools import groupby + from operator import itemgetter + # get an iterable of (item, iterable) pairs + SL = sorted((x, i) for i, x in enumerate(L)) + # print 'SL:', SL + groups = groupby(SL, key=itemgetter(0)) + # auxiliary function to get "quality" for an item + def _auxfun(g): + item, iterable = g + count = 0 + min_index = len(L) + for _, where in iterable: + count += 1 + min_index = min(min_index, where) + # print 'item %r, count %r, minind %r' % (item, count, min_index) + return count, -min_index + # pick the highest-count/earliest item + return max(groups, key=_auxfun)[0] + +######################### +# sequence section +######################### + +class LCSS: + '''Class that keeps the LCSS parameters + and puts together the various computations''' + def __init__(self, similarityFunc, delta = float('inf'), aligned = False, lengthFunc = min): + self.similarityFunc = similarityFunc + self.aligned = aligned + self.delta = delta + self.lengthFunc = lengthFunc + self.subSequenceIndices = [(0,0)] + + def similarities(self, l1, l2, jshift=0): + from numpy import zeros, int as npint + n1 = len(l1) + n2 = len(l2) + self.similarityTable = zeros((n1+1,n2+1), dtype = npint) + for i in xrange(1,n1+1): + for j in xrange(max(1,i-jshift-self.delta),min(n2,i-jshift+self.delta)+1): + if self.similarityFunc(l1[i-1], l2[j-1]): + self.similarityTable[i,j] = self.similarityTable[i-1,j-1]+1 + else: + self.similarityTable[i,j] = max(self.similarityTable[i-1,j], self.similarityTable[i,j-1]) + + def subSequence(self, i, j): + '''Returns the subsequence of two sequences + http://en.wikipedia.org/wiki/Longest_common_subsequence_problem''' + if i == 0 or j == 0: + return [] + elif self.similarityTable[i][j] == self.similarityTable[i][j-1]: + return self.subSequence(i, j-1) + elif self.similarityTable[i][j] == self.similarityTable[i-1][j]: + return self.subSequence(i-1, j) + else: + return self.subSequence(i-1, j-1) + [(i-1,j-1)] + + def _compute(self, _l1, _l2, computeSubSequence = False): + '''returns the longest common subsequence similarity + based on the threshold on distance between two elements of lists l1, l2 + similarityFunc returns True or False whether the two points are considered similar + + if aligned, returns the best matching if using a finite delta by shiftinig the series alignments + + eg distance(p1, p2) < epsilon + ''' + if len(_l2) < len(_l1): # l1 is the shortest + l1 = _l2 + l2 = _l1 + revertIndices = True + else: + l1 = _l1 + l2 = _l2 + revertIndices = False + n1 = len(l1) + n2 = len(l2) + + if self.aligned: + lcssValues = {} + similarityTables = {} + for i in xrange(-n2-self.delta+1, n1+self.delta): # interval such that [i-shift-delta, i-shift+delta] is never empty, which happens when i-shift+delta < 1 or when i-shift-delta > n2 + self.similarities(l1, l2, i) + lcssValues[i] = self.similarityTable.max() + similarityTables[i] = self.similarityTable + #print self.similarityTable + alignmentShift = argmaxDict(lcssValues) # ideally get the medium alignment shift, the one that minimizes distance + self.similarityTable = similarityTables[alignmentShift] + else: + alignmentShift = 0 + self.similarities(l1, l2) + + # threshold values for the useful part of the similarity table are n2-n1-delta and n1-n2-delta + self.similarityTable = self.similarityTable[:min(n1, n2+alignmentShift+self.delta)+1, :min(n2, n1-alignmentShift+self.delta)+1] + + if computeSubSequence: + self.subSequenceIndices = self.subSequence(self.similarityTable.shape[0]-1, self.similarityTable.shape[1]-1) + if revertIndices: + self.subSequenceIndices = [(j,i) for i,j in self.subSequenceIndices] + return self.similarityTable[-1,-1] + + def compute(self, l1, l2, computeSubSequence = False): + '''get methods are to be shadowed in child classes ''' + return self._compute(l1, l2, computeSubSequence) + + def computeAlignment(self): + from numpy import mean + return mean([j-i for i,j in self.subSequenceIndices]) + + def _computeNormalized(self, l1, l2, computeSubSequence = False): + ''' compute the normalized LCSS + ie, the LCSS divided by the min or mean of the indicator lengths (using lengthFunc) + lengthFunc = lambda x,y:float(x,y)/2''' + return float(self._compute(l1, l2, computeSubSequence))/self.lengthFunc(len(l1), len(l2)) + + def computeNormalized(self, l1, l2, computeSubSequence = False): + return self._computeNormalized(l1, l2, computeSubSequence) + + def _computeDistance(self, l1, l2, computeSubSequence = False): + ''' compute the LCSS distance''' + return 1-self._computeNormalized(l1, l2, computeSubSequence) + + def computeDistance(self, l1, l2, computeSubSequence = False): + return self._computeDistance(l1, l2, computeSubSequence) + +######################### # plotting section ######################### -def stepPlot(X, firstX, lastX, initialCount = 0): - '''for each value in x, increment by one the initial count +def plotPolygon(poly, options = ''): + 'Plots shapely polygon poly' + from numpy.core.multiarray import array + from matplotlib.pyplot import plot + from shapely.geometry import Polygon + + tmp = array(poly.exterior) + plot(tmp[:,0], tmp[:,1], options) + +def stepPlot(X, firstX, lastX, initialCount = 0, increment = 1): + '''for each value in X, increment by increment the initial count returns the lists that can be plotted - to obtain a step plot increasing by one for each value in x, from first to last value''' + to obtain a step plot increasing by one for each value in x, from first to last value + firstX and lastX should be respectively smaller and larger than all elements in X''' sortedX = [] counts = [initialCount] for x in sorted(X): sortedX += [x,x] counts.append(counts[-1]) - counts.append(counts[-1]+1) + counts.append(counts[-1]+increment) counts.append(counts[-1]) return [firstX]+sortedX+[lastX], counts @@ -302,50 +485,6 @@ # file I/O section ######################### -def openCheck(filename, option = 'r', quit = False): - '''Open file filename in read mode by default - and checks it is open''' - try: - return open(filename, option) - except IOError: - print 'File %s could not be opened.' % filename - if quit: - from sys import exit - exit() - return None - -def readline(f, commentCharacter = commentChar): - '''Modified readline function to skip comments.''' - s = f.readline() - while (len(s) > 0) and s.startswith(commentCharacter): - s = f.readline() - return s.strip() - -def getLines(f, delimiterCharacter = delimiterChar): - '''Gets a complete entry (all the lines) in between delimiterChar.''' - dataStrings = [] - s = readline(f) - while (len(s) > 0) and (not s.startswith(delimiterCharacter)): - dataStrings += [s.strip()] - s = readline(f) - return dataStrings - -class FakeSecHead(object): - '''Add fake section header [asection] - - from http://stackoverflow.com/questions/2819696/parsing-properties-file-in-python/2819788#2819788 - use read_file in Python 3.2+ - ''' - def __init__(self, fp): - self.fp = fp - self.sechead = '[main]\n' - - def readline(self): - if self.sechead: - try: return self.sechead - finally: self.sechead = None - else: return self.fp.readline() - def removeExtension(filename, delimiter = '.'): '''Returns the filename minus the extension (all characters after last .)''' i = filename.rfind(delimiter) @@ -386,14 +525,6 @@ else: print(filename+' does not exist') -def plotPolygon(poly, options = ''): - from numpy.core.multiarray import array - from matplotlib.pyplot import plot - from shapely.geometry import Polygon - - tmp = array(poly.exterior) - plot(tmp[:,0], tmp[:,1], options) - def line2Floats(l, separator=' '): '''Returns the list of floats corresponding to the string''' return [float(x) for x in l.split(separator)] @@ -403,14 +534,62 @@ return [int(x) for x in l.split(separator)] ######################### -# sqlite +# CLI utils ######################### -def dropTables(connection, tableNames): - 'deletes the table with names in tableNames' - cursor = connection.cursor() - for tableName in tableNames: - cursor.execute('DROP TABLE '+tableName) +def parseCLIOptions(helpMessage, options, cliArgs, optionalOptions=[]): + ''' Simple function to handle similar argument parsing + Returns the dictionary of options and their values + + * cliArgs are most likely directly sys.argv + (only the elements after the first one are considered) + + * options should be a list of strings for getopt options, + eg ['frame=','correspondences=','video='] + A value must be provided for each option, or the program quits''' + import sys, getopt + from numpy.core.fromnumeric import all + optionValues, args = getopt.getopt(cliArgs[1:], 'h', ['help']+options+optionalOptions) + optionValues = dict(optionValues) + + if '--help' in optionValues.keys() or '-h' in optionValues.keys(): + print(helpMessage+ + '\n - Compulsory options: '+' '.join([opt.replace('=','') for opt in options])+ + '\n - Non-compulsory options: '+' '.join([opt.replace('=','') for opt in optionalOptions])) + sys.exit() + + missingArgument = [('--'+opt.replace('=','') in optionValues.keys()) for opt in options] + if not all(missingArgument): + print('Missing argument') + print(optionValues) + sys.exit() + + return optionValues + + +######################### +# Profiling +######################### + +def analyzeProfile(profileFilename, stripDirs = True): + '''Analyze the file produced by cProfile + + obtained by for example: + - call in script (for main() function in script) + import cProfile, os + cProfile.run('main()', os.path.join(os.getcwd(),'main.profile')) + + - or on the command line: + python -m cProfile [-o profile.bin] [-s sort] scriptfile [arg]''' + import pstats, os + p = pstats.Stats(os.path.join(os.pardir, profileFilename)) + if stripDirs: + p.strip_dirs() + p.sort_stats('time') + p.print_stats(.2) + #p.sort_stats('time') + # p.print_callees(.1, 'int_prediction.py:') + return p ######################### # running tests