view python/utils.py @ 372:349eb1e09f45

Cleaned the methods/functions indicating if a point is in a polygon In general, shapely should be used, especially for lots of points: from shapely.geometry import Polygon, Point poly = Polygon(array([[0,0],[0,1],[1,1],[1,0]])) p = Point(0.5,0.5) poly.contains(p) -> returns True poly.contains(Point(-1,-1)) -> returns False You can convert a moving.Point to a shapely point: p = moving.Point(1,2) p.asShapely() returns the equivalent shapely point If you have several points to test, use moving.pointsInPolygon(points, polygon) where points are moving.Point and polygon is a shapely polygon.
author Nicolas Saunier <nicolas.saunier@polymtl.ca>
date Tue, 16 Jul 2013 17:00:17 -0400
parents 924e38c9f70e
children d0b86ed50f32
line wrap: on
line source

#! /usr/bin/env python
''' Generic utilities.'''

#from numpy import *
#from pylab import *

__metaclass__ = type

commentChar = '#'

delimiterChar = '%';

#########################
# Enumerations
#########################

def inverseEnumeration(l):
    'Returns the dictionary that provides for each element in the input list its index in the input list'
    result = {}
    for i,x in enumerate(l):
        result[x] = i
    return result

#########################
# simple statistics
#########################

def confidenceInterval(mean, stdev, nSamples, percentConfidence, printLatex = False):
    from math import sqrt
    from scipy.stats.distributions import norm
    k = round(norm.ppf(0.5+percentConfidence/200., 0, 1)*100)/100. # 1.-(100-percentConfidence)/200.
    e = k*stdev/sqrt(nSamples)
    if printLatex:
        print('${0} \pm {1}\\frac{{{2}}}{{\sqrt{{{3}}}}}$'.format(mean, k, stdev, nSamples))
    return mean-e, mean+e

def computeChi2(expected, observed):
    '''Returns the Chi2 statistics'''
    result = 0.
    for e, o in zip(expected, observed):
        result += ((e-o)*(e-o))/e
    return result

class EmpiricalDistribution:
    def nSamples(self):
        return sum(self.counts)

def cumulativeDensityFunction(sample):
    '''Returns the cumulative density function of the sample of a random variable'''
    from numpy.core.multiarray import array
    from numpy.lib.function_base import unique
    from numpy.core.fromnumeric import sum
    a = array(sample)
    a.sort()
    xaxis = unique(a)
    counts = [sum(a <= x) for x in xaxis]
    return xaxis, counts

class EmpiricalDiscreteDistribution(EmpiricalDistribution):
    '''Class to represent a sample of a distribution for a discrete random variable
    '''
    from numpy.core.fromnumeric import sum

    def __init__(self, categories, counts):
        self.categories = categories
        self.counts = counts

    def mean(self):
        result = [float(x*y) for x,y in zip(self.categories, self.counts)]
        return sum(result)/self.nSamples()

    def var(self, mean = None):
        if not mean:
            m = self.mean()
        else:
            m = mean
        result = 0.
        squares = [float((x-m)*(x-m)*y) for x,y in zip(self.categories, self.counts)]
        return sum(squares)/(self.nSamples()-1)

    def referenceCounts(self, probability):
        '''probability is a function that returns the probability of the random variable for the category values'''
        refProba = [probability(c) for c in self.categories]
        refProba[-1] = 1-sum(refProba[:-1])
        refCounts = [r*self.nSamples() for r in refProba]
        return refCounts, refProba

class EmpiricalContinuousDistribution(EmpiricalDistribution):
    '''Class to represent a sample of a distribution for a continuous random variable
    with the number of observations for each interval
    intervals (categories variable) are defined by their left limits, the last one being the right limit
    categories contain therefore one more element than the counts'''
    def __init__(self, categories, counts):
        # todo add samples for initialization and everything to None? (or setSamples?)
        self.categories = categories
        self.counts = counts

    def mean(self):
        result = 0.
        for i in range(len(self.counts)-1):
            result += self.counts[i]*(self.categories[i]+self.categories[i+1])/2
        return result/self.nSamples()

    def var(self, mean = None):
        if not mean:
            m = self.mean()
        else:
            m = mean
        result = 0.
        for i in range(len(self.counts)-1):
            mid = (self.categories[i]+self.categories[i+1])/2
            result += self.counts[i]*(mid - m)*(mid - m)
        return result/(self.nSamples()-1)

    def referenceCounts(self, cdf):
        '''cdf is a cumulative distribution function
        returning the probability of the variable being less that x'''
        # refCumulativeCounts = [0]#[cdf(self.categories[0][0])]
#         for inter in self.categories:
#             refCumulativeCounts.append(cdf(inter[1]))
        refCumulativeCounts = [cdf(x) for x in self.categories[1:-1]]

        refProba = [refCumulativeCounts[0]]
        for i in xrange(1,len(refCumulativeCounts)):
            refProba.append(refCumulativeCounts[i]-refCumulativeCounts[i-1])
        refProba.append(1-refCumulativeCounts[-1])
        refCounts = [p*self.nSamples() for p in refProba]
        
        return refCounts, refProba

    def printReferenceCounts(self, refCounts=None):
        if refCounts:
            ref = refCounts
        else:
            ref = self.referenceCounts
        for i in xrange(len(ref[0])):
            print('{0}-{1} & {2:0.3} & {3:0.3} \\\\'.format(self.categories[i],self.categories[i+1],ref[1][i], ref[0][i]))


#########################
# maths section
#########################

def argMaxDict(d):
    return max(d.iterkeys(), key=(lambda key: d[key]))

def framesToTime(nFrames, frameRate, initialTime = (0.,0.,0.)):
    'returns hour, minutes and seconds'
    from math import floor
    from datetime import time
    seconds = int(floor(float(nFrames)/float(frameRate))+initialTime[0]*3600+initialTime[1]*60+initialTime[2])
    h = int(floor(seconds/3600.))
    seconds = seconds - h*3600
    m = int(floor(seconds/60))
    seconds = seconds - m*60
    return time(h, m, seconds)

def sortXY(X,Y):
    'returns the sorted (x, Y(x)) sorted on X'
    D = {}
    for x, y in zip(X,Y):
        D[x]=y
    xsorted = sorted(D.keys())
    return xsorted, [D[x] for x in xsorted]

def ceilDecimals(v, nDecimals):
    '''Rounds the number at the nth decimal
    eg 1.23 at 0 decimal is 2, at 1 decimal is 1.3'''
    from math import ceil,pow
    tens = pow(10,nDecimals)
    return ceil(v*tens)/tens

def inBetween(bound1, bound2, x):
    return bound1 <= x <= bound2 or bound2 <= x<= bound1

def crossProduct(l1, l2):
    return l1[0]*l2[1]-l1[1]*l2[0]

def filterMovingWindow(input, halfWidth):
    '''Returns an array obtained after the smoothing of the input by a moving average
    The first and last points are copied from the original.'''
    width = float(halfWidth*2+1)
    win = ones(width,'d')
    result = convolve(win/width,array(inputSignal),'same')
    result[:halfWidth] = inputSignal[:halfWidth]
    result[-halfWidth:] = inputSignal[-halfWidth:]
    return result

def linearRegression(x, y, deg = 1, plotData = False):
    '''returns the least square estimation of the linear regression of y = ax+b
    as well as the plot'''
    from numpy.lib.polynomial import polyfit
    from matplotlib.pyplot import plot
    from numpy.core.multiarray import arange
    coef = polyfit(x, y, deg)
    if plotData:
        def poly(x):
            result = 0
            for i in range(len(coef)):
                result += coef[i]*x**(len(coef)-i-1)
            return result
        plot(x, y, 'x')
        xx = arange(min(x), max(x),(max(x)-min(x))/1000)
        plot(xx, [poly(z) for z in xx])
    return coef

#########################
# sequence section
#########################

class LCSS:
    '''Class that keeps the LCSS parameters
    and puts together the various computations'''
    def __init__(self, similarityFunc, delta = float('inf'), aligned = False, lengthFunc = min):
        self.similarityFunc = similarityFunc
        self.aligned = aligned
        self.delta = delta
        self.lengthFunc = lengthFunc
        self.alignmentShift = 0

    def similarities(self, l1, l2, shift=0):
        from numpy import zeros, int as npint
        n1 = len(l1)
        n2 = len(l2)
        self.similarityTable = zeros((n1+1,n2+1), dtype = npint)
        for i in xrange(1,n1+1):
            for j in xrange(max(1,i-shift-self.delta),min(n2+1,i-shift+self.delta+1)):
                #print max(1,i-shift-self.delta),min(n2+1,i-shift+self.delta+1)
                if self.similarityFunc(l1[i-1], l2[j-1]):
                    self.similarityTable[i,j] = self.similarityTable[i-1,j-1]+1
                else:
                    self.similarityTable[i,j] = max(self.similarityTable[i-1,j], self.similarityTable[i,j-1])

    def subSequence(self, i, j):
        '''Returns the subsequence of two sequences
        http://en.wikipedia.org/wiki/Longest_common_subsequence_problem'''
        if i == 0 or j == 0:
            return []
        elif self.similarityTable[i][j] == self.similarityTable[i][j-1]:
            return self.subSequence(i, j-1)
        elif self.similarityTable[i][j] == self.similarityTable[i-1][j]:
            return self.subSequence(i-1, j)
        else:
            return self.subSequence(i-1, j-1) + [(i-1,j-1)]

    def computeLCSS(self, l1, l2, computeSubSequence = False):
        '''returns the longest common subsequence similarity
        based on the threshold on distance between two elements of lists l1, l2
        similarityFunc returns True or False whether the two points are considered similar

        eg distance(p1, p2) < epsilon
        '''
        #from numpy import argmax
        self.similarities(l1, l2)
        #self.similarityTable = self.similarityTable[:, :min(len(l2), len(l1)+self.delta)+1]
        if computeSubSequence:
            self.subSequenceIndices = self.subSequence(len(l1), len(l2))
        return self.similarityTable.max()

    def _compute(self, _l1, _l2, computeSubSequence = False):
        '''returns the best matching if using a finite delta by shiftinig the series alignments'''
        if len(_l2) < len(_l1): # l1 is the shortest
            l1 = _l2
            l2 = _l1
            revertIndices = True
        else:
            l1 = _l1
            l2 = _l2
            revertIndices = False
        n1 = len(l1)
        n2 = len(l2)

        if self.aligned:
            # for i in xrange(min(delta,n1), max(n1+n2-delta, n2+1)): # i is the alignment of the end of l1 in l2
            #     print l1[min(-i-1,n1):] # min(n1+n2-i,n1)
            #     print l2[max(0,i-n1):]
            #     print LCSS(l1[min(-i-1,n1):], l2[max(0,i-n1):], similarityFunc, delta)
            lcssValues = {}
            similarityTables = {}
            #for i in xrange(min(self.delta,n1), max(n1+n2-self.delta, n2+1)):
            for i in xrange(-max(n1,n2)-self.delta, +max(n1,n2)+self.delta):
                #print l1[min(-i-1,n1):] # min(n1+n2-i,n1)
                #print l2[max(0,i-n1):]
                #lcssValues[i] = self.computeLCSS(l1[min(-i-1,n1):], l2[max(0,i-n1):])
                #print i, lcssValues[i]
                lcssValues[i] = self.similarities(l1, l2, i)
                similarityTables[i] = self.similarityTable
                #print i
                print self.similarityTable
            self.alignmentShift = argMaxDict(lcssValues)
            self.similarityTable = similarityTables[self.alignmentShift]
            # do the subsequence computation here, once similarityTable is set
            #self.subSequenceIndices = self.subSequence(self.similarityTable.shape[0]-1, self.similarityTable.shape[1]-1) 
            #lcss = lcssValues[imax]
        else:
            self.alignmentShift = 0
            self.similarities(l1, l2)
        self.similarityTable = self.similarityTable[:, :min(len(l2), len(l1)+self.delta)+1]
        if computeSubSequence:
            self.subSequenceIndices = self.subSequence(self.similarityTable.shape[0]-1, self.similarityTable.shape[1]-1)
            if revertIndices:
                self.subSequenceIndices = [(j+self.alignmentShift,i) for i,j in self.subSequenceIndices]
            #self.alignmentShift = imax-n1
            else:
                self.subSequenceIndices = [(i+self.alignmentShift,j) for i,j in self.subSequenceIndices]
            #self.alignmentShift = n1-imax
        return self.similarityTable[-1,-1]

    def compute(self, l1, l2, computeSubSequence = False):
        '''get methods are to be shadowed in child classes '''
        return self._compute(l1, l2, computeSubSequence)

    def computeAlignement(self):
        from numpy import mean
        return mean([j-i for i,j in self.subSequenceIndices])+self.alignmentShift

    def _computeNormalized(self, l1, l2):
        ''' compute the normalized LCSS
        ie, the LCSS divided by the min or mean of the indicator lengths (using lengthFunc)
        lengthFunc = lambda x,y:float(x,y)/2'''
        return float(self.compute(l1, l2))/self.lengthFunc(len(l1), len(l2))

    def computeNormalized(self, l1, l2):
        return self._computeNormalized(l1, l2)

    def _computeDistance(self, l1, l2):
        ''' compute the LCSS distance'''
        return 1-self.computeNormalized(l1, l2)

    def computeDistance(self, l1, l2):
        return self._computeDistance(l1, l2)
    
#########################
# plotting section
#########################

def plotPolygon(poly, options = ''):
    'Plots shapely polygon poly'
    from numpy.core.multiarray import array
    from matplotlib.pyplot import plot
    from shapely.geometry import Polygon

    tmp = array(poly.exterior)
    plot(tmp[:,0], tmp[:,1], options)

def stepPlot(X, firstX, lastX, initialCount = 0, increment = 1):
    '''for each value in X, increment by increment the initial count
    returns the lists that can be plotted 
    to obtain a step plot increasing by one for each value in x, from first to last value
    firstX and lastX should be respectively smaller and larger than all elements in X'''
    
    sortedX = []
    counts = [initialCount]
    for x in sorted(X):
        sortedX += [x,x]
        counts.append(counts[-1])
        counts.append(counts[-1]+increment)
    counts.append(counts[-1])
    return [firstX]+sortedX+[lastX], counts

class PlottingPropertyValues:
    def __init__(self, values):
        self.values = values

    def __getitem__(self, i):
        return self.values[i%len(self.values)]

markers = PlottingPropertyValues(['+', '*', ',', '.', 'x', 'D', 's', 'o'])
scatterMarkers = PlottingPropertyValues(['s','o','^','>','v','<','d','p','h','8','+','x'])

linestyles = PlottingPropertyValues(['-', '--', '-.', ':'])

colors = PlottingPropertyValues('brgmyck') # 'w'

def plotIndicatorMap(indicatorMap, squareSize, masked = True, defaultValue=-1):
    from numpy import array, arange, ones, ma
    from matplotlib.pyplot import pcolor
    coords = array(indicatorMap.keys())
    minX = min(coords[:,0])
    minY = min(coords[:,1])
    X = arange(minX, max(coords[:,0])+1.1)*squareSize
    Y = arange(minY, max(coords[:,1])+1.1)*squareSize
    C = defaultValue*ones((len(Y), len(X)))
    for k,v in indicatorMap.iteritems():
        C[k[1]-minY,k[0]-minX] = v
    if masked:
        pcolor(X, Y, ma.masked_where(C==defaultValue,C))
    else:
        pcolor(X, Y, C)

#########################
# file I/O section
#########################

def openCheck(filename, option = 'r', quit = False):
    '''Open file filename in read mode by default
    and checks it is open'''
    try:
        return open(filename, option)
    except IOError:
        print 'File %s could not be opened.' % filename
        if quit:
            from sys import exit
            exit()
        return None

def readline(f, commentCharacter = commentChar):
    '''Modified readline function to skip comments.'''
    s = f.readline()
    while (len(s) > 0) and s.startswith(commentCharacter):
        s = f.readline()
    return s.strip()

def getLines(f, delimiterCharacter = delimiterChar):
    '''Gets a complete entry (all the lines) in between delimiterChar.'''
    dataStrings = []
    s = readline(f)
    while (len(s) > 0) and (not s.startswith(delimiterCharacter)):
        dataStrings += [s.strip()]
        s = readline(f)
    return dataStrings

class FakeSecHead(object):
    '''Add fake section header [asection]

    from http://stackoverflow.com/questions/2819696/parsing-properties-file-in-python/2819788#2819788
    use read_file in Python 3.2+
    '''
    def __init__(self, fp):
        self.fp = fp
        self.sechead = '[main]\n'

    def readline(self):
        if self.sechead:
            try: return self.sechead
            finally: self.sechead = None
        else: return self.fp.readline()

def removeExtension(filename, delimiter = '.'):
    '''Returns the filename minus the extension (all characters after last .)'''
    i = filename.rfind(delimiter)
    if i>0:
        return filename[:i]
    else:
        return filename

def cleanFilename(s):
    'cleans filenames obtained when contatenating figure characteristics'
    return s.replace(' ','-').replace('.','').replace('/','-')

def listfiles(dirname, extension, remove = False):
    '''Returns the list of files with the extension in the directory dirname
    If remove is True, the filenames are stripped from the extension'''
    from os import listdir
    tmp = [f for f in listdir(dirname) if f.endswith(extension)]
    tmp.sort()
    if remove:
        return [removeExtension(f, extension) for f in tmp]
    else:
        return tmp

def mkdir(dirname):
    'Creates a directory if it does not exist'
    import os
    if not os.path.exists(dirname):
        os.mkdir(dirname)
    else:
        print(dirname+' already exists')

def removeFile(filename):
    '''Deletes the file while avoiding raising an error 
    if the file does not exist'''
    import os
    if (os.path.exists(filename)):
        os.remove(filename)
    else:
        print(filename+' does not exist')

def line2Floats(l, separator=' '):
    '''Returns the list of floats corresponding to the string'''
    return [float(x) for x in l.split(separator)]

def line2Ints(l, separator=' '):
    '''Returns the list of ints corresponding to the string'''
    return [int(x) for x in l.split(separator)]

#########################
# CLI utils
#########################

def parseCLIOptions(helpMessage, options, cliArgs, optionalOptions=[]):
    ''' Simple function to handle similar argument parsing
    Returns the dictionary of options and their values

    * cliArgs are most likely directly sys.argv 
    (only the elements after the first one are considered)
    
    * options should be a list of strings for getopt options, 
    eg ['frame=','correspondences=','video=']
    A value must be provided for each option, or the program quits'''
    import sys, getopt
    from numpy.core.fromnumeric import all
    optionValues, args = getopt.getopt(cliArgs[1:], 'h', ['help']+options+optionalOptions)
    optionValues = dict(optionValues)

    if '--help' in optionValues.keys() or '-h' in optionValues.keys():
        print(helpMessage+
              '\n - Compulsory options: '+' '.join([opt.replace('=','') for opt in options])+
              '\n - Non-compulsory options: '+' '.join([opt.replace('=','') for opt in optionalOptions]))
        sys.exit()

    missingArgument = [('--'+opt.replace('=','') in optionValues.keys()) for opt in options]
    if not all(missingArgument):
        print('Missing argument')
        print(optionValues)
        sys.exit()

    return optionValues

class TrackingParameters:
    def loadConfigFile(self, filename):
        from ConfigParser import ConfigParser
        from numpy import loadtxt
        from os import path

        config = ConfigParser()
        config.readfp(FakeSecHead(openCheck(filename)))
        self.sectionHeader = config.sections()[0]
        self.videoFilename = config.get(self.sectionHeader, 'video-filename')
        self.databaseFilename = config.get(self.sectionHeader, 'database-filename')
        self.homographyFilename = config.get(self.sectionHeader, 'homography-filename')
        if (path.exists(self.homographyFilename)):
            self.homography = loadtxt(self.homographyFilename)
        else:
            self.homography = None
        self.firstFrameNum = config.getint(self.sectionHeader, 'frame1')
        self.videoFrameRate = config.getfloat(self.sectionHeader, 'video-fps')

        self.maxPredictedSpeed = config.getfloat(self.sectionHeader, 'max-predicted-speed')/3.6/self.videoFrameRate
        self.predictionTimeHorizon = config.getfloat(self.sectionHeader, 'prediction-time-horizon')*self.videoFrameRate
        self.collisionDistance = config.getfloat(self.sectionHeader, 'collision-distance')
        self.crossingZones = config.getboolean(self.sectionHeader, 'crossing-zones')
        self.predictionMethod = config.get(self.sectionHeader, 'prediction-method')
        self.nPredictedTrajectories = config.getint(self.sectionHeader, 'npredicted-trajectories')
        self.minAcceleration = config.getfloat(self.sectionHeader, 'min-acceleration')/self.videoFrameRate**2
        self.maxAcceleration = config.getfloat(self.sectionHeader, 'max-acceleration')/self.videoFrameRate**2
        self.maxSteering = config.getfloat(self.sectionHeader, 'max-steering')/self.videoFrameRate
        self.useFeaturesForPrediction = config.getboolean(self.sectionHeader, 'use-features-prediction')

#########################
# sqlite
#########################

def printDBError(error):
    print('DB Error: {}'.format(error))

def dropTables(connection, tableNames):
    'deletes the table with names in tableNames'
    import sqlite3
    try:
        cursor = connection.cursor()
        for tableName in tableNames:
            cursor.execute('DROP TABLE '+tableName)
    except sqlite3.OperationalError as error:
        printDBError(error)

#########################
# running tests
#########################

if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/utils.txt')
    #suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)
    #doctest.testmod()
    #doctest.testfile("example.txt")