Mercurial Hosting > traffic-intelligence
view python/cvutils.py @ 414:040e71067ff4
Win32 property sheet version updated to opencv 2.4.6.1
author | Jean-Philippe Jodoin <jpjodoin@gmail.com> |
---|---|
date | Sun, 08 Sep 2013 23:38:37 -0400 |
parents | 31604ef1cad4 |
children | 8fdbc13dad8b |
line wrap: on
line source
#! /usr/bin/env python '''Image/Video utilities''' try: import cv2 opencvAvailable = True except ImportError: print('OpenCV library could not be loaded (video replay functions will not be available)') opencvAvailable = False try: import skimage skimageAvailable = True except ImportError: print('Scikit-image library could not be loaded (HoG-based classification methods will not be available)') skimageAvailable = False from sys import stdout import utils #import aggdraw # agg on top of PIL (antialiased drawing) #import utils __metaclass__ = type cvRed = (0,0,255) cvGreen = (0,255,0) cvBlue = (255,0,0) cvColors = utils.PlottingPropertyValues([cvRed, cvGreen, cvBlue]) def quitKey(key): return chr(key&255)== 'q' or chr(key&255) == 'Q' def saveKey(key): return chr(key&255) == 's' def drawLines(filename, origins, destinations, w = 1, resultFilename='image.png'): '''Draws lines over the image ''' import Image, ImageDraw # PIL img = Image.open(filename) draw = ImageDraw.Draw(img) #draw = aggdraw.Draw(img) #pen = aggdraw.Pen("red", width) for p1, p2 in zip(origins, destinations): draw.line([p1.x, p1.y, p2.x, p2.y], width = w, fill = (256,0,0)) #draw.line([p1.x, p1.y, p2.x, p2.y], pen) del draw #out = utils.openCheck(resultFilename) img.save(resultFilename) def matlab2PointCorrespondences(filename): '''Loads and converts the point correspondences saved by the matlab camera calibration tool''' from numpy.lib.io import loadtxt, savetxt from numpy.lib.function_base import append points = loadtxt(filename, delimiter=',') savetxt(utils.removeExtension(filename)+'-point-correspondences.txt',append(points[:,:2].T, points[:,3:].T, axis=0)) def loadPointCorrespondences(filename): '''Loads and returns the corresponding points in world (first 2 lines) and image spaces (last 2 lines)''' from numpy.lib.npyio import loadtxt from numpy import float32 points = loadtxt(filename, dtype=float32) return (points[:2,:].T, points[2:,:].T) # (world points, image points) def cvMatToArray(cvmat): '''Converts an OpenCV CvMat to numpy array.''' print('Deprecated, use new interface') from numpy.core.multiarray import zeros a = zeros((cvmat.rows, cvmat.cols))#array([[0.0]*cvmat.width]*cvmat.height) for i in xrange(cvmat.rows): for j in xrange(cvmat.cols): a[i,j] = cvmat[i,j] return a if opencvAvailable: def computeHomography(srcPoints, dstPoints, method=0, ransacReprojThreshold=3.0): '''Returns the homography matrix mapping from srcPoints to dstPoints (dimension Nx2)''' H, mask = cv2.findHomography(srcPoints, dstPoints, method, ransacReprojThreshold) return H def arrayToCvMat(a, t = cv2.CV_64FC1): '''Converts a numpy array to an OpenCV CvMat, with default type CV_64FC1.''' print('Deprecated, use new interface') cvmat = cv2.cv.CreateMat(a.shape[0], a.shape[1], t) for i in range(cvmat.rows): for j in range(cvmat.cols): cvmat[i,j] = a[i,j] return cvmat def draw(img, positions, color, lastCoordinate = None): last = lastCoordinate+1 if lastCoordinate != None and lastCoordinate >=0: last = min(positions.length()-1, lastCoordinate) for i in range(0, last-1): cv2.line(img, positions[i].asint().astuple(), positions[i+1].asint().astuple(), color) def cvImshow(windowName, img, rescale = 1.0): 'Rescales the image (in particular if too large)' from cv2 import resize if rescale != 1.: size = (int(round(img.shape[1]*rescale)), int(round(img.shape[0]*rescale))) resizedImg = resize(img, size) cv2.imshow(windowName, resizedImg) else: cv2.imshow(windowName, img) def playVideo(filename, firstFrameNum = 0, frameRate = -1, interactive = False, printFrames = True, text = None, rescale = 1.): '''Plays the video''' wait = 5 if frameRate > 0: wait = int(round(1000./frameRate)) if interactive: wait = 0 capture = cv2.VideoCapture(filename) if capture.isOpened(): key = -1 ret = True frameNum = firstFrameNum capture.set(cv2.CAP_PROP_POS_FRAMES, firstFrameNum) while ret and not quitKey(key): ret, img = capture.read() if ret: if printFrames: print('frame {0}'.format(frameNum)) frameNum+=1 if text != None: cv2.putText(img, text, (10,50), cv2.FONT_HERSHEY_PLAIN, 1, cvRed) cvImshow('frame', img, rescale) key = cv2.waitKey(wait) cv2.destroyAllWindows() def getImagesFromVideo(videoFilename, firstFrameNum = 0, nFrames = 1, saveImage = False, outputPrefix = 'image'): '''Returns nFrames images from the video sequence''' from math import floor, log10 images = [] capture = cv2.VideoCapture(videoFilename) if capture.isOpened(): nDigits = int(floor(log10(capture.get(cv2.CAP_PROP_FRAME_COUNT))))+1 ret = False capture.set(cv2.CAP_PROP_POS_FRAMES, firstFrameNum) imgNum = 0 while imgNum<nFrames: ret, img = capture.read() i = 0 while not ret and i<10: ret, img = capture.read() i += 1 if img.size>0: if saveImage: imgNumStr = format(firstFrameNum+imgNum, '0{}d'.format(nDigits)) cv2.imwrite(outputPrefix+imgNumStr+'.png', img) else: images.append(img) imgNum +=1 capture.release() else: print('Video capture failed') return images def getFPS(videoFilename): capture = cv2.VideoCapture(videoFilename) if capture.isOpened(): fps = capture.get(cv2.CAP_PROP_FPS) capture.release() return fps else: print 'Cannot load file ' + videoFilename return None def imageBox(img, obj, frameNum, homography, width, height, px=.2, py=.2, PixelThreshold=800): x = [] y = [] for f in obj.features: if f.existsAtInstant(frameNum): x.append(f.getPositionAtInstant(frameNum).project(homography).x) y.append(f.getPositionAtInstant(frameNum).project(homography).y) Xmin = min(x) Xmax = max(x) Ymin = min(y) Ymax = max(y) XMm = px * (Xmax - Xmin) YMm = py * (Ymax - Ymin) a = max(Ymax - Ymin + (2 * YMm), Xmax - (Xmin + 2 * XMm)) Y_Crop_min = int(max(0, .5 * (Ymin + Ymax - a))) Y_Crop_Max = int(min(height - 1, .5 * (Ymin + Ymax + a))) X_Crop_min = int(max(0, .5 * (Xmin + Xmax - a))) X_Crop_Max = int(min(width - 1, .5 * (Xmin + Xmax + a))) if Y_Crop_Max != Y_Crop_min and X_Crop_Max != X_Crop_min and (Y_Crop_Max - Y_Crop_min) * (X_Crop_Max - X_Crop_min) > PixelThreshold: img_crop = img[Y_Crop_min : Y_Crop_Max, X_Crop_min : X_Crop_Max] else: img_crop = [] return img_crop, Y_Crop_min, Y_Crop_Max, X_Crop_min, X_Crop_Max def displayTrajectories(videoFilename, objects, boundingBoxes, homography = None, firstFrameNum = 0, lastFrameNumArg = None, printFrames = True, rescale = 1.): '''Displays the objects overlaid frame by frame over the video ''' from moving import userTypeNames capture = cv2.VideoCapture(videoFilename) width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) if capture.isOpened(): key = -1 ret = True frameNum = firstFrameNum capture.set(cv2.CAP_PROP_POS_FRAMES, firstFrameNum) if not lastFrameNumArg: from sys import maxint lastFrameNum = maxint else: lastFrameNum = lastFrameNumArg while ret and not quitKey(key) and frameNum < lastFrameNum: ret, img = capture.read() if ret: if printFrames: print('frame {0}'.format(frameNum)) for obj in objects: if obj.existsAtInstant(frameNum): if not hasattr(obj, 'projectedPositions'): if homography != None: obj.projectedPositions = obj.positions.project(homography) else: obj.projectedPositions = obj.positions draw(img, obj.projectedPositions, cvRed, frameNum-obj.getFirstInstant()) if frameNum in boundingBoxes.keys(): for rect in boundingBoxes[frameNum]: cv2.rectangle(img, rect[0].asint().astuple(), rect[1].asint().astuple(), cvRed) objDescription = '{} '.format(obj.num) if userTypeNames[obj.userType] != 'unknown': objDescription += userTypeNames[obj.userType][0].upper() cv2.putText(img, objDescription, obj.projectedPositions[frameNum-obj.getFirstInstant()].asint().astuple(), cv2.FONT_HERSHEY_PLAIN, 1, cvRed) if obj.features != None: img_crop, Y_Crop_min, Y_Crop_Max, X_Crop_min, X_Crop_Max = imageBox(img, obj, frameNum, homography, width, height) cv2.rectangle(img, (X_Crop_min, Y_Crop_min), (X_Crop_Max, Y_Crop_Max), cvBlue, 1) cvImshow('frame', img, rescale) key = cv2.waitKey() if saveKey(key): cv2.imwrite('image.png', img) frameNum += 1 cv2.destroyAllWindows() def computeHomographyFromPDTV(cameraFilename, method=0, ransacReprojThreshold=3.0): '''Returns the homography matrix at ground level from PDTV format https://bitbucket.org/hakanardo/pdtv''' import pdtv from numpy import array camera = pdtv.load(cameraFilename) srcPoints = [[x,y] for x, y in zip([1.,2.,2.,1.],[1.,1.,2.,2.])] # need floats!! dstPoints = [] for srcPoint in srcPoints: projected = camera.image_to_world(tuple(srcPoint)) dstPoints.append([projected[0], projected[1]]) H, mask = cv2.findHomography(array(srcPoints), array(dstPoints), method, ransacReprojThreshold) return H def printCvMat(cvmat, out = stdout): '''Prints the cvmat to out''' print('Deprecated, use new interface') for i in xrange(cvmat.rows): for j in xrange(cvmat.cols): out.write('{0} '.format(cvmat[i,j])) out.write('\n') def projectArray(homography, points): '''Returns the coordinates of the projected points through homography (format: array 2xN points)''' from numpy.core import dot from numpy.core.multiarray import array from numpy.lib.function_base import append if points.shape[0] != 2: raise Exception('points of dimension {0} {1}'.format(points.shape[0], points.shape[1])) if (homography!=None) and homography.size>0: augmentedPoints = append(points,[[1]*points.shape[1]], 0) prod = dot(homography, augmentedPoints) return prod[0:2]/prod[2] else: return p def project(homography, p): '''Returns the coordinates of the projection of the point p with coordinates p[0], p[1] through homography''' from numpy import array return projectArray(homography, array([[p[0]],[p[1]]])) def projectTrajectory(homography, trajectory): '''Projects a series of points in the format [[x1, x2, ...], [y1, y2, ...]]''' from numpy.core.multiarray import array return projectArray(homography, array(trajectory)) def invertHomography(homography): '''Returns an inverted homography Unnecessary for reprojection over camera image''' from numpy.linalg.linalg import inv invH = inv(homography) invH /= invH[2,2] return invH if opencvAvailable: def computeTranslation(img1, img2, img1Points, maxTranslation2, minNMatches, windowSize = (5,5), level = 5, criteria = (cv2.TERM_CRITERIA_EPS, 0, 0.01)): '''Computes the translation of img2 with respect to img1 (loaded using OpenCV as numpy arrays) img1Points are used to compute the translation TODO add diagnostic if data is all over the place, and it most likely is not a translation (eg zoom, other non linear distortion)''' from numpy.core.multiarray import array from numpy.lib.function_base import median from numpy.core.fromnumeric import sum nextPoints = array([]) (img2Points, status, track_error) = cv2.calcOpticalFlowPyrLK(img1, img2, img1Points, nextPoints, winSize=windowSize, maxLevel=level, criteria=criteria) # calcOpticalFlowPyrLK(prevImg, nextImg, prevPts[, nextPts[, status[, err[, winSize[, maxLevel[, criteria[, derivLambda[, flags]]]]]]]]) -> nextPts, status, err delta = [] for (k, (p1,p2)) in enumerate(zip(img1Points, img2Points)): if status[k] == 1: dp = p2-p1 d = sum(dp**2) if d < maxTranslation2: delta.append(dp) if len(delta) >= minNMatches: return median(delta, axis=0) else: print(dp) return None if skimageAvailable: def HOG(image, rescaleSize = (64, 64), orientations=9, pixelsPerCell=(8, 8), cellsPerBlock=(2, 2), visualize=False, normalize=False): from skimage.feature import hog from skimage import color, transform bwImg = color.rgb2gray(image) inputImg = transform.resize(bwImg, rescaleSize) features = hog(inputImg, orientations, pixelsPerCell, cellsPerBlock, visualize, normalize) if visualize: from matplotlib.pyplot import imshow, figure, subplot hogViz = features[1] features = features[0] figure() subplot(1,2,1) imshow(img) subplot(1,2,2) imshow(hogViz) return features def createHOGTrainingSet(imageDirectory, classLabel, rescaleSize = (64, 64), orientations=9, pixelsPerCell=(8, 8), cellsPerBlock=(2, 2), visualize=False, normalize=False): from os import listdir from numpy import array, float32 from matplotlib.pyplot import imread inputData = [] for filename in listdir(imageDirectory): img = imread(imageDirectory+filename) features = HOG(img, rescaleSize, orientations, pixelsPerCell, cellsPerBlock, visualize, normalize) inputData.append(features) nImages = len(inputData) return array(inputData, dtype = float32), array([classLabel]*nImages, dtype = float32)