Mercurial Hosting > traffic-intelligence
changeset 244:5027c174ab90
moved indicators to new file, added ExtrapolatedTrajectory class to extrapolation file
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 17 Jul 2012 00:15:42 -0400 |
parents | e0988a8ace0c |
children | bd8ab323c198 |
files | python/extrapolation.py python/indicators.py python/moving.py |
diffstat | 3 files changed, 349 insertions(+), 316 deletions(-) [+] |
line wrap: on
line diff
--- a/python/extrapolation.py Mon Jul 16 04:57:35 2012 -0400 +++ b/python/extrapolation.py Tue Jul 17 00:15:42 2012 -0400 @@ -1,166 +1,190 @@ #! /usr/bin/env python '''Library for moving object extrapolation hypotheses''' -import sys +import moving + +class ExtrapolatedTrajectory: + '''Class for extrapolated trajectories with lazy evaluation + if the predicted position has not been already computed, compute it + + it should also have a probability''' + def predictPosition(self, nTimeSteps): + return None -import moving +class ExtrapolatedTrajectoryConstant(ExtrapolatedTrajectory): + '''Extrapolated trajectory at constant speed or acceleration + TODO add limits if acceleration + TODO generalize by passing a series of velocities/accelerations''' + + def __init__(self, initialPosition, initialVelocity, initialAccleration = 0, probability = 1): + self.initialPosition = initialPosition + self.initialVelocity = initialVelocity + self.initialAccleration = initialAccleration + self.probability = probability + self.predictedPositions = {} + self.predictedVelocities = {} + + def predictPosition(self, nTimeSteps): + if not nTimeSteps in self.predictedPositions.keys(): + self.predictedPositions[nTimeSteps] = moving.Point.predictPosition(nTimeSteps, self.initialPosition, self.initialVelocity, self.initialAcceleration) + return self.predictedPositions[nTimeSteps] # Default values: to remove because we cannot tweak that from a script where the value may be different FPS= 25 # No. of frame per second (FPS) -vLimit= 25/FPS #assume limit speed is 90km/hr = 25 m/sec -deltaT= FPS*5 # extrapolatation time Horizon = 5 second - +vLimit= 25/FPS #assume limit speed is 90km/hr = 25 m/sec +deltaT= FPS*5 # extrapolatation time Horizon = 5 second + def motion (position, velocity, acceleration): - ''' extrapolation hypothesis: constant acceleration''' - from math import atan2,cos,sin - vInit= velocity - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - vFinal= vInitial+acceleration - - if acceleration<= 0: - v= max(0,vFinal) - velocity= moving.Point(v* cos(theta),v* sin(theta)) - position= position+ (velocity+vInit). multiply(0.5) - else: - v= min(vLimit,vFinal) - velocity= moving.Point(v* cos(theta),v* sin(theta)) - position= position+ (velocity+vInit). multiply(0.5) - return(position,velocity) + ''' extrapolation hypothesis: constant acceleration''' + from math import atan2,cos,sin + vInit= velocity + vInitial= velocity.norm2() + theta= atan2(velocity.y,velocity.x) + vFinal= vInitial+acceleration + + if acceleration<= 0: + v= max(0,vFinal) + velocity= moving.Point(v* cos(theta),v* sin(theta)) + position= position+ (velocity+vInit). multiply(0.5) + else: + v= min(vLimit,vFinal) + velocity= moving.Point(v* cos(theta),v* sin(theta)) + position= position+ (velocity+vInit). multiply(0.5) + return(position,velocity) def motionPET (position, velocity, acceleration, deltaT): - ''' extrapolation hypothesis: constant acceleration for calculating pPET ''' - from math import atan2,cos,sin,fabs - vInit= velocity - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - vFinal= vInitial+acceleration * deltaT - if acceleration< 0: - if vFinal> 0: - velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) - position= position+ (vInit+ velocity). multiply(0.5*deltaT) - else: - T= fabs(vInitial/acceleration) - position= position + vInit. multiply(0.5*T) - elif acceleration> 0 : - if vFinal<= vLimit: - velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) - position= position+ (vInit+ velocity). multiply(0.5*deltaT) - else: - time1= fabs((vLimit-vInitial)/acceleration) - velocity= moving.Point(vLimit* cos(theta),vLimit* sin(theta)) - position= (position+ (velocity+vInit). multiply(0.5*time1)) + (velocity.multiply (deltaT-time1)) - elif acceleration == 0: - position= position + velocity. multiply(deltaT) - - return position + ''' extrapolation hypothesis: constant acceleration for calculating pPET ''' + from math import atan2,cos,sin,fabs + vInit= velocity + vInitial= velocity.norm2() + theta= atan2(velocity.y,velocity.x) + vFinal= vInitial+acceleration * deltaT + if acceleration< 0: + if vFinal> 0: + velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) + position= position+ (vInit+ velocity). multiply(0.5*deltaT) + else: + T= fabs(vInitial/acceleration) + position= position + vInit. multiply(0.5*T) + elif acceleration> 0 : + if vFinal<= vLimit: + velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) + position= position+ (vInit+ velocity). multiply(0.5*deltaT) + else: + time1= fabs((vLimit-vInitial)/acceleration) + velocity= moving.Point(vLimit* cos(theta),vLimit* sin(theta)) + position= (position+ (velocity+vInit). multiply(0.5*time1)) + (velocity.multiply (deltaT-time1)) + elif acceleration == 0: + position= position + velocity. multiply(deltaT) + + return position def timePET (position, velocity, acceleration, intersectedPoint ): - ''' extrapolation hypothesis: constant acceleration for calculating pPET ''' - from math import atan2,cos,sin,fabs - vInit= velocity - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - vFinal= vInitial+acceleration * deltaT - if acceleration< 0: - if vFinal> 0: - velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) - time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x))) - else: - time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x))) - elif acceleration> 0 : - if vFinal<= vLimit: - velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) - time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x))) - else: - time1= fabs((vLimit-vInitial)/acceleration) - velocity= moving.Point(vLimit* cos(theta),vLimit* sin(theta)) - time2= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x))) - if time2<=time1: - time= time2 - else: - position2= (position+ (velocity+vInit). multiply(0.5*time1)) - time= time1+fabs((intersectedPoint.x-position2.x)/( velocity.x)) - elif acceleration == 0: - time= fabs((intersectedPoint.x-position.x)/(velocity.x)) - - return time - + ''' extrapolation hypothesis: constant acceleration for calculating pPET ''' + from math import atan2,cos,sin,fabs + vInit= velocity + vInitial= velocity.norm2() + theta= atan2(velocity.y,velocity.x) + vFinal= vInitial+acceleration * deltaT + if acceleration< 0: + if vFinal> 0: + velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) + time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x))) + else: + time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x))) + elif acceleration> 0 : + if vFinal<= vLimit: + velocity= moving.Point(vFinal* cos(theta),vFinal* sin(theta)) + time= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x))) + else: + time1= fabs((vLimit-vInitial)/acceleration) + velocity= moving.Point(vLimit* cos(theta),vLimit* sin(theta)) + time2= fabs((intersectedPoint.x-position.x)/(0.5*(vInit.x+ velocity.x))) + if time2<=time1: + time= time2 + else: + position2= (position+ (velocity+vInit). multiply(0.5*time1)) + time= time1+fabs((intersectedPoint.x-position2.x)/( velocity.x)) + elif acceleration == 0: + time= fabs((intersectedPoint.x-position.x)/(velocity.x)) + + return time + def motionSteering (position, velocity, deltaTheta, deltaT ): - ''' extrapolation hypothesis: steering with deltaTheta''' - from math import atan2,cos,sin - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - newTheta= theta + deltaTheta - velocity= moving.Point(vInitial* cos(newTheta),vInitial* sin(newTheta)) - position= position+ (velocity). multiply(deltaT) - return position - + ''' extrapolation hypothesis: steering with deltaTheta''' + from math import atan2,cos,sin + vInitial= velocity.norm2() + theta= atan2(velocity.y,velocity.x) + newTheta= theta + deltaTheta + velocity= moving.Point(vInitial* cos(newTheta),vInitial* sin(newTheta)) + position= position+ (velocity). multiply(deltaT) + return position + def MonteCarlo(movingObject1,movingObject2, instant): - ''' Monte Carlo Simulation : estimate the probability of collision''' - from random import uniform - from math import pow, sqrt, sin, cos,atan2 - N=1000 - ProbOfCollision = 0 - for n in range (1, N): - # acceleration limit - acc1 = uniform(-0.040444,0) - acc2 = uniform(-0.040444,0) - p1= movingObject1.getPositionAtInstant(instant) - p2= movingObject2.getPositionAtInstant(instant) - v1= movingObject1.getVelocityAtInstant(instant) - v2= movingObject2.getVelocityAtInstant(instant) - distance= (p1-p2).norm2() - distanceThreshold= 1.8 - t=1 - while distance > distanceThreshold and t <= deltaT: - # Extrapolation position - (p1,v1) = motion(p1,v1,acc1) - (p2,v2) = motion(p2,v2,acc2) - distance= (p1-p2).norm2() - if distance <=distanceThreshold: - ProbOfCollision= ProbOfCollision+1 - t+=1 - POC= float(ProbOfCollision)/N - return POC - + ''' Monte Carlo Simulation : estimate the probability of collision''' + from random import uniform + from math import pow, sqrt, sin, cos,atan2 + N=1000 + ProbOfCollision = 0 + for n in range (1, N): + # acceleration limit + acc1 = uniform(-0.040444,0) + acc2 = uniform(-0.040444,0) + p1= movingObject1.getPositionAtInstant(instant) + p2= movingObject2.getPositionAtInstant(instant) + v1= movingObject1.getVelocityAtInstant(instant) + v2= movingObject2.getVelocityAtInstant(instant) + distance= (p1-p2).norm2() + distanceThreshold= 1.8 + t=1 + while distance > distanceThreshold and t <= deltaT: + # Extrapolation position + (p1,v1) = motion(p1,v1,acc1) + (p2,v2) = motion(p2,v2,acc2) + distance= (p1-p2).norm2() + if distance <=distanceThreshold: + ProbOfCollision= ProbOfCollision+1 + t+=1 + POC= float(ProbOfCollision)/N + return POC + def velocitySteering(velocity,steering): - from math import atan2,cos,sin - vInitial= velocity.norm2() - theta= atan2(velocity.y,velocity.x) - newTheta= theta + steering - v= moving.Point(vInitial* cos(newTheta),vInitial* sin(newTheta)) - return v + from math import atan2,cos,sin + vInitial= velocity.norm2() + theta= atan2(velocity.y,velocity.x) + newTheta= theta + steering + v= moving.Point(vInitial* cos(newTheta),vInitial* sin(newTheta)) + return v def MonteCarloSteering(movingObject1,movingObject2, instant,steering1,steering2): - ''' Monte Carlo Simulation : estimate the probability of collision in case of steering''' - from random import uniform - from math import pow, sqrt, sin, cos,atan2 - N=1000 - L= 2.4 - ProbOfCollision = 0 - for n in range (1, N): - # acceleration limit - acc1 = uniform(-0.040444,0) - acc2 = uniform(-0.040444,0) - p1= movingObject1.getPositionAtInstant(instant) - p2= movingObject2.getPositionAtInstant(instant) - vInit1= movingObject1.getVelocityAtInstant(instant) - v1= velocitySteering (vInit1,steering1) - vInit2= movingObject2.getVelocityAtInstant(instant) - v2= velocitySteering (vInit2,steering2) - distance= (p1-p2).norm2() - distanceThreshold= 1.8 - t=1 - while distance > distanceThreshold and t <= deltaT: - # Extrapolation position - (p1,v1) = motion(p1,v1,acc1) - (p2,v2) = motion(p2,v2,acc2) - distance= (p1-p2).norm2() - if distance <=distanceThreshold: - ProbOfCollision= ProbOfCollision+1 - t+=1 - POC= float(ProbOfCollision)/N - return POC + ''' Monte Carlo Simulation : estimate the probability of collision in case of steering''' + from random import uniform + from math import pow, sqrt, sin, cos,atan2 + N=1000 + L= 2.4 + ProbOfCollision = 0 + for n in range (1, N): + # acceleration limit + acc1 = uniform(-0.040444,0) + acc2 = uniform(-0.040444,0) + p1= movingObject1.getPositionAtInstant(instant) + p2= movingObject2.getPositionAtInstant(instant) + vInit1= movingObject1.getVelocityAtInstant(instant) + v1= velocitySteering (vInit1,steering1) + vInit2= movingObject2.getVelocityAtInstant(instant) + v2= velocitySteering (vInit2,steering2) + distance= (p1-p2).norm2() + distanceThreshold= 1.8 + t=1 + while distance > distanceThreshold and t <= deltaT: + # Extrapolation position + (p1,v1) = motion(p1,v1,acc1) + (p2,v2) = motion(p2,v2,acc2) + distance= (p1-p2).norm2() + if distance <=distanceThreshold: + ProbOfCollision= ProbOfCollision+1 + t+=1 + POC= float(ProbOfCollision)/N + return POC
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python/indicators.py Tue Jul 17 00:15:42 2012 -0400 @@ -0,0 +1,171 @@ +#! /usr/bin/env python +'''Class for indicators, temporal indicators, and safety indicators''' + +__metaclass__ = type + +# need for a class representing the indicators, their units, how to print them in graphs... +class TemporalIndicator: + '''Class for temporal indicators + i.e. indicators that take a value at specific instants + + values should be + * a dict, for the values at specific time instants + * or a list with a time interval object if continuous measurements + + it should have more information like name, unit''' + + def __init__(self, name, values, timeInterval=None): + self.name = name + self.isCosine = name.find('Cosine') + self.values = values + self.timeInterval = timeInterval + if timeInterval: + assert len(values) == timeInterval.length() + + def empty(self): + return len(self.values) == 0 + + def __getitem__(self, i): + if self.timeInterval: + if self.timeInterval.contains(i): + return self.values[i-self.timeInterval.first] + else: + if i in self.values.keys(): + return self.values[i] + return None # default + + def __iter__(self): + self.iterInstantNum = 0 # index in the interval or keys of the dict + return self + + def next(self): + if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\ + # or (self.iterInstantNum >= self.values) + raise StopIteration + else: + self.iterInstantNum += 1 + if self.timeInterval: + return self.values[self.iterInstantNum-1] + else: + return self.values.values()[self.iterInstantNum-1] + + def getTimeInterval(self): + if not self.timeInterval and type(self.values)==dict: + instants = self.values.keys() + if instants: + self.timeInterval = TimeInterval(instants[0], instants[-1]) + else: + self.timeInterval = TimeInterval() + return self.timeInterval + + def getValues(self): + if self.timeInterval: + return self.values + else: + return self.values.values() + + def getAngleValues(self): + '''if the indicator is a function of an angle, + transform it to an angle (eg cos) + (no transformation otherwise)''' + from numpy import arccos + values = self.getValues() + if self.isCosine >= 0: + return [arccos(c) for c in values] + else: + return values + +class SeverityIndicator(TemporalIndicator): + '''Class for severity indicators + field mostSevereIsMax is True + if the most severe value taken by the indicator is the maximum''' + + def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, ignoredValue = None): + TemporalIndicator.__init__(self, name, values, timeInterval) + self.mostSevereIsMax = mostSevereIsMax + self.ignoredValue = ignoredValue + + def getMostSevereValue(self, minNInstants=1): + from matplotlib.mlab import find + from numpy.core.multiarray import array + from numpy.core.fromnumeric import mean + values = array(self.values.values()) + if self.ignoredValue: + indices = find(values != self.ignoredValue) + else: + indices = range(len(values)) + if len(indices) >= minNInstants: + values = sorted(values[indices], reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values + return mean(values[:minNInstants]) + else: + return None + +# functions to aggregate discretized maps of indicators +# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap) + +def indicatorMap(indicatorValues, trajectory, squareSize): + '''Returns a dictionary + with keys for the indices of the cells (squares) + in which the trajectory positions are located + at which the indicator values are attached + + ex: speeds and trajectory''' + + from numpy import floor, mean + assert len(indicatorValues) == trajectory.length() + indicatorMap = {} + for k in xrange(trajectory.length()): + p = trajectory[k] + i = floor(p.x/squareSize) + j = floor(p.y/squareSize) + if indicatorMap.has_key((i,j)): + indicatorMap[(i,j)].append(indicatorValues[k]) + else: + indicatorMap[(i,j)] = [indicatorValues[k]] + for k in indicatorMap.keys(): + indicatorMap[k] = mean(indicatorMap[k]) + return indicatorMap + +def indicatorMapFromPolygon(value, polygon, squareSize): + '''Fills an indicator map with the value within the polygon + (array of Nx2 coordinates of the polygon vertices)''' + import matplotlib.nxutils as nx + from numpy.core.multiarray import array, arange + from numpy import floor + + points = [] + for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize): + for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize): + points.append([x,y]) + inside = nx.points_inside_poly(array(points), polygon) + indicatorMap = {} + for i in xrange(len(inside)): + if inside[i]: + indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0 + return indicatorMap + +def indicatorMapFromAxis(value, limits, squareSize): + '''axis = [xmin, xmax, ymin, ymax] ''' + from numpy.core.multiarray import arange + from numpy import floor + indicatorMap = {} + for x in arange(limits[0], limits[1], squareSize): + for y in arange(limits[2], limits[3], squareSize): + indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value + return indicatorMap + +def combineIndicatorMaps(maps, squareSize, combinationFunction): + '''Puts many indicator maps together + (averaging the values in each cell + if more than one maps has a value)''' + #from numpy import mean + indicatorMap = {} + for m in maps: + for k,v in m.iteritems(): + if indicatorMap.has_key(k): + indicatorMap[k].append(v) + else: + indicatorMap[k] = [v] + for k in indicatorMap.keys(): + indicatorMap[k] = combinationFunction(indicatorMap[k]) + return indicatorMap
--- a/python/moving.py Mon Jul 16 04:57:35 2012 -0400 +++ b/python/moving.py Tue Jul 17 00:15:42 2012 -0400 @@ -211,6 +211,11 @@ from matplotlib.pyplot import scatter scatter([p.x for p in points],[p.y for p in points], c=color) + @staticmethod + def predictPosition(nTimeSteps, initialPosition, initialVelocity, initialAcceleration = Point(0,0)): + '''Predicts the position in nTimeSteps at constant speed/acceleration''' + return initalPosition+velocity.multiply(nTimeSteps) + initialAcceleration.multiply(nTimeSteps**2) + class FlowVector: '''Class to represent 4-D flow vectors, ie a position and a velocity''' @@ -508,10 +513,10 @@ indices = self.positions.getIntersections(p1, p2) return [t+self.getFirstInstant() for t in indices] - def predictPosition(self, instant, deltaT, externalAcceleration = Point(0,0)): + def predictPosition(self, instant, nTimeSteps, externalAcceleration = Point(0,0)): '''Predicts the position of object at instant+deltaT, at constant speed''' - return self.getPositionAtInstant(instant) + self.getVelocityAtInstant(instant).multiply(deltaT) + externalAcceleration.multiply(deltaT**2) + return Point.predictPosition(nTimeSteps, self.getPositionAtInstant(instant), self.getVelocityAtInstant(instant), externalAcceleration) @staticmethod def collisionCourseDotProduct(movingObject1, movingObject2, instant): @@ -536,173 +541,6 @@ axis('equal') -# need for a class representing the indicators, their units, how to print them in graphs... -class TemporalIndicator: - '''Class for temporal indicators - i.e. indicators that take a value at specific instants - - values should be - * a dict, for the values at specific time instants - * or a list with a time interval object if continuous measurements - - it should have more information like name, unit''' - - def __init__(self, name, values, timeInterval=None): - self.name = name - self.isCosine = name.find('Cosine') - self.values = values - self.timeInterval = timeInterval - if timeInterval: - assert len(values) == timeInterval.length() - - def empty(self): - return len(self.values) == 0 - - def __getitem__(self, i): - if self.timeInterval: - if self.timeInterval.contains(i): - return self.values[i-self.timeInterval.first] - else: - if i in self.values.keys(): - return self.values[i] - return None # default - - def __iter__(self): - self.iterInstantNum = 0 # index in the interval or keys of the dict - return self - - def next(self): - if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\ - # or (self.iterInstantNum >= self.values) - raise StopIteration - else: - self.iterInstantNum += 1 - if self.timeInterval: - return self.values[self.iterInstantNum-1] - else: - return self.values.values()[self.iterInstantNum-1] - - def getTimeInterval(self): - if not self.timeInterval and type(self.values)==dict: - instants = self.values.keys() - if instants: - self.timeInterval = TimeInterval(instants[0], instants[-1]) - else: - self.timeInterval = TimeInterval() - return self.timeInterval - - def getValues(self): - if self.timeInterval: - return self.values - else: - return self.values.values() - - def getAngleValues(self): - '''if the indicator is a function of an angle, - transform it to an angle (eg cos) - (no transformation otherwise)''' - from numpy import arccos - values = self.getValues() - if self.isCosine >= 0: - return [arccos(c) for c in values] - else: - return values - -class SeverityIndicator(TemporalIndicator): - '''Class for severity indicators - field mostSevereIsMax is True - if the most severe value taken by the indicator is the maximum''' - - def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, ignoredValue = None): - TemporalIndicator.__init__(self, name, values, timeInterval) - self.mostSevereIsMax = mostSevereIsMax - self.ignoredValue = ignoredValue - - def getMostSevereValue(self, minNInstants=1): - from matplotlib.mlab import find - from numpy.core.multiarray import array - from numpy.core.fromnumeric import mean - values = array(self.values.values()) - if self.ignoredValue: - indices = find(values != self.ignoredValue) - else: - indices = range(len(values)) - if len(indices) >= minNInstants: - values = sorted(values[indices], reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values - return mean(values[:minNInstants]) - else: - return None - -# functions to aggregate discretized maps of indicators -# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap) - -def indicatorMap(indicatorValues, trajectory, squareSize): - '''Returns a dictionary - with keys for the indices of the cells (squares) - in which the trajectory positions are located - at which the indicator values are attached - - ex: speeds and trajectory''' - - from numpy import floor, mean - assert len(indicatorValues) == trajectory.length() - indicatorMap = {} - for k in xrange(trajectory.length()): - p = trajectory[k] - i = floor(p.x/squareSize) - j = floor(p.y/squareSize) - if indicatorMap.has_key((i,j)): - indicatorMap[(i,j)].append(indicatorValues[k]) - else: - indicatorMap[(i,j)] = [indicatorValues[k]] - for k in indicatorMap.keys(): - indicatorMap[k] = mean(indicatorMap[k]) - return indicatorMap - -def indicatorMapFromPolygon(value, polygon, squareSize): - '''Fills an indicator map with the value within the polygon - (array of Nx2 coordinates of the polygon vertices)''' - import matplotlib.nxutils as nx - from numpy.core.multiarray import array, arange - from numpy import floor - - points = [] - for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize): - for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize): - points.append([x,y]) - inside = nx.points_inside_poly(array(points), polygon) - indicatorMap = {} - for i in xrange(len(inside)): - if inside[i]: - indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0 - return indicatorMap - -def indicatorMapFromAxis(value, limits, squareSize): - '''axis = [xmin, xmax, ymin, ymax] ''' - from numpy.core.multiarray import arange - from numpy import floor - indicatorMap = {} - for x in arange(limits[0], limits[1], squareSize): - for y in arange(limits[2], limits[3], squareSize): - indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value - return indicatorMap - -def combineIndicatorMaps(maps, squareSize, combinationFunction): - '''Puts many indicator maps together - (averaging the values in each cell - if more than one maps has a value)''' - #from numpy import mean - indicatorMap = {} - for m in maps: - for k,v in m.iteritems(): - if indicatorMap.has_key(k): - indicatorMap[k].append(v) - else: - indicatorMap[k] = [v] - for k in indicatorMap.keys(): - indicatorMap[k] = combinationFunction(indicatorMap[k]) - return indicatorMap - if __name__ == "__main__": import doctest import unittest