idok-commit AT lists.psi.ch
Subject: Commit emails of the iDok project
List archive
[idok-commit] idok commit r328 - in branches/dmsd2/java/ch/idok: common/errorhandling dmsd/impl/config/svnlucene dmsd/impl/controller dmsd/impl/indexer/lucene dmsd/impl/queue dmsd/impl/updatelist dmsd/pipeline
Chronological Thread
- From: "AFS account Stadler Hans Christian" <stadler_h AT savannah.psi.ch>
- To: idok-commit AT lists.psi.ch
- Subject: [idok-commit] idok commit r328 - in branches/dmsd2/java/ch/idok: common/errorhandling dmsd/impl/config/svnlucene dmsd/impl/controller dmsd/impl/indexer/lucene dmsd/impl/queue dmsd/impl/updatelist dmsd/pipeline
- Date: Thu, 26 Feb 2009 11:35:48 +0100
- List-archive: <https://lists.web.psi.ch/pipermail/idok-commit/>
- List-id: Commit emails of the iDok project <idok-commit.lists.psi.ch>
Author: stadler_h
Date: Thu Feb 26 11:35:48 2009
New Revision: 328
Log:
Single threaded event based pipeline implementation
Added:
branches/dmsd2/java/ch/idok/dmsd/impl/queue/DataEvent.java
branches/dmsd2/java/ch/idok/dmsd/impl/queue/Event.java
branches/dmsd2/java/ch/idok/dmsd/impl/queue/EventPipelineQueue.java
branches/dmsd2/java/ch/idok/dmsd/impl/queue/EventQueue.java
branches/dmsd2/java/ch/idok/dmsd/impl/queue/SignalEvent.java
branches/dmsd2/java/ch/idok/dmsd/pipeline/Signal.java
Modified:
branches/dmsd2/java/ch/idok/common/errorhandling/Util.java
branches/dmsd2/java/ch/idok/dmsd/impl/config/svnlucene/Config.java
branches/dmsd2/java/ch/idok/dmsd/impl/controller/SimpleController.java
branches/dmsd2/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java
branches/dmsd2/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineData.java
branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineQueue.java
Modified: branches/dmsd2/java/ch/idok/common/errorhandling/Util.java
==============================================================================
--- branches/dmsd2/java/ch/idok/common/errorhandling/Util.java (original)
+++ branches/dmsd2/java/ch/idok/common/errorhandling/Util.java Thu Feb 26
11:35:48 2009
@@ -55,26 +55,83 @@
}
/**
- * @brief Append the stack trace of an exception to a string buffer.
+ * @brief Get the stack trace of the current thread
+ * as a string buffer.
+ *
+ * @return The stack trace as a string buffer.
+ */
+ public static StringBuffer getStackTrace() {
+ return getStackTrace(null, null);
+ }
+
+ /**
+ * @brief Append the stack trace of the current thread
+ * to a string buffer.
*
* @param sb
* The string buffer to which the stack trace will be
- * appended.
+ * appended. If <code>sb</code> is <code>null</code>
+ * a new string buffer will be allocated and returned.
+ * @return The stack trace as a string buffer.
+ */
+ public static StringBuffer getStackTrace(StringBuffer sb) {
+ return getStackTrace(sb, null);
+ }
+
+ /**
+ * @brief Append the stack trace of an exception or the current thread
+ * to a string buffer.
+ *
* @param th
* The exception whose stack trace is to be retrieved.
- * @return The stack trace of th as a string buffer.
+ * If <code>th</code> is <code>null</code>, the stack
trace of the
+ * current thread will be returned.
+ * @return The stack trace as a string buffer.
+ */
+ public static StringBuffer getStackTrace(Throwable th) {
+ return getStackTrace(null, th);
+ }
+
+ /**
+ * @brief Append the stack trace of an exception or the current thread
+ * to a string buffer.
+ *
+ * @param sb
+ * The string buffer to which the stack trace will be
+ * appended. If <code>sb</code> is <code>null</code>
+ * a new string buffer will be allocated and returned.
+ * @param th
+ * The exception whose stack trace is to be retrieved.
+ * If <code>th</code> is <code>null</code>, the stack
trace of the
+ * current thread will be appended to <code>sb</code>.
+ * @return The stack trace as a string buffer.
*/
public static StringBuffer getStackTrace(StringBuffer sb, Throwable th) {
- assert (th != null);
- while (true) {
- StackTraceElement[] stackTrace = th.getStackTrace();
+ if (sb == null)
+ sb = new StringBuffer();
+ while (true) {
+ StackTraceElement[] stackTrace;
+ if (th == null) {
+ sb.append("Thread: ");
+ sb.append(Thread.currentThread().toString());
+ stackTrace = Thread.currentThread().getStackTrace();
+ } else {
+ sb.append("Exception: ");
+ sb.append(th.toString());
+ stackTrace = th.getStackTrace();
+ }
+ sb.append('\n');
for (StackTraceElement ste : stackTrace) {
sb.append(ste.toString());
sb.append('\n');
}
+ if (th == null)
+ break;
th = th.getCause();
if (th != null) {
- sb.append("Caused by: " + th.toString() + "\n");
+ sb.append("Caused by: ");
+ sb.append(th.toString());
+ sb.append('\n');
} else
break;
}
Modified: branches/dmsd2/java/ch/idok/dmsd/impl/config/svnlucene/Config.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/impl/config/svnlucene/Config.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/config/svnlucene/Config.java Thu
Feb 26 11:35:48 2009
@@ -70,7 +70,7 @@
import ch.idok.dmsd.impl.extractor.zip.ZipExtractorFactory;
import ch.idok.dmsd.impl.indexer.lucene.Indexer;
import ch.idok.dmsd.impl.loader.ContentLoader;
-import ch.idok.dmsd.impl.queue.SimplePipelineQueue;
+import ch.idok.dmsd.impl.queue.EventPipelineQueue;
import ch.idok.dmsd.impl.updatelist.SimpleUpdateListGenerator;
import ch.idok.dmsd.indexer.ContentExtractor;
import ch.idok.dmsd.indexer.ContentExtractorFactory;
@@ -127,10 +127,10 @@
private String lastVersionFile;
/** @brief The head queue for the reconfigurable pipeline part. */
- private SimplePipelineQueue headQueue;
+ private EventPipelineQueue headQueue;
/** @brief The tail queue for the reconfigurable pipeline part. */
- private SimplePipelineQueue tailQueue;
+ private EventPipelineQueue tailQueue;
/**
* @brief The NoOp pipeline stage used when theres is no repository to be
@@ -262,12 +262,12 @@
controller = new SimpleController();
controller.initialize();
logger.fine("Created pipeline controller");
- SimplePipelineQueue queue = new SimplePipelineQueue();
+ EventPipelineQueue queue = new EventPipelineQueue();
queue.connectHead(controller);
headQueue = queue;
// ----------------------------------------------
// Pipeline piece after the reconfigurable part
- queue = new SimplePipelineQueue();
+ queue = new EventPipelineQueue();
queue.connectTail(controller);
tailQueue = queue;
// -----------------------------
@@ -528,21 +528,21 @@
repositoryId);
updateListGenerator.initialize();
logger.fine("Created list generator");
- SimplePipelineQueue queue = new SimplePipelineQueue();
+ EventPipelineQueue queue = new EventPipelineQueue();
queue.connectHead(updateListGenerator);
ContentLoader loader = new ContentLoader("Content Loader for "
+ repositoryId);
loader.initialize();
logger.fine("Created content loader");
queue.connectTail(loader);
- queue = new SimplePipelineQueue();
+ queue = new EventPipelineQueue();
queue.connectHead(loader);
Extractor extractor = new Extractor("Content Extractor for "
+ repositoryId);
extractor.initialize();
logger.fine("Created extractor");
queue.connectTail(extractor);
- queue = new SimplePipelineQueue();
+ queue = new EventPipelineQueue();
queue.connectHead(extractor);
Indexer indexer = new Indexer(indexFile);
indexer.initialize();
Modified:
branches/dmsd2/java/ch/idok/dmsd/impl/controller/SimpleController.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/impl/controller/SimpleController.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/controller/SimpleController.java
Thu Feb 26 11:35:48 2009
@@ -37,6 +37,8 @@
import ch.idok.dmsd.config.Config;
import ch.idok.dmsd.config.Setup;
import ch.idok.dmsd.impl.extractor.Extractor;
+import ch.idok.dmsd.impl.queue.Event;
+import ch.idok.dmsd.impl.queue.EventQueue;
import ch.idok.dmsd.indexer.ContentExtractor;
import ch.idok.dmsd.management.DaemonManager;
import ch.idok.dmsd.monitoring.MonitoringData;
@@ -45,7 +47,6 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineSignalException;
-import ch.idok.dmsd.pipeline.PipelineStage;
import ch.idok.dmsd.pipeline.Signal;
/**
@@ -137,20 +138,6 @@
}
// -------------- Error Handling ------------------------
- /**
- * @brief Print a stack trace.
- *
- * @return The stack trace as a string buffer.
- */
- private StringBuffer printStackTrace() {
- StackTraceElement[] stackTrace =
Thread.currentThread().getStackTrace();
- StringBuffer sb = new StringBuffer();
- for (StackTraceElement ste : stackTrace) {
- sb.append(ste.toString());
- sb.append('\n');
- }
- return sb;
- }
/**
* @brief Check if the metaData parameter is ok.
@@ -161,12 +148,8 @@
*/
private boolean badMetaData(Map<String, String> metaData) {
if (metaData == null) {
- StringBuffer sb = printStackTrace();
- sb
- .insert(
- 0,
- "Internal bug detected, handlePipelineError
called with data.documentMetaData=null\n");
- logger.warning(sb.toString());
+ StringBuffer sb = new StringBuffer("Internal bug detected,
handlePipelineError called with data.documentMetaData=null\n");
+ logger.warning(Util.getStackTrace(sb).toString());
return true;
}
return false;
@@ -207,8 +190,10 @@
else
break;
msg.append("=null\n");
- msg.append(printStackTrace());
- logger.severe(msg.toString());
+ if (data.exception != null)
+ Util.getStackTrace(msg, data.exception);
+ msg.append("Here:\n");
+ logger.severe(Util.getStackTrace(msg).toString());
return;
} while (false);
@@ -274,10 +259,8 @@
}
logger.warning(msg.toString());
} catch (Throwable th) {
- StringBuffer msg = new StringBuffer(
- "Bug detected in error handler, panic");
- msg.append(printStackTrace());
- logger.severe(msg.toString());
+ StringBuffer msg = new StringBuffer("Bug detected in error
handler, panic!\n");
+ logger.severe(Util.getStackTrace(msg, th).toString());
System.exit(-1);
}
}
@@ -301,11 +284,13 @@
Level level = Level.WARNING;
StringBuffer sb = new StringBuffer();
DmsException exception = ex.exception;
- sb.append(ex.failedStage.getName()
- + " reported an error during signal handling:\n");
- sb.append(exception.getLogMessage() + "\n");
+ sb.append(ex.failedStage.getName());
+ sb.append(" reported an error during signal handling!\n");
if (exception.getErrorType() == ErrorType.INTERNAL)
Util.getStackTrace(sb, exception);
+ else
+ sb.append(exception.getLogMessage());
+ sb.append('\n');
if ((ex.signal.type == Signal.START)
|| (ex.signal.type == Signal.COMMIT)) {
sb.append("Signalling DOWNSTREAM_PROBLEM\n");
@@ -314,16 +299,13 @@
ex.failedStage));
} catch (Throwable th) {
level = Level.SEVERE;
- sb
- .append("Severe: exception received during
DOWNSTREAM_PROBLEM signal handling.\n"
- + th);
+ sb.append("Severe: exception received during
DOWNSTREAM_PROBLEM signal handling.\n");
Util.getStackTrace(sb, th);
}
}
logger.log(level, sb.toString());
} catch (Throwable th) {
- StringBuffer sb = new StringBuffer();
- sb.append("Bug detected in error handler, panic\n");
+ StringBuffer sb = new StringBuffer("Bug detected in error
handler, panic!\n");
logger.severe(Util.getStackTrace(sb, th).toString());
System.exit(-1);
}
@@ -432,6 +414,16 @@
logger.finest("Releasing token");
token.stage = this;
getOut().push(this, token);
+ Event event = EventQueue.dequeue();
+ while (event != null) {
+ try {
+ event.handle();
+ event = EventQueue.dequeue();
+ } catch (Throwable th) {
+ StringBuffer msg = new StringBuffer("Exception
in event loop:\n");
+ logger.warning(Util.getStackTrace(msg,
th).toString());
+ }
+ }
token.clear();
}
logger.finest("Signalling STOP");
@@ -461,7 +453,7 @@
} catch (InterruptedException ex) {
logger.warning("Received interrupt. Stopping operation..");
} catch (DmsException ex) {
- logger.warning(ex.getLogMessage());
+ logger.warning(Util.getStackTrace(ex).toString());
} finally {
isStopped = true;
token.dispose();
Modified: branches/dmsd2/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java Thu
Feb 26 11:35:48 2009
@@ -33,7 +33,6 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumberTools;
-import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
@@ -126,7 +125,8 @@
*/
private void setIndexWriter(boolean createIndex, boolean setSearcher)
throws Throwable {
- indexWriter = new IndexWriter(indexDirectory, analyzer, createIndex,
IndexWriter.MaxFieldLength.UNLIMITED);
+ indexWriter = new IndexWriter(indexDirectory, analyzer, createIndex,
+ IndexWriter.MaxFieldLength.UNLIMITED);
indexWriter.setMergeFactor(20);
indexWriter.setMaxBufferedDocs(15);
if (setSearcher)
@@ -207,51 +207,27 @@
if (hits.totalHits != 0) {
logger.finest("Found " + docId + " in index");
if (indexingMode == IndexingMode.LAST_VERSION) {
- logger.info("Deleting existing document " + docId
- + " from index");
- IndexReader indexReader = null;
- try {
- commitOperation(true);
- indexReader = IndexReader.open(indexDirectory);
- indexReader.deleteDocuments(idTerm);
- indexReader.close();
- indexReader = null;
- setIndexWriter(false, true);
- } catch (Throwable th) {
- DmsException.throwIt(ErrorType.INDEX_ACCESS,
this,
- "Cannot delete document " + docId, "",
th);
- } finally {
- if (indexReader != null)
- try {
- indexReader.close();
- } catch (Throwable th) {/* ignore */
- }
- }
+ logger.info("Deleting existing document "+docId+"
from index");
+ indexWriter.deleteDocuments(idTerm);
} else {
- logger
- .info("Skipping indexing of "
- + docId
- + ", the document already exists in
the index.");
+ logger.info("Skipping indexing of "+docId+
+ ", the document already exists in the
index.");
return;
}
}
if (repoChange.type == RepositoryChange.Type.DELETED) {
- logger.finest("Skipping indexing of " + docId
- + ", a deleted document.");
+ logger.finest("Skipping indexing of "+docId+", a deleted
document.");
} else {
monitor.resumeTimer("lucene.indexing.time");
Document luceneDocument = new Document();
StringBuffer sb = new StringBuffer();
Map<String, String> metadata = data.documentMetadata;
- ByteArrayInputStream input = new ByteArrayInputStream(
- data.documentContent);
- AllFieldsReader reader = new AllFieldsReader(input,
- metadata);
+ ByteArrayInputStream input = new
ByteArrayInputStream(data.documentContent);
+ AllFieldsReader reader = new AllFieldsReader(input,
metadata);
// Create string for indexed content field value
- // WARNING: Only the first 10000 tokens are analysed by
+ // WARNING: Only the first 10000 tokens are analyzed by
// default
- StringWriter stringWriter = new StringWriter(
- data.documentContent.length + 1024);
+ StringWriter stringWriter = new
StringWriter(data.documentContent.length+1024);
char[] charBuf = new char[1024 * 1024];
int amountRead;
do {
@@ -262,9 +238,8 @@
reader.close();
input.close();
charBuf = null;
- luceneDocument.add(new Field("content", stringWriter
- .toString(), Field.Store.COMPRESS,
- Field.Index.ANALYZED));
+ luceneDocument.add(new Field("content",
stringWriter.toString(),
+ Field.Store.COMPRESS, Field.Index.ANALYZED));
stringWriter.close();
sb.append("Added field: content\n");
String version = metadata.get(ch.idok.common.config.Setup
@@ -347,11 +322,12 @@
logger.finest("Received signal " + signal + " from " +
signal.originator);
if (signal.type == Signal.START) {
IndexWriter indexWriter = new IndexWriter(indexDirectory,
- analyzer, !indexDirectory.isDirectory(),
IndexWriter.MaxFieldLength.UNLIMITED);
+ analyzer, !indexDirectory.isDirectory(),
+ IndexWriter.MaxFieldLength.UNLIMITED);
indexWriter.close();
if (!indexDirectory.isDirectory())
- logger
- .warning("Sanity check failed, indexWriter did
not create an empty directory!");
+ logger.warning("Sanity check failed, indexWriter did not
create an"+
+ "empty directory!");
operationStopped = false;
} else if (signal.type == Signal.STOP) {
stopOperation(true);
Added: branches/dmsd2/java/ch/idok/dmsd/impl/queue/DataEvent.java
==============================================================================
--- (empty file)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/queue/DataEvent.java Thu Feb 26
11:35:48 2009
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2006-2008 iDok team.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
USA.
+ */
+
+package ch.idok.dmsd.impl.queue;
+
+import ch.idok.common.errorhandling.DmsException;
+import ch.idok.common.errorhandling.ErrorType;
+import ch.idok.dmsd.pipeline.PipelineData;
+
+/**
+ * @brief Pipeline Data Event
+ */
+public final class DataEvent extends Event {
+
+ /** @brief The pipeline data item */
+ public PipelineData data;
+
+ /**
+ * @brief Constructor
+ * @param item The pipeline data item
+ */
+ private DataEvent(PipelineData item) {
+ super(DATA);
+ data = item;
+ }
+
+ /**
+ * @brief Create a signaling event object
+ */
+ public static DataEvent getInstance(PipelineData data) {
+ return new DataEvent(data);
+ }
+
+ /**
+ * @brief Dispose off a signaling event object
+ */
+ @Override
+ public void dispose() {
+ clear();
+ }
+
+ /**
+ * @brief Release all resources held by this object
+ */
+ @Override
+ public void clear() {
+ data = null;
+ }
+
+ /**
+ * @brief Handles this data event
+ */
+ @Override
+ public void handle() throws DmsException {
+ if (data != null)
+ if (data.stage != null)
+ data.stage.push(data);
+ DmsException.throwIt(ErrorType.INTERNAL, this, "Bug detected", "Data
event handler called without valid data object!");
+ }
+
+ /**
+ * @brief Return a string representation of this event
+ */
+ @Override
+ public String toString() {
+ return this.getClass().toString()+": "+data.toString();
+ }
+}
Added: branches/dmsd2/java/ch/idok/dmsd/impl/queue/Event.java
==============================================================================
--- (empty file)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/queue/Event.java Thu Feb 26
11:35:48 2009
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2006-2008 iDok team.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
USA.
+ */
+
+package ch.idok.dmsd.impl.queue;
+
+import ch.idok.common.errorhandling.DmsException;
+
+/**
+ * @brief Generic event
+ */
+public abstract class Event {
+ /** @brief DATA event type */
+ public static final int DATA = 0;
+ /** @brief SIGNAL event type */
+ public static final int SIGNAL = 1;
+
+ /** @brief Event type of this event */
+ public int type;
+
+ /**
+ * @brief Handle this event
+ * @throws DmsException If the event cannot be handled properly
+ */
+ public abstract void handle() throws DmsException;
+
+ /**
+ * @brief Delete this event
+ *
+ * Must be called whenever the event object is no longer used
+ */
+ public abstract void dispose();
+
+ /**
+ * @brief Release all resources held by this event
+ */
+ public abstract void clear();
+
+ /**
+ * @brief Constructor
+ * @param kind Which event type is this?
+ */
+ public Event(int kind) {
+ type = kind;
+ }
+}
Added: branches/dmsd2/java/ch/idok/dmsd/impl/queue/EventPipelineQueue.java
==============================================================================
--- (empty file)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/queue/EventPipelineQueue.java Thu
Feb 26 11:35:48 2009
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2006-2008 iDok team.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
USA.
+ */
+
+package ch.idok.dmsd.impl.queue;
+
+import ch.idok.common.errorhandling.DmsException;
+import ch.idok.dmsd.pipeline.PipelineData;
+import ch.idok.dmsd.pipeline.PipelineSignal;
+import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;
+
+/**
+ * @brief Implements the event based pipeline queue
+ */
+public class EventPipelineQueue extends ch.idok.dmsd.pipeline.PipelineQueue {
+
+ /**
+ * @brief Enqueue the pipeline data into the event queue and return
+ */
+ @Override
+ public void push(PipelineStage stage, PipelineData data) {
+ boolean first = true;
+ for (PipelineStage ps : out) {
+ if (! first)
+ data = PipelineData.getInstance(data);
+ data.stage = ps;
+ EventQueue.enqueue(DataEvent.getInstance(data));
+ first = false;
+ }
+ }
+
+ /**
+ * @brief Enqueue the pipeline sgnal into the signalling event queue and
return
+ */
+ @Override
+ public void signal(PipelineSignal signal) throws DmsException {
+ if (signal.direction() == Signal.Direction.UP) // upstream
+ for (PipelineStage stage : in)
+ EventQueue.enqueue(SignalEvent.getInstance(stage, signal));
+ else if (signal.direction() == Signal.Direction.DOWN) // downstream
+ for (PipelineStage stage : out)
+ EventQueue.enqueue(SignalEvent.getInstance(stage, signal));
+ }
+}
Added: branches/dmsd2/java/ch/idok/dmsd/impl/queue/EventQueue.java
==============================================================================
--- (empty file)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/queue/EventQueue.java Thu Feb 26
11:35:48 2009
@@ -0,0 +1,82 @@
+/*
+ * Copyright (C) 2006-2008 iDok team.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
USA.
+ */
+
+package ch.idok.dmsd.impl.queue;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Logger;
+
+import ch.idok.dmsd.config.Setup;
+
+/**
+ * @brief Event queue for pipeline tokens
+ */
+
+public final class EventQueue {
+
+ /**
+ * @brief Local logger with name "dmsd.eventqueue"
+ */
+
+ private static Logger logger;
+ /**
+ * @brief List for queuing up events
+ *
+ * The event queue must be a FIFO queue, because
+ * pipeline stages count on in order handling of
+ * events.
+ */
+ private ConcurrentLinkedQueue<Event> eventQueue;
+
+ /**
+ * @brief Hidden constructor, this is a singleton
+ */
+ private EventQueue() {
+ eventQueue = new ConcurrentLinkedQueue<Event>();
+ }
+
+ /**
+ * @brief Enqueue an event
+ * @param event The event to be enqueued
+ */
+ public static void enqueue(Event event) {
+ instance().eventQueue.add(event);
+ logger.finest("Enqueued event: "+event);
+ }
+
+ /**
+ * @brief Retrieve and remove an event from the queue
+ * @return An event on the queue
+ */
+ public static Event dequeue() {
+ Event event = instance().eventQueue.poll();
+ logger.finest("Dequeueing event: "+event);
+ return event;
+ }
+
+ public static EventQueue instance() {
+ if (singleton == null) {
+ logger = Setup.getConfig().getLogger("dmsd.eventqueue");
+ singleton = new EventQueue();
+ }
+ return singleton;
+ }
+
+ private static EventQueue singleton;
+}
Added: branches/dmsd2/java/ch/idok/dmsd/impl/queue/SignalEvent.java
==============================================================================
--- (empty file)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/queue/SignalEvent.java Thu
Feb 26 11:35:48 2009
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2006-2008 iDok team.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
USA.
+ */
+
+package ch.idok.dmsd.impl.queue;
+
+import ch.idok.common.errorhandling.DmsException;
+import ch.idok.common.errorhandling.ErrorType;
+import ch.idok.dmsd.pipeline.PipelineSignal;
+import ch.idok.dmsd.pipeline.PipelineStage;
+
+/**
+ * @brief Signaling event
+ *
+ */
+public final class SignalEvent extends Event {
+ /**
+ * @brief Signal destination
+ */
+ public PipelineStage destination;
+
+ /**
+ * @brief The signal
+ */
+ public PipelineSignal signal;
+
+ /**
+ * @brief Constructor
+ */
+ private SignalEvent(PipelineStage dest, PipelineSignal sig) {
+ super(SIGNAL);
+ destination = dest;
+ signal = sig;
+ }
+
+ /**
+ * @brief Create a signaling event object
+ */
+ public static SignalEvent getInstance(PipelineStage dest, PipelineSignal
sig) {
+ return new SignalEvent(dest, sig);
+ }
+
+ /**
+ * @brief Dispose off a signaling event object
+ */
+ @Override
+ public void dispose() {
+ clear();
+ }
+
+ /**
+ * @brief Release all resources held by this object
+ */
+ @Override
+ public void clear() {
+ destination = null;
+ signal = null;
+ }
+
+ /**
+ * @brief Handle this signaling event
+ */
+ @Override
+ public void handle() throws DmsException {
+ if (signal != null)
+ if (destination != null)
+ destination.signal(signal);
+ DmsException.throwIt(ErrorType.INTERNAL, this, "Bug detected",
"Signal handler was called without a valid signal object!");
+ }
+
+ /**
+ * @brief Return a string representation of this event
+ */
+ @Override
+ public String toString() {
+ return this.getClass().toString()+"("+signal+"): "+destination;
+ }
+}
Modified:
branches/dmsd2/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
==============================================================================
---
branches/dmsd2/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
(original)
+++
branches/dmsd2/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
Thu Feb 26 11:35:48 2009
@@ -154,6 +154,7 @@
if (updateListIterator == null)
setUpdateListIterator();
if (updateListIterator == null) {
+ // executed when the last indexed version = last repository
version
stopOperation(true);
logger.fine("Index for " + repositoryId + " is complete.");
return;
@@ -165,7 +166,6 @@
} else {
updateListIterator = null;
getOut().signal(new PipelineSignal(Signal.COMMIT, this)); //
May call
- // stopOperation(true)
if (repository != null) {
stopOperation(true);
config.setLastVersion(repositoryId, lastVersion);
@@ -181,9 +181,7 @@
"Bug Detected", "", th);
}
controller.handlePipelineError(data);
- logger
- .warning("Stopping operation due to exception: "
- + data.exception);
+ logger.warning("Stopping operation due to exception:
"+data.exception);
stopOperation(true);
}
Modified: branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineData.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineData.java (original)
+++ branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineData.java Thu Feb 26
11:35:48 2009
@@ -86,12 +86,30 @@
/**
* @brief Create a pipeline data instance.
*
- * This method in unioson with the dispose() method might be enhanced to
+ * This method in unison with the dispose() method might be enhanced to
* provide smart memory management.
*/
public static PipelineData getInstance() {
return new PipelineData();
}
+
+ /**
+ * @brief Create a pipeline data instance from another one.
+ *
+ * This method creates a new instance with fields that
+ * reference the same values the other instance does.
+ */
+ public static PipelineData getInstance(PipelineData other) {
+ PipelineData newobj = new PipelineData();
+ newobj.documentContent = other.documentContent;
+ newobj.documentMetadata = other.documentMetadata;
+ newobj.exception = other.exception;
+ newobj.monitoringData = other.monitoringData;
+ newobj.repositoryChange = other.repositoryChange;
+ newobj.retryCounter = other.retryCounter;
+ newobj.stage = other.stage;
+ return newobj;
+ }
/**
* @brief Dispose a pipeline data instance.
Modified: branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineQueue.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineQueue.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineQueue.java Thu
Feb 26 11:35:48 2009
@@ -61,7 +61,7 @@
* stage.
*
* The stage argument to push might seem strange at first, but they are
- * actually usefull if the queue implementation allows connections to
+ * actually useful if the queue implementation allows connections to
* multiple input and output stages. The stage argument lets the queue
know
* which stage is performing operations.
*/
Added: branches/dmsd2/java/ch/idok/dmsd/pipeline/Signal.java
==============================================================================
--- (empty file)
+++ branches/dmsd2/java/ch/idok/dmsd/pipeline/Signal.java Thu Feb 26
11:35:48 2009
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2006-2008 iDok team.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
USA.
+ */
+
+package ch.idok.dmsd.pipeline;
+
+/**
+ * @brief Enumeration of the different pipeline signal types
+ */
+public enum Signal {
+ /** @name Downstream signals */
+ // @{
+ /** Start operation cycle. */
+ START(Direction.DOWN),
+ /** Stop operation cycle. */
+ STOP(Direction.DOWN),
+ /** Commit operation. */
+ COMMIT(Direction.DOWN),
+ /** Get monitoring counters. */
+ GET_COUNTERS(Direction.DOWN),
+ /**
+ * Signal unrecoverable upstream problems with last upstream signal or
+ * operation.
+ */
+ UPSTREAM_PROBLEM(Direction.DOWN),
+ // @}
+ /** @name Upstream signals */
+ // @{
+ /**
+ * Signal unrecoverable downstream problems with last downstream signal
or
+ * operation.
+ */
+ DOWNSTREAM_PROBLEM(Direction.UP),
+ // @}
+ /** @name Nonpropagating signals */
+ // @{
+ /** Announce job accomplishment. */
+ FINISHED(Direction.NONE);
+ // @}
+
+ /**
+ * @brief Signal propagation directions.
+ */
+ public enum Direction {
+ UP, DOWN, NONE
+ }
+
+ /** The propagation direction of this signal. */
+ public Direction direction;
+
+ /**
+ * Create a pipeline signal.
+ *
+ * @param dir
+ * The propagation direction of the signal.
+ */
+ Signal(Direction dir) {
+ direction = dir;
+ }
+}
- [idok-commit] idok commit r328 - in branches/dmsd2/java/ch/idok: common/errorhandling dmsd/impl/config/svnlucene dmsd/impl/controller dmsd/impl/indexer/lucene dmsd/impl/queue dmsd/impl/updatelist dmsd/pipeline, AFS account Stadler Hans Christian, 02/26/2009
Archive powered by MHonArc 2.6.19.