Mercurial Hosting > traffic-intelligence
diff python/moving.py @ 244:5027c174ab90
moved indicators to new file, added ExtrapolatedTrajectory class to extrapolation file
author | Nicolas Saunier <nicolas.saunier@polymtl.ca> |
---|---|
date | Tue, 17 Jul 2012 00:15:42 -0400 |
parents | e0988a8ace0c |
children | bd8ab323c198 |
line wrap: on
line diff
--- a/python/moving.py Mon Jul 16 04:57:35 2012 -0400 +++ b/python/moving.py Tue Jul 17 00:15:42 2012 -0400 @@ -211,6 +211,11 @@ from matplotlib.pyplot import scatter scatter([p.x for p in points],[p.y for p in points], c=color) + @staticmethod + def predictPosition(nTimeSteps, initialPosition, initialVelocity, initialAcceleration = Point(0,0)): + '''Predicts the position in nTimeSteps at constant speed/acceleration''' + return initalPosition+velocity.multiply(nTimeSteps) + initialAcceleration.multiply(nTimeSteps**2) + class FlowVector: '''Class to represent 4-D flow vectors, ie a position and a velocity''' @@ -508,10 +513,10 @@ indices = self.positions.getIntersections(p1, p2) return [t+self.getFirstInstant() for t in indices] - def predictPosition(self, instant, deltaT, externalAcceleration = Point(0,0)): + def predictPosition(self, instant, nTimeSteps, externalAcceleration = Point(0,0)): '''Predicts the position of object at instant+deltaT, at constant speed''' - return self.getPositionAtInstant(instant) + self.getVelocityAtInstant(instant).multiply(deltaT) + externalAcceleration.multiply(deltaT**2) + return Point.predictPosition(nTimeSteps, self.getPositionAtInstant(instant), self.getVelocityAtInstant(instant), externalAcceleration) @staticmethod def collisionCourseDotProduct(movingObject1, movingObject2, instant): @@ -536,173 +541,6 @@ axis('equal') -# need for a class representing the indicators, their units, how to print them in graphs... -class TemporalIndicator: - '''Class for temporal indicators - i.e. indicators that take a value at specific instants - - values should be - * a dict, for the values at specific time instants - * or a list with a time interval object if continuous measurements - - it should have more information like name, unit''' - - def __init__(self, name, values, timeInterval=None): - self.name = name - self.isCosine = name.find('Cosine') - self.values = values - self.timeInterval = timeInterval - if timeInterval: - assert len(values) == timeInterval.length() - - def empty(self): - return len(self.values) == 0 - - def __getitem__(self, i): - if self.timeInterval: - if self.timeInterval.contains(i): - return self.values[i-self.timeInterval.first] - else: - if i in self.values.keys(): - return self.values[i] - return None # default - - def __iter__(self): - self.iterInstantNum = 0 # index in the interval or keys of the dict - return self - - def next(self): - if self.iterInstantNum >= len(self.values):#(self.timeInterval and self.iterInstantNum>=self.timeInterval.length())\ - # or (self.iterInstantNum >= self.values) - raise StopIteration - else: - self.iterInstantNum += 1 - if self.timeInterval: - return self.values[self.iterInstantNum-1] - else: - return self.values.values()[self.iterInstantNum-1] - - def getTimeInterval(self): - if not self.timeInterval and type(self.values)==dict: - instants = self.values.keys() - if instants: - self.timeInterval = TimeInterval(instants[0], instants[-1]) - else: - self.timeInterval = TimeInterval() - return self.timeInterval - - def getValues(self): - if self.timeInterval: - return self.values - else: - return self.values.values() - - def getAngleValues(self): - '''if the indicator is a function of an angle, - transform it to an angle (eg cos) - (no transformation otherwise)''' - from numpy import arccos - values = self.getValues() - if self.isCosine >= 0: - return [arccos(c) for c in values] - else: - return values - -class SeverityIndicator(TemporalIndicator): - '''Class for severity indicators - field mostSevereIsMax is True - if the most severe value taken by the indicator is the maximum''' - - def __init__(self, name, values, timeInterval=None, mostSevereIsMax=True, ignoredValue = None): - TemporalIndicator.__init__(self, name, values, timeInterval) - self.mostSevereIsMax = mostSevereIsMax - self.ignoredValue = ignoredValue - - def getMostSevereValue(self, minNInstants=1): - from matplotlib.mlab import find - from numpy.core.multiarray import array - from numpy.core.fromnumeric import mean - values = array(self.values.values()) - if self.ignoredValue: - indices = find(values != self.ignoredValue) - else: - indices = range(len(values)) - if len(indices) >= minNInstants: - values = sorted(values[indices], reverse = self.mostSevereIsMax) # inverted if most severe is max -> take the first values - return mean(values[:minNInstants]) - else: - return None - -# functions to aggregate discretized maps of indicators -# TODO add values in the cells between the positions (similar to discretizing vector graphics to bitmap) - -def indicatorMap(indicatorValues, trajectory, squareSize): - '''Returns a dictionary - with keys for the indices of the cells (squares) - in which the trajectory positions are located - at which the indicator values are attached - - ex: speeds and trajectory''' - - from numpy import floor, mean - assert len(indicatorValues) == trajectory.length() - indicatorMap = {} - for k in xrange(trajectory.length()): - p = trajectory[k] - i = floor(p.x/squareSize) - j = floor(p.y/squareSize) - if indicatorMap.has_key((i,j)): - indicatorMap[(i,j)].append(indicatorValues[k]) - else: - indicatorMap[(i,j)] = [indicatorValues[k]] - for k in indicatorMap.keys(): - indicatorMap[k] = mean(indicatorMap[k]) - return indicatorMap - -def indicatorMapFromPolygon(value, polygon, squareSize): - '''Fills an indicator map with the value within the polygon - (array of Nx2 coordinates of the polygon vertices)''' - import matplotlib.nxutils as nx - from numpy.core.multiarray import array, arange - from numpy import floor - - points = [] - for x in arange(min(polygon[:,0])+squareSize/2, max(polygon[:,0]), squareSize): - for y in arange(min(polygon[:,1])+squareSize/2, max(polygon[:,1]), squareSize): - points.append([x,y]) - inside = nx.points_inside_poly(array(points), polygon) - indicatorMap = {} - for i in xrange(len(inside)): - if inside[i]: - indicatorMap[(floor(points[i][0]/squareSize), floor(points[i][1]/squareSize))] = 0 - return indicatorMap - -def indicatorMapFromAxis(value, limits, squareSize): - '''axis = [xmin, xmax, ymin, ymax] ''' - from numpy.core.multiarray import arange - from numpy import floor - indicatorMap = {} - for x in arange(limits[0], limits[1], squareSize): - for y in arange(limits[2], limits[3], squareSize): - indicatorMap[(floor(x/squareSize), floor(y/squareSize))] = value - return indicatorMap - -def combineIndicatorMaps(maps, squareSize, combinationFunction): - '''Puts many indicator maps together - (averaging the values in each cell - if more than one maps has a value)''' - #from numpy import mean - indicatorMap = {} - for m in maps: - for k,v in m.iteritems(): - if indicatorMap.has_key(k): - indicatorMap[k].append(v) - else: - indicatorMap[k] = [v] - for k in indicatorMap.keys(): - indicatorMap[k] = combinationFunction(indicatorMap[k]) - return indicatorMap - if __name__ == "__main__": import doctest import unittest