I’ve been looking for an interesting problem to solve using MapReduce for some time now. I’ve been curious about the paradigm and how it can be applied to churn through large sets of data across multiple processing cores: something that’s especially relevant as we need to distribute processing – whether to cores that are local or up in the cloud – in order to improve software performance.
I talked about much of this when I looked at F#’s Asynchronous Workflows way back when (nearly 6 years ago – ouch).
MapReduce frameworks – and there are a number out there: in this case we’re using Google’s “experimental” App Engine implementation, while Amazon’s Elastic MapReduce service runs the Hadoop framework on EC2 and S3 – work on the principle of chunking big data-processing tasks into lots of tiny ones that can be run independently (the “map” piece) and having the results combined at the end (the “reduce” piece).
The series I’ve been working on to de-skew perspective photos and bring the results into AutoCAD seemed a reasonable opportunity to investigate this technology. The nature of this particular task is such that we can transform parts of the image in parallel and then generate the output pixels in parallel, too.
With Google App Engine’s MapReduce, your map() function takes some data input (and in GAE there are various input readers, whether taking lines of text from a file, files from a .zip or entities from a data-store), works out the results and yields a key-value pair. The key helps identify like results: all values associated with a certain key will be grouped together during an interim shuffle stage (it’s really MapShuffleReduce, but the shuffling happens invisibly and so doesn’t get a specific mention ;-) and then your reduce() function is called with a key and a list of the values that were generated for that key.
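To make that contract concrete, here’s a minimal sketch – the classic word-count example, not anything from the de-skewing code – of what a map/reduce pair looks like when written against GAE’s Python MapReduce library:

def word_count_map(line):
    # Called once per input record (here, a line of text); each
    # yield hands a key-value pair to the shuffle stage
    for word in line.split():
        yield word, "1"

def word_count_reduce(key, values):
    # Called once per distinct key, with the list of all the
    # values that were yielded for it across the mappers
    yield "%s: %d\n" % (key, len(values))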
Looking specifically at our example of de-skewing images: in this case we’re chaining together a couple of MapReduce pipelines. The first takes rows of pixels in the input image and transforms them to the target coordinate system; the second generates the rows of pixels for the output image, which then get “reduced” into a resultant PNG.
We start by focusing on a subset of the original image to work with:
The “transform” pipeline then takes this – with a separate “task” for each row in the area of the input image that we care about – and transforms the pixel information into our target coordinate system (where the skewed area is rectangular):
The “output” pipeline uses a “task” to generate each row of the output image:
The results of the first pipeline need to be in place before we can start the second – as the second is going to query them when creating its own results – but I could even imagine a scenario where there’s some overlap managed between the two. It might be possible to delay certain operations in the second until the required data is in place, for example, but it would certainly take more effort to coordinate.
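The pipeline library does make the basic sequencing easy: when a run() method yields a MapreducePipeline and then yields a stage that consumes its output, the second stage isn’t started until the first has completed. A rough sketch of the shape – the names here are illustrative, with the real pipelines appearing in the code below:

class ChainedPipeline(base_handler.PipelineBase):
    def run(self, params):
        output = yield mapreduce_pipeline.MapreducePipeline(
            "first_stage", "main.first_map", "main.first_reduce",
            "mapreduce.input_readers.DatastoreInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={"entity_kind": "main.FirstStageData"},
            shards=16)
        # Only starts once "first_stage" has finished, as it
        # consumes that stage's output
        yield SecondStage(output, params)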
The original code has already been adjusted to implement a crude caching mechanism for each task in the output pipeline: when a task needs a row of input data it hasn’t already asked for (which will happen a lot: as you can see, the input rows on the left are very skewed compared with the output pixels we care about) that row gets read in on demand. This saves us from reading in the whole image for each instance of the mapper – or sharing memory across multiple mappers, which would reduce flexibility, if it’s even possible – and different output rows will need a different combination of input rows, in any case. This caching mechanism could probably be extended to wait for rows that are not yet ready, but – again – more coordination work would be needed.
And on the subject of coordination… splitting these tasks up, marshalling the data around, shuffling the results and reducing them into something coherent… all this takes resources. There has to be sufficient benefit in doing this for it to be worth the overhead, so for small images I’d imagine keeping a linear code-path and then only using this technique where the benefits justify the resource investment (i.e. with images that the linear implementation can’t easily cope with). I haven’t done much analysis on where this line is, but it has to be there somewhere.
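In code this could be as simple as a pixel-count check. The threshold below is a placeholder rather than a measured value, and deskew_linear() and start_deskew_pipelines() stand in for the original single-process implementation and the pipeline launch, respectively:

PIPELINE_THRESHOLD = 512 * 512  # hypothetical cut-over point

def deskew_image(img, params):
    # Small images keep the cheap linear code-path; only larger
    # ones pay the coordination overhead of the MapReduce route
    if len(img) * len(img[0]) < PIPELINE_THRESHOLD:
        return deskew_linear(img, params)
    return start_deskew_pipelines(img, params)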
Before showing you the code, I should probably mention that I’ve almost certainly done things that are sub-optimal. GAE has various ways of storing and passing data: I use the NDB Datastore to create the input records that define the tasks for the two pipelines – using the DatastoreInputReader to query them – but then I use memcache to store the results of each of the map() functions (as the results otherwise get serialized via JSON for the shuffle and reduce phases), picking them up later when they’re needed. The standard BlobstoreOutputWriter is used to generate our PNG in the Blobstore.
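The memcache hand-off boils down to a simple key convention – the file’s blob key plus a row index (“_pts” for transformed points, “_col” for colour data, “_row” for output rows) – sketched here for the points:

from google.appengine.api import memcache

def store_row(filekey, i, rec):
    # Written by a map() task once row i has been transformed
    memcache.set("%s_pts%d" % (filekey, i), rec)

def fetch_row(filekey, i):
    # Read on demand by the output tasks; None means the row was
    # never written or has since been evicted
    return memcache.get("%s_pts%d" % (filekey, i))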
These decisions were mostly made through an unfortunate combination of expediency and ignorance: it took me quite some time to get this far and I know the implementation is far from perfect. In fact, for larger images some of the datastore input records are simply not being found – the situation improves when I reduce the batch size for the input reader, but still isn’t 100% reliable – yet the principles of how the task has been split up are sound, even if the configuration of the processing mechanism could probably benefit from further tweaking.
So this is very much a work in progress (even though it remains to be seen whether I’ll spend much more time on it than I have thus far – I think I’ve learned as much as I need to about GAE’s implementation of MapReduce, for now).
As for how the code currently behaves: it works tolerably well, but it’s not quick. But then I haven’t thrown much by way of the cloud’s resources at it: it’d be interesting to see whether assigning lots more “shards” to process the mapping tasks for each pipeline would lead to things happening more quickly – or even having the work done via more powerful back-end instances, as alluded to last time. I’ve chosen not to spend time looking at that, as I’ve so far been more interested in the theoretical problem of implementing MapReduce, rather than the practical one of having it work well enough to actually use. :-)
Anyway, here’s the Python code, for people to take a look at:
import os
import urllib
import webapp2
import cgi
import logging
import pickle
import time
from deskew import *
import image  # used below as image.bytes2image() and image.image2writer()
from math import floor, sqrt
from io import BytesIO
from google.appengine.ext import blobstore
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext import ndb
from google.appengine.api import memcache
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce import shuffler
from mapreduce import context
from mapreduce import model
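# Serves the upload form, with default values for the four corner
# coordinates and the width-over-height factor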
class MainHandler(webapp2.RequestHandler):
def get(self):
upload_url = blobstore.create_upload_url('/upload')
sro = self.response.out
sro.write('<html><body>')
sro.write(
'<form action="%s" method="POST" enctype="multipart/form-data">'
% upload_url)
sro.write('Upload File:')
sro.write('<input type="file" name="file"><br/>')
sro.write('Top left: ')
sro.write('<input type="number" name="xtl" value="82">')
sro.write('<input type="number" name="ytl" value="73"><br/>')
sro.write('Bottom left: ')
sro.write('<input type="number" name="xbl" value="81">')
sro.write('<input type="number" name="ybl" value="103"><br/>')
sro.write('Top right: ')
sro.write('<input type="number" name="xtr" value="105">')
sro.write('<input type="number" name="ytr" value="69"><br/>')
sro.write('Bottom right: ')
sro.write('<input type="number" name="xbr" value="105">')
sro.write('<input type="number" name="ybr" value="102"><br/>')
sro.write('Width over height: ')
sro.write(
'<input type="number" name="fac" step="0.1" value="1.0"><br/>')
sro.write('<input type="submit" name="submit" value="Submit">')
sro.write('</form></body></html>')
class UploadHandler(blobstore_handlers.BlobstoreUploadHandler):
def post(self):
# Get the posted PNG file in the variable img1
upload_files = self.get_uploads('file')
if len(upload_files) == 0:
self.redirect("/")
            return
blob_info = upload_files[0]
blob_reader = blobstore.BlobReader(blob_info)
img1 = blob_reader.read()
blob_reader.close()
filekey = str(blob_info.key())
# Get the various coordinate inputs and the width factor
xtl = int(cgi.escape(self.request.get('xtl')))
ytl = int(cgi.escape(self.request.get('ytl')))
xbl = int(cgi.escape(self.request.get('xbl')))
ybl = int(cgi.escape(self.request.get('ybl')))
xtr = int(cgi.escape(self.request.get('xtr')))
ytr = int(cgi.escape(self.request.get('ytr')))
xbr = int(cgi.escape(self.request.get('xbr')))
ybr = int(cgi.escape(self.request.get('ybr')))
xscale = float(cgi.escape(self.request.get('fac')))
tl = xtl, ytl
bl = xbl, ybl
tr = xtr, ytr
br = xbr, ybr
# Some constants
orgx,orgy,orgu = 'x1','x2','x3'
outx,outy,outu = 'y1','y2','y3'
orgd = {orgx, orgy, orgu}
outd = {outx, outy, outu}
orgrows = tuple(sorted(orgd))
        outrows = tuple(sorted(outd))
logging.info("Setting up the math")
H = coord_map_matrix(orgx, outx, orgd, outd,
tl, bl, tr, br, xscale)
logging.info("Loading the image from file")
# Calculate the width and height of the skewed portion of our
# input image and then calculate the output height as the
# diagonal distance (which gives more than enough pixels)
orgwid = max(abs(tr[0]-tl[0]), abs(br[0]-bl[0]))
orghgt = max(abs(br[1]-tr[1]), abs(bl[1]-tl[1]))
outhgt = floor(max(sqrt(orgwid**2 + orghgt**2), orghgt))
# Get the extents of our input area with a bit of padding
ext=((min(xtl,xbl,xtr,xbr)-2,min(ytl,ybl,ytr,ybr)-2),
(max(xtl,xbl,xtr,xbr)+2,max(ytl,ybl,ytr,ybr)+2))
# Load the image data into memory and then map it out to
# records in the NDB datastore
img = image.bytes2image(img1)
imgkey, futures = image2map(img, outhgt, ext, filekey, orgrows)
# Once done, we need to wait on each of our async writes
        # (wait() is not enough - we also need to count the records)
ytotal = len(futures)
for f in futures:
f.wait()
q = TransformData.query(ancestor=imgkey)
cnt = q.count()
if cnt == ytotal:
logging.info("We have enough TransformData objects (%d)",
ytotal)
else:
logging.info("We DON'T have enough TransformData objects (%d)",
cnt)
return
# Launch our transformation pipeline: this starts with a
# map-reduce to transform the points, followed by another to
# output the resulting image
transpipe = TransformPipeline(outx, outy, outhgt, tl, bl, tr, br,
pickle.dumps(H), xscale, ext,
filekey, imgkey.urlsafe())
transpipe.start()
self.redirect("/serve/" + filekey)
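# Serves the de-skewed image: polls the datastore until a ResultsData
# record appears for the requested file, then sends the output blob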
class ServeHandler(blobstore_handlers.BlobstoreDownloadHandler):
def get(self, resource):
while True:
q = ResultsData.query(ResultsData.file == resource)
cnt = q.count()
if cnt == 0:
logging.info("We don't yet have results (%s)" % resource)
time.sleep(5)
else:
break
res = q.get()
blob_info = blobstore.BlobInfo.get(res.image)
self.send_blob(blob_info)
res.key.delete()
app = webapp2.WSGIApplication(
[
('/', MainHandler),
('/upload', UploadHandler),
('/serve/([^/]+)?', ServeHandler)
],
debug=True)
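# Split the interesting portion of the uploaded image into per-row
# records: TransformData entities in the datastore (the mappable input
# for the first pipeline) plus per-row colour records in the memcache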
def image2map(image, outhgt, ext, filekey, row_labels):
# Get the extents of the portion we're interested in, along
# with the width and height of the overall image
    mn, mx = ext  # renamed to avoid shadowing the min() and max() builtins
    h = len(image)
    w = len(image[0])
    rx, ry, ru = row_labels
    xrng = range(mn[0],mx[0]+1,1)
    yrng = range(mn[1],mx[1]+1,1)
    ytotal = mx[1]+1-mn[1]
    id = ImageData(file=filekey, input_rows=ytotal,
                   output_rows=int(outhgt))
    imgkey = id.put()
    logging.info(
        "Will create {0} records for each ({1} to {2})".format(
            ytotal, mn[1], mx[1]+1))
createprogmeter("Extracting points from the image", int(ytotal))
futures = []
# Loop over y: we're going to create one TransformData and one
# color record for each y value in the input image section
for y in yrng:
        # Reset the accumulators for our point and color data
ptsD = (set(row_labels), set())
ptsF = {}
colorsD = ({'r', 'g', 'b'}, set())
colorsF = {}
# Loop over x, storing a row of point and color data in our
# various locations before writing them out
for x in xrng:
if x < w and y < h:
pt = (x,y)
ptsD[1].add(pt)
ptsF[(rx, pt)] = x
ptsF[(ry, pt)] = y
ptsF[(ru, pt)] = 1
col = image[y][x]
if type(col) is int:
red, green, blue = col, col, col
else:
red, green, blue = col
colorsD[1].add(pt)
colorsF[('r', pt)] = red
colorsF[('g', pt)] = green
colorsF[('b', pt)] = blue
# Write a row of point data as a mappable entry in the DataStore
td = TransformData(parent=imgkey, i=y,
pts=mat2rec(mat.Mat(ptsD, ptsF)))
futures.append(td.put_async())
# Our color data (also by row) will be in the memcache
cr = mat2rec(mat.Mat(colorsD, colorsF))
memcache.set("%s_col%d" % (filekey, y), cr)
progress()
finishprogress()
return imgkey, futures
# Classes for specifying map operations in the DataStore
class ImageData(ndb.Model):
file = ndb.StringProperty()
input_rows = ndb.IntegerProperty()
output_rows = ndb.IntegerProperty()
class TransformData(ndb.Model):
i = ndb.IntegerProperty()
pts = ndb.PickleProperty()
class OutputData(ndb.Model):
i = ndb.IntegerProperty()
k1 = ndb.FloatProperty()
k2 = ndb.FloatProperty()
y = ndb.FloatProperty()
class ResultsData(ndb.Model):
file = ndb.StringProperty()
image = ndb.StringProperty()
# MapReduce pipeline to transform the lines of our input image
class TransformPipeline(base_handler.PipelineBase):
def run(self, outx, outy, outhgt, tl, bl, tr, br, H, xscale,
ext, filekey, imgkey):
output = yield mapreduce_pipeline.MapreducePipeline(
"transform_images",
"main.transform_map",
"main.transform_reduce",
"mapreduce.input_readers.DatastoreInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"entity_kind": "main.TransformData",
"batch_size": 1,
"top_left": tl,
"bottom_left": bl,
"top_right": tr,
"bottom_right": br,
"x_label": outx,
"y_label": outy,
"H": H
},
reducer_params={
"mime_type": "text/plain"
},
shards=16)
yield ProcessOutput(output, outhgt, tl, bl, tr, br, xscale,
ext, filekey, imgkey)
def transform_map(data):
# Get data from our model object
i = data.i
logging.info("TransformMap called on %d", i)
idkey = data.key.parent()
key = idkey.get().file
pts = rec2mat(data.pts)
data.key.delete()
# Get per-mapper data, too
ctx = context.get()
params = ctx.mapreduce_spec.mapper.params
tl = tuple(params["top_left"])
bl = tuple(params["bottom_left"])
tr = tuple(params["top_right"])
br = tuple(params["bottom_right"])
orgx = params["x_label"]
orgy = params["y_label"]
rawH = params["H"]
H = pickle.loads(str(rawH))
# Transform a single row of points
hpts = H * pts
opts = mat_move2board(hpts)
# Save the results to the memcache
reckey = "%s_pts%d" % (key, i)
memcache.set(reckey, mat2rec(opts))
logging.info("TransformMap wrote for %d (%s)", i, reckey)
# To give the reducer something to do, let's see if the row
# contains any of the extents points of our area (we could also
# just calculate this - it's min = (0,0) & max = (xscale,1),
# but it's more fun to have the reducer calculate it :-)
res = []
if tl[1] == i:
res.append((opts[(orgx,tl)],opts[(orgy,tl)]))
if bl[1] == i:
res.append((opts[(orgx,bl)],opts[(orgy,bl)]))
if tr[1] == i:
res.append((opts[(orgx,tr)],opts[(orgy,tr)]))
if br[1] == i:
res.append((opts[(orgx,br)],opts[(orgy,br)]))
yield idkey.urlsafe(), res
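# Reduce the per-row corner candidates yielded by transform_map down to
# the overall min/max extents of the area in the target coordinate system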
def transform_reduce(key, values):
logging.info(
"TransformReduce called for {0} with {1} values".format(
key,
len(values)))
idkey = ndb.Key(urlsafe=key)
id = idkey.get()
if len(values) < id.input_rows:
logging.info("Did not get enough input rows. Terminating.")
else:
        # We get a list of strings, each of which can be evaluated to
# a list
v1 = list(map(eval,values))
# We then have a list of lists, most of which will be empty
# Flatten it down to a list of coordinate pairs that we
# perform a min-max calculation on
vs = [item for sublist in v1 for item in sublist]
if len(vs) == 0:
            yield ""
else:
yield ((min(vs, key = lambda t: t[0])[0],
min(vs, key = lambda t: t[1])[1]),
(max(vs, key = lambda t: t[0])[0],
max(vs, key = lambda t: t[1])[1]))
# The continuation pipeline that fires once the transformations
# are complete
class ProcessOutput(base_handler.PipelineBase):
def run(self, output, outhgt, ltl, lbl, ltr, lbr, xscale, ext,
filekey, imgkey):
# Get the output of the first MapReduce pipeline, which is
        # a textual blob containing the min/max of the area we care
# about in the target coord system
parts = output[0].split("/")
blob_key = blobstore.BlobKey(parts[len(parts)-1])
blob_reader = blobstore.BlobReader(blob_key)
minmax = eval(blob_reader.read())
blobstore.delete(blob_key)
        # Get a usable key for the parent image record
idkey = ndb.Key(urlsafe=imgkey)
logging.info("TransformReduce returned: {0}".format(minmax))
        # Convert the JSON-serialized/de-serialized parameters into a
        # form we can use (tuples get converted to lists, interestingly)
tl = tuple(ltl)
bl = tuple(lbl)
tr = tuple(ltr)
br = tuple(lbr)
xbl,ybl = bl
xtl,ytl = tl
# Calculate the output width and height
xmin = minmax[0][0]
xmax = minmax[1][0]
ymin = minmax[0][1]
ymax = minmax[1][1]
xext = floor(outhgt * xscale)
yinc = (ymax-ymin)/outhgt
xinc = (xmax-xmin)/xext
# We're going to loop through our output rows, collecting
# futures for each of our async write operations
rowcount = int(outhgt)
futures = []
for i in range(rowcount):
fac = i/outhgt
k = (round(xtl+((xbl-xtl)*fac)), round(ytl+((ybl-ytl)*fac)))
y = ymin + i * yinc
# Create an NDB entity for each row we want to output
od = OutputData(parent=idkey, i=i, k1=k[0], k2=k[1], y=y)
futures.append(od.put_async())
# Wait for all our future writes to finish
# (still better than a sync call)
for f in futures:
f.wait()
q = OutputData.query(ancestor=idkey)
cnt = q.count()
if cnt == rowcount:
logging.info("We have enough OutputData objects (%d)", cnt)
else:
logging.info("We DON'T have enough OutputData objects (%d)",
cnt)
return
# Create and launch our next MapReduce pipeline, this time to
# output
outpipe = OutputPipeline(ext, xmin, xext, xinc, filekey, imgkey)
outpipe.start()
# Some helper functions to serialize our point matrices
# to a format that's a little more compact
def mat2rec(M):
mn = min(M.D[1])
mx = max(M.D[1])
dom0 = list(M.D[0])
rows = []
for x in range(mn[0], mx[0]+1, 1):
row = []
for y in range(mn[1], mx[1]+1, 1):
tup = []
pt = (x,y)
if pt in M.D[1]:
for t in dom0:
tup.append(M[(t,pt)])
row.append(tuple(tup))
else:
row.append(None)
rows.append(row)
return (mn, mx, dom0, rows)
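# Rebuild a Mat object from the compact record created by mat2rec()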
def rec2mat(rec):
mn, mx, dom0, rows = rec
dom1 = set()
f = {}
for x in range(mx[0]-mn[0]+1):
for y in range(mx[1]-mn[1]+1):
pt = (mn[0]+x,mn[1]+y)
val = rows[x][y]
if val != None:
dom1.add(pt)
for i,t in enumerate(dom0):
f[(t,pt)] = val[i]
return Mat((set(dom0),dom1),f)
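# As rec2mat(), but load the record's contents into an existing Mat
# object rather than creating a new one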
def loadrec(rec, m):
mn, mx, dom0, rows = rec
dom1 = set()
f = {}
for x in range(mx[0]-mn[0]+1):
for y in range(mx[1]-mn[1]+1):
pt = (mn[0]+x,mn[1]+y)
val = rows[x][y]
if val != None:
dom1.add(pt)
for i,t in enumerate(dom0):
f[(t,pt)] = val[i]
m.D = (set(dom0), dom1)
m.f = f
# MapReduce pipeline to output rows of our image to a .PNG
class OutputPipeline(base_handler.PipelineBase):
def run(self, ext, xmin, xext, xinc, filekey, imgkey):
output = yield mapreduce_pipeline.MapreducePipeline(
"output_image",
"main.output_map",
"main.output_reduce",
"mapreduce.input_readers.DatastoreInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"entity_kind": "main.OutputData",
"batch_size": 1,
"ext_min": ext[0],
"ext_max": ext[1],
"xmin": xmin,
"xext": xext,
"xinc": xinc
},
reducer_params={
"mime_type": "image/png"
},
shards=16)
yield FinishUp(output, filekey, imgkey)
class FinishUp(base_handler.PipelineBase):
def run(self, output, filekey, imgkey):
# Delete the image info entity - no longer needed
ndb.Key(urlsafe=imgkey).delete()
# Store the location of the output blob in a record
# that's associated with the input file
parts = output[0].split("/")
rd = ResultsData(file=filekey)
rd.image = parts[len(parts)-1]
rd.put()
def output_map(data):
# Get model data
i = data.i
logging.info("OutputMap called on %d", i)
parent = data.key.parent()
key = parent.get().file
k = data.k1, data.k2
y = data.y
# Get our per-mapper parameters
ctx = context.get()
params = ctx.mapreduce_spec.mapper.params
xmin = params["xmin"]
xext = params["xext"]
xinc = params["xinc"]
extmin = tuple(params["ext_min"])
extmax = tuple(params["ext_max"])
ext = (extmin,extmax)
# Build a row of colour information for our output image
row = []
outpts = Mat((set(),set()), {})
colors = Mat((set(),set()), {})
for j in range(int(xext)):
x = xmin + j * xinc
col,k = get_color_ex(k, outpts, colors, ext, x, y, key)
row.append(col)
# Delete the originating record
data.key.delete()
# Store the row in the memcache for later retrieval
idkey = parent.urlsafe()
memcache.set("%s_row%d" % (idkey, i), row)
# Yield the fact we're done for our reduce function
# (the data isn't especially important)
yield idkey, i
# We'll track the rows that have come through for a particular key
def output_reduce(key, values):
logging.info("OutputReduce called for %s with %d values",
key, len(values))
idkey = ndb.Key(urlsafe=key)
id = idkey.get()
logging.info("id.output_rows: %d", id.output_rows)
# If we have 90% of rows or more (not great: shouldn't have
# to put up with any drop off) then let's continue
if len(values) < id.output_rows * 0.9:
logging.info("Did not get enough output rows. Terminating.")
else:
# Collect our rows, reading them from the memcache
rows = []
for i in range(id.output_rows):
row = memcache.get("%s_row%d" % (key, i))
if row != None:
rows.append(row)
# Then we write them out via a BytesIO object
with BytesIO() as bio:
image.image2writer(rows, bio)
yield bio.getvalue()
# Versions of older functions that pull in point and colour
# objects from the memcache as they're needed
def get_color_ex(k, pts, colors, ext, x, y, filekey):
adj = get_adjacent_ex(k,pts,ext,x,y,filekey)
best = [k for k,v in sorted(adj, key=lambda kv: kv[1])][0]
ret = (getcolors(colors, ('r',best), filekey),
getcolors(colors, ('g',best), filekey),
getcolors(colors, ('b',best), filekey)), best
return ret
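# Collect the grid points adjacent to k that fall within our extents,
# pairing each with its quad midpoint's distance from (x,y)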
def get_adjacent_ex(k, pts, ext, x, y, filekey):
kx, ky = k
((minx,miny),(maxx,maxy)) = ext
xr = range(-1,2,1)
yr = range(-1,2,1)
adj = [(kx+i,ky+j) for i in xr for j in yr
if kx+i > minx and kx+i < maxx and ky+j > miny
and ky+j < maxy]
return [(xy,quad_midpoint_ex(xy,pts,x,y, filekey)) for xy in adj]
def getpts(pts, key, filekey):
add_to_cache(key[1], pts, "pts", filekey)
return pts[key]
def getcolors(cols, key, filekey):
add_to_cache(key[1], cols, "col", filekey)
return cols[key]
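# Pull a row of point or colour data from the memcache into the
# matrix m, unless it's already present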
def add_to_cache(pt, m, id, key):
x, y = pt
reckey = "%s_%s%d" % (key, id, y)
mat_empty = m.D[0] == set()
    if mat_empty or pt not in m.D[1]:
rec = memcache.get(reckey)
if rec == None:
logging.info("Failed to read from {0}".format(reckey))
else:
if mat_empty:
loadrec(rec, m)
else:
mn, mx, dom0, rows = rec
for x1 in range(mx[0]-mn[0]+1):
for y1 in range(mx[1]-mn[1]+1):
pt = (mn[0]+x1,mn[1]+y1)
val = rows[x1][y1]
if val != None:
m.D[1].add(pt)
for i,t in enumerate(dom0):
m[(t,pt)] = val[i]
return m
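# Distance from (x,y) to the midpoint of the quad whose corners run
# from point k to (kx+1,ky+1)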
def quad_midpoint_ex(k, pts, x, y, filekey):
kx,ky = k
rx,ry = 'y1','y2'
x0 = getpts(pts, (rx, k), filekey)
y0 = getpts(pts, (ry, k), filekey)
x2 = getpts(pts, (rx, (kx+1, ky+1)), filekey)
y2 = getpts(pts, (ry, (kx+1, ky+1)), filekey)
midx = x0+((x2-x0)/2)
midy = y0+((y2-y0)/2)
return sqrt((x-midx)**2 + (y-midy)**2)
There are some other files needed to make this work on GAE – such as app.yaml and mapreduce.yaml – so if anyone is interested in actually giving this a try, please email me or post a comment and I’ll make them available.