view python/utils.py @ 101:5cc30ba3a933

change utils.py file permission
author Nicolas Saunier <nico@confins.net>
date Wed, 13 Jul 2011 15:01:43 -0400
parents f03ec4697a09
children 550556378466
line wrap: on
line source

#! /usr/bin/env python
''' Generic utilities.'''

#from numpy import *
#from pylab import *

import moving

__metaclass__ = type

commentChar = '#'

delimiterChar = '%';

#########################
# simple statistics
#########################

def computeChi2(expected, observed):
    '''Returns the Chi2 statistics'''
    result = 0.
    for e, o in zip(expected, observed):
        result += ((e-o)*(e-o))/e
    return result

class EmpiricalDistribution:
    def nSamples(self):
        return sum(self.counts)


class EmpiricalDiscreteDistribution(EmpiricalDistribution):
    '''Class to represent a sample of a distribution for a discrete random variable
    '''
    from numpy.core.fromnumeric import sum

    def __init__(self, categories, counts):
        self.categories = categories
        self.counts = counts

    def mean(self):
        result = [float(x*y) for x,y in zip(self.categories, self.counts)]
        return sum(result)/self.nSamples()

    def var(self, mean = None):
        if not mean:
            m = self.mean()
        else:
            m = mean
        result = 0.
        squares = [float((x-m)*(x-m)*y) for x,y in zip(self.categories, self.counts)]
        return sum(squares)/(self.nSamples()-1)

    def referenceCounts(self, probability):
        '''probability is a function that returns the probability of the random variable for the category values'''
        refProba = [probability(c) for c in self.categories]
        refProba[-1] = 1-sum(refProba[:-1])
        refCounts = [r*self.nSamples() for r in refProba]
        return refCounts, refProba

class EmpiricalContinuousDistribution(EmpiricalDistribution):
    '''Class to represent a sample of a distribution for a continuous random variable
    with the number of observations for each interval
    intervals (categories variable) are defined by their left limits, the last one being the right limit
    categories contain therefore one more element than the counts'''
    def __init__(self, categories, counts):
        self.categories = categories
        self.counts = counts

    def mean(self):
        result = 0.
        for i in range(len(self.counts)-1):
            result += self.counts[i]*(self.categories[i]+self.categories[i+1])/2
        return result/self.nSamples()

    def var(self, mean = None):
        if not mean:
            m = self.mean()
        else:
            m = mean
        result = 0.
        for i in range(len(self.counts)-1):
            mid = (self.categories[i]+self.categories[i+1])/2
            result += self.counts[i]*(mid - m)*(mid - m)
        return result/(self.nSamples()-1)

    def referenceCounts(self, cdf):
        '''cdf is a cumulative distribution function
        returning the probability of the variable being less that x'''
        # refCumulativeCounts = [0]#[cdf(self.categories[0][0])]
#         for inter in self.categories:
#             refCumulativeCounts.append(cdf(inter[1]))
        refCumulativeCounts = [cdf(x) for x in self.categories[1:-1]]

        refProba = [refCumulativeCounts[0]]
        for i in xrange(1,len(refCumulativeCounts)):
            refProba.append(refCumulativeCounts[i]-refCumulativeCounts[i-1])
        refProba.append(1-refCumulativeCounts[-1])
        refCounts = [p*self.nSamples() for p in refProba]
        
        return refCounts, refProba

    def printReferenceCounts(self, refCounts=None):
        if refCounts:
            ref = refCounts
        else:
            ref = self.referenceCounts
        for i in xrange(len(ref[0])):
            print('{0}-{1} & {2:0.3} & {3:0.3} \\\\'.format(self.categories[i],self.categories[i+1],ref[1][i], ref[0][i]))


#########################
# maths section
#########################

def ceilDecimals(v, nDecimals):
    '''Rounds the number at the nth decimal
    eg 1.23 at 0 decimal is 2, at 1 decimal is 1.3'''
    from math import ceil,pow
    tens = pow(10,nDecimals)
    return ceil(v*tens)/tens

def segmentIntersection(p1, p2, p3, p4):
    '''Returns the intersecting point of the segments [p1, p2] and [p3, p4], None otherwise'''
    from numpy import matrix
    from numpy.linalg import linalg, det

    dp1 = p2-p1#[s1[0][1]-s1[0][0], s1[1][1]-s1[1][0]]
    dp2 = p4-p3#[s2[0][1]-s2[0][0], s2[1][1]-s2[1][0]]

    A = matrix([[dp1.y, -dp1.x],
                [dp2.y, -dp2.x]])
    B = matrix([[dp1.y*p1.x-dp1.x*p1.y],
                [dp2.y*p3.x-dp2.x*p3.y]])

    if linalg.det(A) == 0:#crossProduct(ds1, ds2) == 0:
        return None
    else:
        intersection = linalg.solve(A,B)
        if (moving.Interval(p1.x, p2.x, True).contains(intersection[0,0])
            and moving.Interval(p3.x, p4.x, True).contains(intersection[0,0])
            and moving.Interval(p1.y, p2.y, True).contains(intersection[1,0])
            and moving.Interval(p3.y, p4.y, True).contains(intersection[1,0])):
            return moving.Point(intersection[0,0], intersection[1,0])
        else:
            return None

def crossProduct(l1, l2):
    return l1[0]*l2[1]-l1[1]*l2[0]

def filterMovingWindow(input, halfWidth):
    '''Returns an array obtained after the smoothing of the input by a moving average
    The first and last points are copied from the original.'''
    width = float(halfWidth*2+1)
    win = ones(width,'d')
    result = convolve(win/width,array(inputSignal),'same')
    result[:halfWidth] = inputSignal[:halfWidth]
    result[-halfWidth:] = inputSignal[-halfWidth:]
    return result

#########################
# plotting section
#########################

class PlottingPropertyValues:
    def __init__(self, values):
        self.values = values

    def get(self,i):
        return self.values[i%len(self.values)]

markers = PlottingPropertyValues(['+', '*', ',', '.', 'x', 'D', 's', 'o'])

linestyles = PlottingPropertyValues(['-', '--', '-.', ':'])

colors = PlottingPropertyValues('brgmyck') # 'w'

def plotIndicatorMap(indicatorMap, squareSize):
    from numpy import array, arange, ones, ma
    from matplotlib.pyplot import pcolor
    coords = array(indicatorMap.keys())
    minX = min(coords[:,0])
    minY = min(coords[:,1])
    X = arange(minX, max(coords[:,0])+1.1)*squareSize
    Y = arange(minY, max(coords[:,1])+1.1)*squareSize
    C = -ones((len(Y), len(X)))
    for k,v in indicatorMap.iteritems():
        C[k[1]-minY,k[0]-minX] = v
    masked = ma.masked_where(C<0,C)
    pcolor(X, Y, masked)

#########################
# file I/O section
#########################

def openCheck(filename, option = 'r', quit = False):
    '''Open file filename in read mode by default
    and checks it is open'''
    try:
        return open(filename, option)
    except IOError:
        print 'File %s could not be opened.' % filename
        if quit:
            from sys import exit
            exit()
        return None

def readline(f, commentCharacter = commentChar):
    '''Modified readline function to skip comments.'''
    s = f.readline()
    while (len(s) > 0) and s.startswith(commentCharacter):
        s = f.readline()
    return s.strip()

def getLines(f, delimiterCharacter = delimiterChar):
    '''Gets a complete entry (all the lines) in between delimiterChar.'''
    dataStrings = []
    s = readline(f)
    while (len(s) > 0) and (not s.startswith(delimiterCharacter)):
        dataStrings += [s.strip()]
        s = readline(f)
    return dataStrings

def removeExtension(filename, delimiter = '.'):
    '''Returns the filename minus the extension (all characters after last .)'''
    i = filename.rfind(delimiter)
    if i>0:
        return filename[:i]
    else:
        return filename

def cleanFilename(s):
    'cleans filenames obtained when contatenating figure characteristics'
    return s.replace(' ','-').replace('.','')

def listfiles(dirname, extension, remove = False):
    '''Returns the list of files with the extension in the directory dirname
    If remove is True, the filenames are stripped from the extension'''
    from os import listdir
    tmp = [f for f in listdir(dirname) if f.endswith(extension)]
    tmp.sort()
    if remove:
        return [removeExtension(f, extension) for f in tmp]
    else:
        return tmp

def removeFile(filename):
    '''Deletes the file while avoiding raising an error 
    if the file does not exist'''
    if (os.path.exists(filename)):
        os.remove(filename)

def plotPolygon(poly, options = ''):
    from numpy.core.multiarray import array
    from matplotlib.pyplot import plot
    from shapely.geometry import Polygon

    tmp = array(poly.exterior)
    plot(tmp[:,0], tmp[:,1], options)

def line2Floats(l, separator=' '):
    '''Returns the list of floats corresponding to the string'''
    return [float(x) for x in l.split(separator)]

def line2Ints(l, separator=' '):
    '''Returns the list of ints corresponding to the string'''
    return [int(x) for x in l.split(separator)]

#########################
# running tests
#########################

if __name__ == "__main__":
    import doctest
    import unittest
    suite = doctest.DocFileSuite('tests/utils.txt')
    #suite = doctest.DocTestSuite()
    unittest.TextTestRunner().run(suite)
    #doctest.testmod()
    #doctest.testfile("example.txt")