Do Task Queues Dream of MapReduce?
Tips and tricks about Google App Engine's Task Queue service and parallel processing with it. (by @kazunori_279) 1. What is Task Queue, 2. Parallel Query Demo, 3. The App Engine Parallelism, 4. Concurrency Control on TQ
Kazunori Sato / Stillhouse Co.
Jobs:
Coding: App Engine/Java, Adobe Flex/AIR, Rails etc
Writing: @IT, Nikkei, etc.
Seminar instructor
Translating
My Feeds:
twitter: @kazunori_279
blog/web: www.sth.co.jp
Disclaimer:
This slide deck is a personal publication of the author and is not formal information of any kind.
Titles:
Stillhouse Co. president
appengine ja night admin
Google API Expert (App Engine)
Adobe Certified Flex/AIR Developer & Instructor
What is Task Queue?
30 sec limit
Everything should be done in 30 sec!
Background processing
So, do the big task as background processing by inserting sub-tasks into a queue
Web Hook
Each task will be called as a web hook:
POST /_ah/queue/doMyTask (reqId=123)
POST /_ah/queue/doMyTask (reqId=590)
POST /_ah/queue/doMyTask (reqId=882)
...
Easy to Use
Just add a task to the queue with params
// url() is a static import of TaskOptions.Builder.url
Queue queue = QueueFactory.getDefaultQueue();
queue.add(url("/_ah/queue/doMyTask").param("reqId", "123"));
It's Asynchronous
The requester doesn't have to wait
The tasks will be called later
Failed tasks will be called again
→requires Idempotence
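Because a failed task is called again, the same reqId may reach the handler more than once. The sketch below shows one way to make a handler idempotent. It is an in-memory illustration only: the class name, the Set of finished IDs and the "mails sent" counter are all hypothetical stand-ins, and a real app would have to keep the "already done" marker somewhere durable such as Datastore.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a task handler (not App Engine API).
class IdempotentTaskSketch {
    // IDs of the requests that were already handled
    private final Set<String> doneReqIds = ConcurrentHashMap.newKeySet();
    // the side effect that must not be repeated (e.g. sending a mail)
    private final AtomicInteger mailsSent = new AtomicInteger();

    // Returns true if the work was performed, false if it was a repeated call.
    boolean handle(String reqId) {
        // add() returns true only for the first caller of each reqId
        if (!doneReqIds.add(reqId)) {
            return false;
        }
        mailsSent.incrementAndGet();
        return true;
    }

    int mailsSent() {
        return mailsSent.get();
    }

    public static void main(String[] args) {
        IdempotentTaskSketch handler = new IdempotentTaskSketch();
        handler.handle("123");
        handler.handle("123"); // the queue retries the same task
        System.out.println(handler.mailsSent()); // prints 1
    }
}
```

Note that this sketch marks the request as done before doing the work, so a failure in the middle would not be retried. Which order is right depends on the task.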
It's Parallel
Dispatched to multiple App Servers
Can scale out when busy
Tasks on the Console
When to Use?
Batch processing on large data set
Parallel Query / Updates
Sending out e-mails
Prefetch
Gathering feeds from outside
"If it takes time, do it later"
Parallel Query Demo
100,000 entities of "Doc" kind:
docId: Serial ID
docText: 500 random alphabetic characters
Task: Full Text Search
Reads all entities by query
Checks if docText contains a keyword
Sample Random Text
Sequential Search Demo
SequentialSearchServlet.java
Parallel Search Demo
package jp.co.sth.tqdemo;
import java.io.IOException;
import java.util.List;
import java.util.logging.Logger;
import javax.jdo.PersistenceManager;
import javax.jdo.Query;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.appengine.api.labs.taskqueue.Queue;
import com.google.appengine.api.labs.taskqueue.QueueFactory;
import com.google.appengine.api.labs.taskqueue.TaskOptions;
/**
* Performs full text search on the specified block of the {@link Doc} entities. It
* searches 1000 entities starting from the "fromDocId" param for the keyword specified
* by the "keyword" param. If it finds the keyword, it will send an XMPP message to
* the JID specified by the "toJid" param.
*
* <pre>
* HTTP params
*
* keyword: a keyword to be searched
* initiatedAt: a long value of a timestamp when requestor started the search.
* This will be used to calculate elapsed time when the search is finished.
* toJid: XMPP destination JID of the found notification.
* fromJid: XMPP source JID of the found notification.
* </pre>
*
* @author kazunori_279
*/
@SuppressWarnings("serial")
public class ParallelSearchServlet extends HttpServlet {
public static final int MAX_ENTITIES_FOR_A_QUERY = 1000;
public static final String QUEUE_NAME_PAR_SEARCH = "parSearch";
private static final String QUEUE_URL_PAR_SEARCH = "/_ah/queue/par_search";
private static final String MC_KEY_TASK_COUNT_PREFIX = "MC_KEY_TASK_COUNT_";
// logger
protected final Logger log = Logger.getLogger(this.getClass().getName());
/**
* Adds a new task to do parallel search with the specified parameter.
*
* @param sr
* {@link SearchRequest} from which the {@link TaskOptions} will
* be created
*/
public static void addNewTask(final SearchRequest sr) {
// add a search task
final Queue queue = QueueFactory.getQueue(QUEUE_NAME_PAR_SEARCH);
queue.add(sr.getTaskOptions(QUEUE_URL_PAR_SEARCH));
// increase the task count
MemcacheUtils.i.increment(MC_KEY_TASK_COUNT_PREFIX + sr.getReqId());
}
public void service(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
// execute query
final long startTime = System.currentTimeMillis();
final SearchRequest sr = SearchRequest
.createSearchRequestByHttpReq(req);
final PersistenceManager pm = Utils.pmf.getPersistenceManager();
final Query query = pm.newQuery(Doc.class);
try {
doQuery(sr, query);
} finally {
// release the query resources even if the search throws
query.closeAll();
pm.close();
}
log.info("Elapsed: " + (System.currentTimeMillis() - startTime));
}
@SuppressWarnings("unchecked")
private void doQuery(final SearchRequest sr, final Query query) {
// do query
query.setFilter("docId >= _docId");
query.setOrdering("docId asc");
query.declareParameters("Long _docId");
query.setRange(0, MAX_ENTITIES_FOR_A_QUERY);
final List<Doc> results = (List<Doc>) query.execute(sr
.getStartingDocId());
// full text search on the query result
for (Doc doc : results) {
final boolean isFound = doc.getDocText().toString().indexOf(
sr.getKeyword()) != -1;
if (isFound) {
sr.sendXmppMsg("Found " + sr.getKeyword() + " on #"
+ doc.getDocId());
}
}
// record instance ID of this servlet if requested
if (sr.getCountInstances()) {
Utils.i.countInstances(getServletContext(), sr.getReqId());
}
// decrease the task count
final long taskCount = MemcacheUtils.i
.decrement(MC_KEY_TASK_COUNT_PREFIX + sr.getReqId());
// send finish message when finished
if (taskCount == 0) {
Utils.i.sendFinishXmppMsg(sr);
}
}
}
ParallelSearchServlet.java
XMPPReceiverServlet.java
The XMPP shell
Provides very simple UI
"ping": responds with "HELLO"
"search <any keywords>": starts searching
XMPP JIDs:
sequential@<app-id>.appspotchat.com
parallel@<app-id>.appspotchat.com
The Sample Code
Available at "sth-samples" project of Google Code
The App Engine Parallelism
Counting App Server Instances
public void countInstances(final ServletContext sc, final String reqId) {
// retrieve an instance ID of this servlet
final String instId;
if (sc.getAttribute(reqId) != null) {
instId = (String) sc.getAttribute(reqId);
} else {
instId = UUID.randomUUID().toString();
sc.setAttribute(reqId, instId);
}
... snip ...
}
The "request was aborted" error
"Spin Up" of App Server Instance
Important factor for TQ responsiveness
Launching a new instance of the app
Could take anywhere from 0.5 sec to more than 10 sec
Requests will be queued for up to 10 sec
The App Engine Stack
Quoted from "From Spark Plug to Drive Train: Life of an App Engine Request", Alon Levi, Google I/O 2009
How do I make it parallel?
App Server level parallelism
Multiple client requests
Task Queue
Service level parallelism
batch update (makePersistentAll)
makeAsyncCall (N/A yet)
How do App Servers scale out?
An "idling" app has no instance
A few instances created upon receiving a request
Scale out to several instances for higher load
The "warming up" may take a few minutes
The "request was aborted" errors may occur during spin-ups
App Engine can handle up to "30 active requests"
(This limit can be removed by request)
Concurrency Control on TQ
Thank You!
Map and Reduce on TQ
Divide a big task
Map to small sub-tasks
Reduce those results
... sounds like MapReduce? :)
Add 100 tasks for searching 1000 entities each
(check the Task Queue console)
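The divide / map / reduce steps above can be sketched in plain Java. This is not the demo code: a thread pool stands in for the Task Queue, an in-memory List stands in for the "Doc" kind, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in: a thread pool plays the role of the Task Queue.
class BlockSearchSketch {
    static final int BLOCK_SIZE = 1000;

    // Divide: one starting index per block of BLOCK_SIZE documents
    static List<Integer> blockStarts(int total) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i < total; i += BLOCK_SIZE) {
            starts.add(i);
        }
        return starts;
    }

    // Map: scan each block in its own sub-task. Reduce: concatenate the hits.
    static List<Integer> search(final List<String> docs, final String keyword) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<List<Integer>>> futures = new ArrayList<>();
            for (final int from : blockStarts(docs.size())) {
                final int to = Math.min(from + BLOCK_SIZE, docs.size());
                Callable<List<Integer>> task = () -> {
                    List<Integer> hits = new ArrayList<>();
                    for (int id = from; id < to; id++) {
                        if (docs.get(id).contains(keyword)) {
                            hits.add(id);
                        }
                    }
                    return hits;
                };
                futures.add(pool.submit(task));
            }
            List<Integer> all = new ArrayList<>();
            for (Future<List<Integer>> f : futures) {
                all.addAll(f.get());
            }
            return all;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With 100,000 documents, blockStarts returns 100 starting indexes, which matches the 100 tasks of 1000 entities each.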
But TQ is not MapReduce
MapReduce is based on pure functions
(= no side effects)
Don't have to care about concurrency control
(That's the beauty of the data parallel skeleton)
Can be very large scale distributed processing
Quoted from: MapReduce: Simplified Data Processing on Large Clusters, Jeffrey Dean and Sanjay Ghemawat, Google, Inc.
Concurrency Control is required
for sharing data in TQ
If you don't share anything among tasks:
e.g. Deleting all the entities
Don't have to care about CC
If you use Datastore:
e.g. Storing word counts on Datastore
Use Datastore TX for optimistic lock
But it's slow :(
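The idea of an optimistic lock can be sketched without Datastore: read a versioned value, compute the update, and commit only if nobody else committed in between, otherwise retry. In the sketch below an AtomicReference stands in for the stored entity. The class, the Snapshot type and the retry limit are assumptions for illustration, not the Datastore transaction API.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for a word count entity guarded by an optimistic lock.
class OptimisticCounterSketch {
    // immutable snapshot of the stored entity: a version and the count
    static final class Snapshot {
        final long version;
        final long count;

        Snapshot(long version, long count) {
            this.version = version;
            this.count = count;
        }
    }

    private final AtomicReference<Snapshot> stored =
            new AtomicReference<>(new Snapshot(0, 0));

    // Adds delta to the count. Returns the number of attempts it took.
    int addWithRetry(long delta, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Snapshot read = stored.get();
            Snapshot updated = new Snapshot(read.version + 1, read.count + delta);
            // the commit succeeds only if the entity is unchanged since the read
            if (stored.compareAndSet(read, updated)) {
                return attempt;
            }
        }
        throw new IllegalStateException("too much contention");
    }

    long count() {
        return stored.get().count;
    }
}
```

No update is lost under contention, but each conflict costs another read and write. That is the price of the retries.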
If you use Memcache:
e.g. Storing word counts on Memcache
It's fast and global (not like local memcached)
But it's volatile and has no lock
MemcacheService#increment
package jp.co.sth.tqdemo;
import java.util.Collections;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.CacheManager;
import com.google.appengine.api.memcache.MemcacheService;
import com.google.appengine.api.memcache.MemcacheServiceFactory;
/**
* Provides utility methods for Memcache access.
*
* @author kazunori_279
*/
public class MemcacheUtils {
// the singleton instance
public static final MemcacheUtils i = new MemcacheUtils();
private MemcacheUtils() {
}
// logger
protected final Logger log = Logger.getLogger(this.getClass().getName());
// Memcache instance
private Cache memcache;
// MemcacheService instance
public static final MemcacheService memcacheService = MemcacheServiceFactory
.getMemcacheService();
/**
* Returns {@link Cache} instance.
*
* @return {@link Cache}
*/
public Cache getMemcache() {
if (memcache == null) {
try {
memcache = CacheManager.getInstance().getCacheFactory()
.createCache(Collections.emptyMap());
} catch (CacheException e) {
throw new RuntimeException(e);
}
}
return memcache;
}
/**
* Increments the counter identified by the key.
*
* @param key
* @return the latest counter value
*/
public long increment(String key) {
return memcacheService.increment(key, 1, Long.valueOf(0));
}
/**
* Decrements the counter identified by the key.
*
* @param key
* @return the latest counter value
*/
public long decrement(String key) {
return memcacheService.increment(key, -1, Long.valueOf(0));
}
/**
* It tries to acquire a lock identified by the lockKey. If acquired, it
* runs the specified {@link Runnable}. If failed, it sleeps for 500ms and
* retry. It will repeat the retry for max 10 times.
*
* @param lockKey
* @param r
*/
public void runSynchronized(String lockKey, Runnable r) {
// retry until it acquires the lock
boolean acquired = false;
for (int i = 0; acquired == false && i < 10; i++) {
acquired = MemcacheUtils.i.acquireLock(lockKey);
if (acquired) {
break;
} else {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
// throw an exception if it failed
if (!acquired) {
throw new RuntimeException("Failed to lock key: " + lockKey);
}
// execute the Runnable if it acquired the lock
try {
r.run();
} finally {
MemcacheUtils.i.releaseLock(lockKey);
}
}
private boolean acquireLock(String key) {
// increase the value atomically
key += "LOCK_";
final Long l = memcacheService.increment(key, 1, Long.valueOf(0));
// check if it acquired the lock or not
if (l == 1) {
// acquired
return true;
} else {
// failed, rollback
memcacheService.increment(key, -1);
return false;
}
}
private void releaseLock(String key) {
key += "LOCK_";
memcacheService.increment(key, -1);
}
}
MemcacheUtils.java
Aggregating by Lockless Queue
The concept is based on: Memcache lockless queue implementation by Jonathan
http://www.redredred.com.au/memcache-lockless-queue-implementation/
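A rough sketch of the lockless queue idea: each writer reserves a unique slot number with an atomic increment, then stores its item under a key built from that number, so writers never overwrite each other and need no lock. Here a ConcurrentHashMap and an AtomicLong stand in for Memcache and its increment call. The class name and the "Q_" key prefix are hypothetical, and the details may differ from the linked article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory stand-in for a queue built on a cache and an atomic counter.
class LocklessQueueSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final AtomicLong tail = new AtomicLong();

    // Reserves the next slot atomically and writes the item into it.
    long enqueue(String value) {
        long slot = tail.incrementAndGet();
        cache.put("Q_" + slot, value);
        return slot;
    }

    // Reads slots 1..tail in order. A slot that is reserved but not yet
    // written (or was evicted from a real cache) is simply skipped.
    List<String> readAll() {
        List<String> items = new ArrayList<>();
        long last = tail.get();
        for (long slot = 1; slot <= last; slot++) {
            String value = cache.get("Q_" + slot);
            if (value != null) {
                items.add(value);
            }
        }
        return items;
    }
}
```

Each sub-task can enqueue its partial result this way, and one final task can read all the slots to aggregate them.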
By using this, you can implement:
Atomic counter
Lockless Queue
"synchronized" clause by mutex lock
/**
* Detects an instance ID for the specified {@link ServletContext} and increments
* the count for it.
*
* @param sc
* @param reqId
*/
public void countInstances(final ServletContext sc, final String reqId) {
// retrieve an instance ID of this servlet
final String instId;
if (sc.getAttribute(reqId) != null) {
instId = (String) sc.getAttribute(reqId);
} else {
instId = UUID.randomUUID().toString();
sc.setAttribute(reqId, instId);
}
// acquire a lock on Memcache and increment request count for the instId
final String instIdListKey = MC_KEY_INST_ID_LIST_PREFIX + reqId;
MemcacheUtils.i.runSynchronized(instIdListKey, new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {
// get the instId Map from Memcache (or create new)
Map<String, Integer> instIdMap = (Map<String, Integer>) MemcacheUtils.memcacheService
.get(instIdListKey);
... snip ...
// count up and store it to the Memcache
count++;
instIdMap.put(instId, count);
MemcacheUtils.memcacheService.put(instIdListKey, instIdMap);
}
});
}
Utils#countInstances
TQ limitations
Still an "experimental" feature
The 30 sec limit also applies
Max 10 queues
Max task rate: 20 tasks/s per app
Max tasks/day: 100k (free) / 1M (billing)
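What these limits mean for the parallel search demo can be checked with simple arithmetic. The sketch assumes the 20 tasks/s limit caps how fast tasks are dispatched, and that one parallel search uses 100 tasks. The class and method names are made up.

```java
// Back-of-the-envelope arithmetic on the limits listed above.
class TaskQueueLimitsSketch {
    static final int MAX_TASKS_PER_SEC = 20;
    static final int FREE_TASKS_PER_DAY = 100_000;

    // Lower bound on the time needed to dispatch the given number of tasks
    static double minDispatchSeconds(int tasks) {
        return (double) tasks / MAX_TASKS_PER_SEC;
    }

    // How many searches fit into the free daily quota
    static int searchesPerDay(int tasksPerSearch) {
        return FREE_TASKS_PER_DAY / tasksPerSearch;
    }

    public static void main(String[] args) {
        System.out.println(minDispatchSeconds(100)); // prints 5.0
        System.out.println(searchesPerDay(100));     // prints 1000
    }
}
```

Under these assumptions, a 100-task search cannot finish dispatching in less than about 5 seconds, and the free quota covers about 1000 such searches a day.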
Add one task for searching 10000 entities
Each task adds the next one at the end
(total 10 tasks chained)
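The chaining above can be sketched as a loop over a queue in which each task handles one block and, unless it was the last block, enqueues its successor with a new cursor. An in-memory queue of integer offsets stands in for the Task Queue and the query cursor. The names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in: each queue element is a cursor (the next start offset).
class ChainedTaskSketch {
    static final int BLOCK = 10_000;

    // Runs the chain and returns how many tasks were executed.
    static int run(int totalEntities) {
        Queue<Integer> queue = new ArrayDeque<>();
        queue.add(0); // the first task starts at the beginning
        int executed = 0;
        while (!queue.isEmpty()) {
            int cursor = queue.remove();
            executed++;
            // "process" the entities in [cursor, next)
            int next = Math.min(cursor + BLOCK, totalEntities);
            if (next < totalEntities) {
                queue.add(next); // not finished: add one more task with the new cursor
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(run(100_000)); // prints 10
    }
}
```

Only one task is in the queue at any time, so the work stays sequential even though it is spread over several requests.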
In App Engine, all the requests have the
"synchronized" clause by mutex lock
synchronized (lock) {
... // exclusive access
}
XMPP JIDs:
sequential-count@<app-id>.appspotchat.com
parallel-count@<app-id>.appspotchat.com
Counting Instance Demo
Until we get the CAS on Memcache, the only solution is ...
From: "Ikai L (Google)" <ika...@google.com>
Date: Thu, 4 Mar 2010 11:43:31 -0800
Local: Fri, Mar 5 2010 4:43 am
Subject: Re: [appengine-java] Sporadic problems with very high response times
(Molson from the IRC office hours?)
Some small percentage of your application's requests will always be
loading requests, as this is us spinning up a new instance of your
application to either grow for capacity or tearing down your instance
and putting it back up as resource allocation demands. We can't
predict when this will happen. You may want to star this issue:
http://code.google.com/p/googleappengine/issues/detail?id=2456
Startup time is generally a function of several different things:
- Spinning up the JVM (Relatively cheap, but on the order of magnitude of 500ms - 1s)
- How many dependencies are you loading? (Relatively cheap compared to JVM spinup)
- Framework init (Can be VERY expensive - loading up a dynamic language runtime
will always take a few seconds. Some frameworks will also scan every class in your
classpath. Spring, for instance, does this to look for annotations eagerly on init time)
Strategies to counteract these factors include optimizing for lazy
loading, which spreads the total load time across access to several
different resources. Not many existing frameworks do this.
As your application grows, loading requests should account for a
smaller and smaller percentage of your total requests. I've seen
solutions with rich applications that show a static page loading
dynamic resources as a general landing page. This doesn't solve the
load time solution, but it meets the user halfway by making a web app
appear to load faster as opposed to causing a user's browser window to
be blank while waiting for a request to be handled.
--
Ikai Lan
Developer Programs Engineer, Google App Engine
http://googleappengine.blogspot.com | http://twitter.com/app_engine
The choice of framework is the key
How a Googler explained the spin-up time on the GAE mailing list
Test Result
Searched for "ABCD" 5 times each
Parallel is 4x faster when it's "warmed up"
package jp.co.sth.tqdemo;
import java.io.IOException;
import java.util.UUID;
import java.util.logging.Logger;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.appengine.api.xmpp.Message;
/**
* Receives XMPP messages and adds the appropriate task (sequential search or
* parallel search) to the Task Queues.
*
* @author kazunori_279
*/
@SuppressWarnings("serial")
public class XMPPReceiverServlet extends HttpServlet {
public static final String JID_PREFIX_SEQ = "sequential";
public static final String JID_PREFIX_PAR = "parallel";
public static final String JID_SUFFIX_COUNT = "-count";
// logger
protected final Logger log = Logger.getLogger(this.getClass().getName());
public void service(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
// build SearchRequest from the XMPP message
final Message reqMsg = Utils.xmppService.parseMessage(req);
final SearchRequest sr = new SearchRequest();
sr.setFromJid(reqMsg.getRecipientJids()[0]);
sr.setToJid(reqMsg.getFromJid());
sr.setInitiatedAt(Utils.i.getGlobalTimestamp());
sr.setReqId(UUID.randomUUID().toString());
final String recipientJid = sr.getFromJid().getId();
sr.setCountInstances(recipientJid.indexOf(JID_SUFFIX_COUNT) != -1);
// parse the request
final String[] tokens = reqMsg.getBody().trim().toUpperCase()
.split(" ");
final String command = tokens[0];
final String param;
if (tokens.length == 2) {
param = tokens[1];
} else {
param = null;
}
// ping?
if (command.equals("PING")) {
sr.setPing(true);
if (param != null) {
sr.setCountdownMillis(Long.parseLong(param));
} else {
sr.setCountdownMillis(null);
}
SequentialSearchServlet.addNewTask(sr);
sr.sendXmppMsg("Ping task added.");
return;
}
// do search
if (command.equals("SEARCH")) {
sr.setKeyword(param);
doSearch(sr, recipientJid);
return;
}
// no such command
sr.sendXmppMsg("No such command: " + command);
}
private void doSearch(final SearchRequest sr, final String recipientJid) {
// if sequential search, add one task for it
final boolean isSequentialSearch = recipientJid
.startsWith(JID_PREFIX_SEQ);
if (isSequentialSearch) {
sr.sendXmppMsg("Sequential search started on: " + sr.getKeyword());
SequentialSearchServlet.addNewTask(sr);
}
// if parallel search, add 100 tasks for it
final boolean isParallelSearch = recipientJid
.startsWith(JID_PREFIX_PAR);
if (isParallelSearch) {
sr.sendXmppMsg("Parallel search started on: " + sr.getKeyword());
for (long i = 0; i < Utils.TEST_DOCS_COUNT; i += ParallelSearchServlet.MAX_ENTITIES_FOR_A_QUERY) {
sr.setStartingDocId(i);
ParallelSearchServlet.addNewTask(sr);
}
}
}
}
package jp.co.sth.tqdemo;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import javax.jdo.PersistenceManager;
import javax.jdo.Query;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.datanucleus.store.appengine.query.JDOCursorHelper;
import com.google.appengine.api.labs.taskqueue.Queue;
import com.google.appengine.api.labs.taskqueue.QueueFactory;
/**
* Performs full text search on the specified block of the {@link Doc} entities. If
* it finds the keyword, it will send an XMPP message to the JID specified by the "toJid"
* param.
*
* <pre>
* HTTP params
*
* keyword: a keyword to be searched
* initiatedAt: a long value of a timestamp when requestor started the search.
* This will be used to calculate elapsed time when the search is finished.
* toJid: XMPP destination JID of the found notification.
* fromJid: XMPP source JID of the found notification.
* </pre>
*
* @author kazunori_279
*/
@SuppressWarnings("serial")
public class SequentialSearchServlet extends HttpServlet {
public static final String QUEUE_NAME_SEQ_SEARCH = "seqSearch";
private static final String QUEUE_URL_SEQ_SEARCH = "/_ah/queue/seq_search";
private static final int MAX_QUERY_SIZE = 20000;
// logger
protected final Logger log = Logger.getLogger(this.getClass().getName());
/**
* Adds a new task to do sequential search with the specified parameter.
*
* @param sr
*/
public static void addNewTask(final SearchRequest sr) {
final Queue queue = QueueFactory.getQueue(QUEUE_NAME_SEQ_SEARCH);
queue.add(sr.getTaskOptions(QUEUE_URL_SEQ_SEARCH));
}
public void service(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
// retrieve SearchRequest and respond to ping
final SearchRequest sr = SearchRequest
.createSearchRequestByHttpReq(req);
if (sr.isPing()) {
sr.sendXmppMsg("HELLO! Elapsed: " + sr.getElapsedTime() + " ms");
return;
}
// execute query
final long startTime = System.currentTimeMillis();
final PersistenceManager pm = Utils.pmf.getPersistenceManager();
final Query query = pm.newQuery(Doc.class);
try {
doQuery(query, sr);
} finally {
// release the query resources even if the search throws
query.closeAll();
pm.close();
}
log.info("Elapsed: " + (System.currentTimeMillis() - startTime));
}
@SuppressWarnings("unchecked")
private void doQuery(final Query query, final SearchRequest sr) {
// apply the cursor if there is one
if (sr.getCursor() != null) {
final Map<String, Object> extMap = new HashMap<String, Object>();
extMap.put(JDOCursorHelper.CURSOR_EXTENSION, sr.getCursor());
query.setExtensions(extMap);
}
// execute a query
query.setOrdering("docId asc");
query.setRange(0, MAX_QUERY_SIZE);
final List<Doc> results = (List<Doc>) query.execute();
// full text search on the query result
boolean isFinished = false;
for (Doc doc : results) {
final boolean isFound = doc.getDocText().toString().indexOf(
sr.getKeyword()) != -1;
if (isFound) {
sr.sendXmppMsg("Found " + sr.getKeyword() + " on #"
+ doc.getDocId());
}
isFinished = doc.getDocId() + 1 >= Utils.TEST_DOCS_COUNT;
}
// record instance ID of this servlet if requested
if (sr.getCountInstances()) {
Utils.i.countInstances(getServletContext(), sr.getReqId());
}
if (!isFinished) {
// if it's not finished add one more task with new cursor
sr.setCursor(JDOCursorHelper.getCursor(results));
addNewTask(sr);
} else {
// if it's finished, send the finish message
Utils.i.sendFinishXmppMsg(sr);
}
}
}
http://code.google.com/p/sth-samples/source/browse/#svn/trunk/tqdemo