logo separator

[mkgmap-dev] [PATCH 14/14] New thread design.

From Jeffrey C. Ollie jeff at ocjtech.us on Thu Sep 9 21:12:14 BST 2010

From: Scott Crosby <scrosby at cs.rice.edu>

---
 src/uk/me/parabola/splitter/SplitProcessor.java |  132 ++++++++++-------------
 1 files changed, 56 insertions(+), 76 deletions(-)

diff --git a/src/uk/me/parabola/splitter/SplitProcessor.java b/src/uk/me/parabola/splitter/SplitProcessor.java
index 4f7220e..71957d9 100644
--- a/src/uk/me/parabola/splitter/SplitProcessor.java
+++ b/src/uk/me/parabola/splitter/SplitProcessor.java
@@ -18,9 +18,7 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.util.TreeMap;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
@@ -30,15 +28,18 @@ import uk.me.parabola.splitter.Relation.Member;
  * Splits a map into multiple areas.
  */
 class SplitProcessor implements MapProcessor {
-
+	public static final int NO_ELEMENTS = 10;
+	public static final int BUNDLE_SIZE = 2000;
+	
+	
 	private final SplitIntMap coords = new SplitIntMap();
 	private final SplitIntMap ways = new SplitIntMap();
 	private final IntObjMap<long[]> bigWays = new IntObjMap<long[]>();
 
 	private final OSMWriter[] writers;
-	private final BlockingQueue<Element>[] writerInputQueues;
-	private final BlockingQueue<InputQueueInfo> writerInputQueue;
-	private final ArrayList<Thread> workerThreads;
+	private final BlockingQueue<List<Element>>[] writerInputQueues;
+	private final List<Element>[] bundlingQueues;
+	Thread threads[];
 
 	private int currentNodeAreaSet;
 	private ArrayList<Integer> currentWayAreaSet, tmpWayAreaSet;
@@ -86,24 +87,18 @@ class SplitProcessor implements MapProcessor {
 		this.writers = writers;
 		makeWriterMap();
 		this.maxThreads = maxThreads;
-		this.writerInputQueue = new ArrayBlockingQueue<InputQueueInfo>(writers.length); 
 		this.writerInputQueues = new BlockingQueue[writers.length];
+		this.bundlingQueues = new ArrayList[writers.length];
+		this.threads = new Thread[writers.length];
 		for (int i = 0; i < writerInputQueues.length;i++) {
-			writerInputQueues[i] = new ArrayBlockingQueue<Element>(NO_ELEMENTS);
-			writerInputQueue.add(new InputQueueInfo(this.writers[i], writerInputQueues[i]));
+			writerInputQueues[i] = new ArrayBlockingQueue<List<Element>>(NO_ELEMENTS);
+			bundlingQueues[i] = new ArrayList<Element>(BUNDLE_SIZE);
+			threads[i] = new Thread(new OSMWriterWorker(writers[i],writerInputQueues[i]));
+			threads[i].start();
 		}
 		tmpWayAreaSet = new ArrayList<Integer>(10);
 		currentWayAreaSet = new ArrayList<Integer>(10);
 		currentRelAreaSet = new BitSet(writers.length);
-		
-		int noOfWorkerThreads = this.maxThreads - 1;
-		workerThreads = new ArrayList<Thread>(noOfWorkerThreads);
-		for (int i = 0; i < noOfWorkerThreads; i++) {
-			Thread worker = new Thread(new OSMWriterWorker());
-			worker.setName("worker-" + i);
-			workerThreads.add(worker);
-			worker.start();
-		}
 	}
 
 	@Override
@@ -220,20 +215,18 @@ class SplitProcessor implements MapProcessor {
 
 	@Override
 	public void endMap() {
-		for (int i = 0; i < writerInputQueues.length; i++) {
-			try {
-				writerInputQueues[i].put(STOP_ELEMENT);
-			} catch (InterruptedException e) {
-				throw new RuntimeException("Failed to add the stop element for worker thread " + i, e);
-			}
-		}
-		for (Thread workerThread : workerThreads) {
-			try {
-				workerThread.join();
+		try {
+			// Push the stop element into every queue.
+			for (int i = 0 ; i < threads.length ; i++ )
+				addToWorkingQueue(i,STOP_ELEMENT);
+			// Wait for them to all exit.
+			for (int i = 0 ; i < threads.length ; i++ )
+				threads[i].join();
 			} catch (InterruptedException e) {
-				throw new RuntimeException("Failed to join for thread " + workerThread.getName(), e);
+				// TODO Auto-generated catch block
+				e.printStackTrace();
 			}
-		}
+
 		for (OSMWriter writer : writers) {
 			writer.finishWrite();
 		}
@@ -350,29 +343,34 @@ class SplitProcessor implements MapProcessor {
 	}
 
 	private void addToWorkingQueue(int writerNumber, Element element) {
+		List<Element> bundle=bundlingQueues[writerNumber];
+		bundle.add(element);
+		if (bundle.size() < BUNDLE_SIZE && element != STOP_ELEMENT)
+			return;
 		try {
-			writerInputQueues[writerNumber].put(element);
+			BlockingQueue<List<Element>> queue = writerInputQueues[writerNumber];
+			queue.put(bundle);
+			bundlingQueues[writerNumber] = new ArrayList<Element>(BUNDLE_SIZE);
 		} catch (InterruptedException e) {
 			throw new RuntimeException("Failed to write node " + element.getId() + " to worker thread " + writerNumber, e);
 		}
 	}
 
-	private static class InputQueueInfo {
-		private final OSMWriter writer;
-		private final BlockingQueue<Element> inputQueue;
-
-		public InputQueueInfo(OSMWriter writer, BlockingQueue<Element> inputQueue) {
-      this.writer = writer;
-			this.inputQueue = inputQueue;
-		}
-	}
 
 	private static final Element STOP_ELEMENT = new Element();
 
-	public static final int NO_ELEMENTS = 1000;
 
 	private class OSMWriterWorker implements Runnable {
 
+		
+		private OSMWriter writer;
+		private BlockingQueue<List<Element>> queue;
+
+		public OSMWriterWorker(OSMWriter writer, BlockingQueue<List<Element>> queue) {
+			this.writer = writer;
+			this.queue = queue;
+		}
+
 		public void processElement(Element element, OSMWriter writer) throws IOException {
 			if (element instanceof Node) {
 				writer.write((Node) element);
@@ -385,41 +383,23 @@ class SplitProcessor implements MapProcessor {
 
 		@Override
 		public void run() {
-			boolean finished = false;
-			while (!finished) {
-				InputQueueInfo workPackage = writerInputQueue.poll();
-				if (workPackage==null) {
-					finished=true;
-				} else {
-					while (!workPackage.inputQueue.isEmpty()) {
-						Element element =null;
-						try {
-							element = workPackage.inputQueue.poll();
-							if (element == null) {
-								writerInputQueue.put(workPackage);
-								workPackage=null;
-								break;
-							} else if (element == STOP_ELEMENT) {
-								workPackage=null;
-								break;
-							} else {
-								processElement(element, workPackage.writer);
-							}
-							
-						} catch (InterruptedException e) {
-							throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to get next element", e);
-						} catch (IOException e) {
-							throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to write element " + element.getId() + '(' + element.getClass().getSimpleName() + ')', e);
-						}
-					}
-					if (workPackage != null) {
-						try {
-							writerInputQueue.put(workPackage);
-						} catch (InterruptedException e) {
-							throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to return work package", e);
-						}
-					}
+			while (true) {
+				//System.out.println("Doing loop");
+				try {
+
+					List<Element> elements = queue.take();
+					for (Element element : elements)
+						if (element == STOP_ELEMENT)
+							return;
+						else
+							processElement(element, writer);
+
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to get next element", e);
+				} catch (IOException e) {
+					throw new RuntimeException("Thread " + Thread.currentThread().getName() + " failed to write element ",e);
 				}
+				Thread.yield();
 			}
 			System.out.println("Thread " + Thread.currentThread().getName() + " has finished");
 		}
-- 
1.7.2.3




More information about the mkgmap-dev mailing list