Skip to Content.
Sympa Menu

idok-commit - [idok-commit] idok commit r332 - in trunk/java/ch/idok: common/errorhandling dmsd/impl/config/svnlucene dmsd/impl/controller dmsd/impl/extractor dmsd/impl/indexer/lucene dmsd/impl/loader dmsd/impl/queue dmsd/impl/updatelist dmsd/pipeline

idok-commit AT lists.psi.ch

Subject: Commit emails of the iDok project

List archive

[idok-commit] idok commit r332 - in trunk/java/ch/idok: common/errorhandling dmsd/impl/config/svnlucene dmsd/impl/controller dmsd/impl/extractor dmsd/impl/indexer/lucene dmsd/impl/loader dmsd/impl/queue dmsd/impl/updatelist dmsd/pipeline


Chronological Thread 
  • From: "AFS account Stadler Hans Christian" <stadler_h AT savannah.psi.ch>
  • To: idok-commit AT lists.psi.ch
  • Subject: [idok-commit] idok commit r332 - in trunk/java/ch/idok: common/errorhandling dmsd/impl/config/svnlucene dmsd/impl/controller dmsd/impl/extractor dmsd/impl/indexer/lucene dmsd/impl/loader dmsd/impl/queue dmsd/impl/updatelist dmsd/pipeline
  • Date: Mon, 2 Mar 2009 10:58:39 +0100
  • List-archive: <https://lists.web.psi.ch/pipermail/idok-commit/>
  • List-id: Commit emails of the iDok project <idok-commit.lists.psi.ch>

Author: stadler_h
Date: Mon Mar 2 10:58:39 2009
New Revision: 332

Log:
Merged branch dmsd2 into trunk

Added:
trunk/java/ch/idok/dmsd/impl/queue/DataEvent.java
- copied unchanged from r331,
/branches/dmsd2/java/ch/idok/dmsd/impl/queue/DataEvent.java
trunk/java/ch/idok/dmsd/impl/queue/Event.java
- copied unchanged from r331,
/branches/dmsd2/java/ch/idok/dmsd/impl/queue/Event.java
trunk/java/ch/idok/dmsd/impl/queue/EventPipelineQueue.java
- copied unchanged from r331,
/branches/dmsd2/java/ch/idok/dmsd/impl/queue/EventPipelineQueue.java
trunk/java/ch/idok/dmsd/impl/queue/EventQueue.java
- copied unchanged from r331,
/branches/dmsd2/java/ch/idok/dmsd/impl/queue/EventQueue.java
trunk/java/ch/idok/dmsd/impl/queue/SignalEvent.java
- copied unchanged from r331,
/branches/dmsd2/java/ch/idok/dmsd/impl/queue/SignalEvent.java
trunk/java/ch/idok/dmsd/pipeline/Signal.java
- copied unchanged from r331,
/branches/dmsd2/java/ch/idok/dmsd/pipeline/Signal.java
Removed:
trunk/java/ch/idok/dmsd/impl/queue/SimplePipelineQueue.java
Modified:
trunk/java/ch/idok/common/errorhandling/Util.java
trunk/java/ch/idok/dmsd/impl/config/svnlucene/Config.java
trunk/java/ch/idok/dmsd/impl/controller/SimpleController.java
trunk/java/ch/idok/dmsd/impl/extractor/Extractor.java
trunk/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java
trunk/java/ch/idok/dmsd/impl/loader/ContentLoader.java
trunk/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
trunk/java/ch/idok/dmsd/pipeline/NoOp.java
trunk/java/ch/idok/dmsd/pipeline/PipelineController.java
trunk/java/ch/idok/dmsd/pipeline/PipelineData.java
trunk/java/ch/idok/dmsd/pipeline/PipelineQueue.java
trunk/java/ch/idok/dmsd/pipeline/PipelineSignal.java
trunk/java/ch/idok/dmsd/pipeline/PipelineSignalException.java
trunk/java/ch/idok/dmsd/pipeline/PipelineStage.java

Modified: trunk/java/ch/idok/common/errorhandling/Util.java
==============================================================================
--- trunk/java/ch/idok/common/errorhandling/Util.java (original)
+++ trunk/java/ch/idok/common/errorhandling/Util.java Mon Mar 2 10:58:39
2009
@@ -55,26 +55,83 @@
}

/**
- * @brief Append the stack trace of an exception to a string buffer.
+ * @brief Get the stack trace of the current thread
+ * as a string buffer.
+ *
+ * @return The stack trace as a string buffer.
+ */
+ public static StringBuffer getStackTrace() {
+ return getStackTrace(null, null);
+ }
+
+ /**
+ * @brief Append the stack trace of the current thread
+ * to a string buffer.
*
* @param sb
* The string buffer to which the stack trace will be
- * appended.
+ * appended. If <code>sb</code> is <code>null</code>
+ * a new string buffer will be allocated and returned.
+ * @return The stack trace as a string buffer.
+ */
+ public static StringBuffer getStackTrace(StringBuffer sb) {
+ return getStackTrace(sb, null);
+ }
+
+ /**
+ * @brief Append the stack trace of an exception or the current thread
+ * to a string buffer.
+ *
* @param th
* The exception whose stack trace is to be retrieved.
- * @return The stack trace of th as a string buffer.
+ * If <code>th</code> is <code>null</code>, the stack
trace of the
+ * current thread will be returned.
+ * @return The stack trace as a string buffer.
+ */
+ public static StringBuffer getStackTrace(Throwable th) {
+ return getStackTrace(null, th);
+ }
+
+ /**
+ * @brief Append the stack trace of an exception or the current thread
+ * to a string buffer.
+ *
+ * @param sb
+ * The string buffer to which the stack trace will be
+ * appended. If <code>sb</code> is <code>null</code>
+ * a new string buffer will be allocated and returned.
+ * @param th
+ * The exception whose stack trace is to be retrieved.
+ * If <code>th</code> is <code>null</code>, the stack
trace of the
+ * current thread will be appended to <code>sb</code>.
+ * @return The stack trace as a string buffer.
*/
public static StringBuffer getStackTrace(StringBuffer sb, Throwable th) {
- assert (th != null);
- while (true) {
- StackTraceElement[] stackTrace = th.getStackTrace();
+ if (sb == null)
+ sb = new StringBuffer();
+ while (true) {
+ StackTraceElement[] stackTrace;
+ if (th == null) {
+ sb.append("Thread: ");
+ sb.append(Thread.currentThread().toString());
+ stackTrace = Thread.currentThread().getStackTrace();
+ } else {
+ sb.append("Exception: ");
+ sb.append(th.toString());
+ stackTrace = th.getStackTrace();
+ }
+ sb.append('\n');
for (StackTraceElement ste : stackTrace) {
sb.append(ste.toString());
sb.append('\n');
}
+ if (th == null)
+ break;
th = th.getCause();
if (th != null) {
- sb.append("Caused by: " + th.toString() + "\n");
+ sb.append("Caused by: ");
+ sb.append(th.toString());
+ sb.append('\n');
} else
break;
}

Modified: trunk/java/ch/idok/dmsd/impl/config/svnlucene/Config.java
==============================================================================
--- trunk/java/ch/idok/dmsd/impl/config/svnlucene/Config.java (original)
+++ trunk/java/ch/idok/dmsd/impl/config/svnlucene/Config.java Mon Mar 2
10:58:39 2009
@@ -70,7 +70,7 @@
import ch.idok.dmsd.impl.extractor.zip.ZipExtractorFactory;
import ch.idok.dmsd.impl.indexer.lucene.Indexer;
import ch.idok.dmsd.impl.loader.ContentLoader;
-import ch.idok.dmsd.impl.queue.SimplePipelineQueue;
+import ch.idok.dmsd.impl.queue.EventPipelineQueue;
import ch.idok.dmsd.impl.updatelist.SimpleUpdateListGenerator;
import ch.idok.dmsd.indexer.ContentExtractor;
import ch.idok.dmsd.indexer.ContentExtractorFactory;
@@ -127,10 +127,10 @@
private String lastVersionFile;

/** @brief The head queue for the reconfigurable pipeline part. */
- private SimplePipelineQueue headQueue;
+ private EventPipelineQueue headQueue;

/** @brief The tail queue for the reconfigurable pipeline part. */
- private SimplePipelineQueue tailQueue;
+ private EventPipelineQueue tailQueue;

/**
* @brief The NoOp pipeline stage used when theres is no repository to be
@@ -262,12 +262,12 @@
controller = new SimpleController();
controller.initialize();
logger.fine("Created pipeline controller");
- SimplePipelineQueue queue = new SimplePipelineQueue();
+ EventPipelineQueue queue = new EventPipelineQueue();
queue.connectHead(controller);
headQueue = queue;
// ----------------------------------------------
// Pipeline piece after the reconfigurable part
- queue = new SimplePipelineQueue();
+ queue = new EventPipelineQueue();
queue.connectTail(controller);
tailQueue = queue;
// -----------------------------
@@ -528,21 +528,21 @@
repositoryId);
updateListGenerator.initialize();
logger.fine("Created list generator");
- SimplePipelineQueue queue = new SimplePipelineQueue();
+ EventPipelineQueue queue = new EventPipelineQueue();
queue.connectHead(updateListGenerator);
ContentLoader loader = new ContentLoader("Content Loader for "
+ repositoryId);
loader.initialize();
logger.fine("Created content loader");
queue.connectTail(loader);
- queue = new SimplePipelineQueue();
+ queue = new EventPipelineQueue();
queue.connectHead(loader);
Extractor extractor = new Extractor("Content Extractor for "
+ repositoryId);
extractor.initialize();
logger.fine("Created extractor");
queue.connectTail(extractor);
- queue = new SimplePipelineQueue();
+ queue = new EventPipelineQueue();
queue.connectHead(extractor);
Indexer indexer = new Indexer(indexFile);
indexer.initialize();

Modified: trunk/java/ch/idok/dmsd/impl/controller/SimpleController.java
==============================================================================
--- trunk/java/ch/idok/dmsd/impl/controller/SimpleController.java
(original)
+++ trunk/java/ch/idok/dmsd/impl/controller/SimpleController.java Mon
Mar 2 10:58:39 2009
@@ -37,6 +37,8 @@
import ch.idok.dmsd.config.Config;
import ch.idok.dmsd.config.Setup;
import ch.idok.dmsd.impl.extractor.Extractor;
+import ch.idok.dmsd.impl.queue.Event;
+import ch.idok.dmsd.impl.queue.EventQueue;
import ch.idok.dmsd.indexer.ContentExtractor;
import ch.idok.dmsd.management.DaemonManager;
import ch.idok.dmsd.monitoring.MonitoringData;
@@ -45,7 +47,7 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineSignalException;
-import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;

/**
* @brief Simple pipeline controller implementation.
@@ -62,11 +64,6 @@
private Config config;

/**
- * @brief The token that will be sent through the pipeline.
- */
- private PipelineData token;
-
- /**
* @brief Local logger with name "dmsd.controller"
*/
public Logger logger;
@@ -136,20 +133,6 @@
}

// -------------- Error Handling ------------------------
- /**
- * @brief Print a stack trace.
- *
- * @return The stack trace as a string buffer.
- */
- private StringBuffer printStackTrace() {
- StackTraceElement[] stackTrace =
Thread.currentThread().getStackTrace();
- StringBuffer sb = new StringBuffer();
- for (StackTraceElement ste : stackTrace) {
- sb.append(ste.toString());
- sb.append('\n');
- }
- return sb;
- }

/**
* @brief Check if the metaData parameter is ok.
@@ -160,12 +143,8 @@
*/
private boolean badMetaData(Map<String, String> metaData) {
if (metaData == null) {
- StringBuffer sb = printStackTrace();
- sb
- .insert(
- 0,
- "Internal bug detected, handlePipelineError
called with data.documentMetaData=null\n");
- logger.warning(sb.toString());
+ StringBuffer sb = new StringBuffer("Internal bug detected,
handlePipelineError called with data.documentMetaData=null\n");
+ logger.warning(Util.getStackTrace(sb).toString());
return true;
}
return false;
@@ -206,8 +185,10 @@
else
break;
msg.append("=null\n");
- msg.append(printStackTrace());
- logger.severe(msg.toString());
+ if (data.exception != null)
+ Util.getStackTrace(msg, data.exception);
+ msg.append("Here:\n");
+ logger.severe(Util.getStackTrace(msg).toString());
return;
} while (false);

@@ -258,8 +239,7 @@
if (ex.getErrorType() == ErrorType.INDEX_ACCESS) {
msg.append("\nAbandon indexing for the repository.");
logger.warning(msg.toString());
- data.stage
- .signal(PipelineSignal.DOWNSTREAM_PROBLEM,
data.stage);
+ data.stage.signal(new
PipelineSignal(Signal.DOWNSTREAM_PROBLEM, data.stage));
return;
}
if (ex.getErrorType() == ErrorType.INTERNAL) {
@@ -274,10 +254,8 @@
}
logger.warning(msg.toString());
} catch (Throwable th) {
- StringBuffer msg = new StringBuffer(
- "Bug detected in error handler, panic");
- msg.append(printStackTrace());
- logger.severe(msg.toString());
+ StringBuffer msg = new StringBuffer("Bug detected in error
handler, panic!\n");
+ logger.severe(Util.getStackTrace(msg, th).toString());
System.exit(-1);
}
}
@@ -301,29 +279,28 @@
Level level = Level.WARNING;
StringBuffer sb = new StringBuffer();
DmsException exception = ex.exception;
- sb.append(ex.failedStage.getName()
- + " reported an error during signal handling:\n");
- sb.append(exception.getLogMessage() + "\n");
+ sb.append(ex.failedStage.getName());
+ sb.append(" reported an error during signal handling!\n");
if (exception.getErrorType() == ErrorType.INTERNAL)
Util.getStackTrace(sb, exception);
- if ((ex.signal == PipelineSignal.START)
- || (ex.signal == PipelineSignal.COMMIT)) {
+ else
+ sb.append(exception.getLogMessage());
+ sb.append('\n');
+ if ((ex.signal.type == Signal.START)
+ || (ex.signal.type == Signal.COMMIT)) {
sb.append("Signalling DOWNSTREAM_PROBLEM\n");
try {
- ex.failedStage.signal(PipelineSignal.DOWNSTREAM_PROBLEM,
- ex.failedStage);
+ ex.failedStage.signal(new
PipelineSignal(Signal.DOWNSTREAM_PROBLEM,
+ ex.failedStage));
} catch (Throwable th) {
level = Level.SEVERE;
- sb
- .append("Severe: exception received during
DOWNSTREAM_PROBLEM signal handling.\n"
- + th);
+ sb.append("Severe: exception received during
DOWNSTREAM_PROBLEM signal handling.\n");
Util.getStackTrace(sb, th);
}
}
logger.log(level, sb.toString());
} catch (Throwable th) {
- StringBuffer sb = new StringBuffer();
- sb.append("Bug detected in error handler, panic\n");
+ StringBuffer sb = new StringBuffer("Bug detected in error
handler, panic!\n");
logger.severe(Util.getStackTrace(sb, th).toString());
System.exit(-1);
}
@@ -347,7 +324,7 @@
public void push(PipelineData data) {
logger.fine("Indexed " + data.repositoryChange.path.getName());
monitor.incCounter("operation.indexed.documents", 1);
- data.clear();
+ data.dispose();
}

/**
@@ -378,14 +355,14 @@
* The originator of the pipeline signal.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage
originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
- logger.fine("Received signal " + signal + " from " + originator);
- if (signal == PipelineSignal.FINISHED) {
+ logger.fine("Received signal " + signal + " from " +
signal.originator);
+ if (signal.type == Signal.FINISHED) {
++numFinishedSignals;
int lastSignal = getOut().getOutSize();
logger.fine("Received signal FINISHED(" + numFinishedSignals +
"/"
- + lastSignal + ") from " + originator);
+ + lastSignal + ") from " + signal.originator);
if (numFinishedSignals == lastSignal)
finished = true;
}
@@ -394,6 +371,24 @@
}

/**
+ * @brief Handle events in the queue until there are no more
+ */
+ void eventLoop() {
+ Event event = EventQueue.dequeue();
+ while (event != null) {
+ try {
+ event.handle();
+ event.dispose();
+ event = EventQueue.dequeue();
+ } catch (Throwable th) {
+ StringBuffer msg = new StringBuffer("Exception in event
loop:\n");
+ logger.warning(Util.getStackTrace(msg, th).toString());
+ }
+ Thread.yield();
+ }
+ }
+
+ /**
* @brief Start daemon operation.
*
* This method starts the inner daemon loop. One iteration corresponds to
@@ -419,7 +414,6 @@
MonitoringHandler monitoringHandler =
config.getMonitoringManager();
isStopped = false;
stopnow = false;
- token = PipelineData.getInstance();
loop: do {
monitoringHandler.startCycle();
monitor.resumeTimer("operation.time");
@@ -427,18 +421,21 @@
numFinishedSignals = 0;
finished = false;
logger.finest("Signalling START");
- getOut().signal(PipelineSignal.START, this);
+ getOut().signal(new PipelineSignal(Signal.START, this));
while (!(finished || stopnow)) {
+ eventLoop();
logger.finest("Releasing token");
+ PipelineData token = PipelineData.getInstance();
token.stage = this;
getOut().push(this, token);
- token.clear();
}
logger.finest("Signalling STOP");
- getOut().signal(PipelineSignal.STOP, this);
+ getOut().signal(new PipelineSignal(Signal.STOP, this));
+ eventLoop();
monitor.suspendTimer("operation.time");
logger.finest("Signalling GET_COUNTERS");
- getOut().signal(PipelineSignal.GET_COUNTERS, this);
+ getOut().signal(new PipelineSignal(Signal.GET_COUNTERS,
this));
+ eventLoop();
// Handle our own monitoring data here to avoid multiple
// invocations
monitoringHandler.handleMonitoringData(monitor);
@@ -461,10 +458,9 @@
} catch (InterruptedException ex) {
logger.warning("Received interrupt. Stopping operation..");
} catch (DmsException ex) {
- logger.warning(ex.getLogMessage());
+ logger.warning(Util.getStackTrace(ex).toString());
} finally {
isStopped = true;
- token.dispose();
monitor.suspendTimer("operation.time");
SimpleControllerManager.cyclicCleanupThread.interrupt();
config.destroy();
@@ -560,9 +556,9 @@
* @brief Called at the end of an indexing cycle.
*/
public void endCycle() {
- if (monitorValues == null)
- monitorValues = new StringBuilder();
- monitorValues.append("--- ACCUMULATED VALUES ---\n");
+ if (monitorValues != null)
+ logger.fine(monitorValues.toString());
+ monitorValues = new StringBuilder("\n--- ACCUMULATED MONITOR VALUES
---\n");
for (String key : accum.keySet()) {
monitorValues.append(key);
monitorValues.append(" = ");
@@ -570,7 +566,7 @@
monitorValues.append("\n");
}
accum.clear();
- monitorValues.append("--------------------------");
+ monitorValues.append("----------------------------------");
logger.info(monitorValues.toString());
monitorValues = null;
}

Modified: trunk/java/ch/idok/dmsd/impl/extractor/Extractor.java
==============================================================================
--- trunk/java/ch/idok/dmsd/impl/extractor/Extractor.java (original)
+++ trunk/java/ch/idok/dmsd/impl/extractor/Extractor.java Mon Mar 2
10:58:39 2009
@@ -38,6 +38,7 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;


/**
@@ -156,10 +157,10 @@
* manager with the local monitoring data.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage
originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
- logger.finest("Received signal " + signal + " from " + originator);
- if (signal == PipelineSignal.GET_COUNTERS)
+ logger.finest("Received signal " + signal + " from " +
signal.originator);
+ if (signal.type == Signal.GET_COUNTERS)
config.getMonitoringManager().handleMonitoringData(monitor);
}


Modified: trunk/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java
==============================================================================
--- trunk/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java (original)
+++ trunk/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java Mon Mar 2
10:58:39 2009
@@ -33,7 +33,6 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumberTools;
-import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
@@ -55,6 +54,7 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;

/**
* @brief Pipeline stage for creating and updating the Lucene index.
@@ -125,7 +125,8 @@
*/
private void setIndexWriter(boolean createIndex, boolean setSearcher)
throws Throwable {
- indexWriter = new IndexWriter(indexDirectory, analyzer, createIndex,
IndexWriter.MaxFieldLength.UNLIMITED);
+ indexWriter = new IndexWriter(indexDirectory, analyzer, createIndex,
+ IndexWriter.MaxFieldLength.UNLIMITED);
indexWriter.setMergeFactor(20);
indexWriter.setMaxBufferedDocs(15);
if (setSearcher)
@@ -206,51 +207,27 @@
if (hits.totalHits != 0) {
logger.finest("Found " + docId + " in index");
if (indexingMode == IndexingMode.LAST_VERSION) {
- logger.info("Deleting existing document " + docId
- + " from index");
- IndexReader indexReader = null;
- try {
- commitOperation(true);
- indexReader = IndexReader.open(indexDirectory);
- indexReader.deleteDocuments(idTerm);
- indexReader.close();
- indexReader = null;
- setIndexWriter(false, true);
- } catch (Throwable th) {
- DmsException.throwIt(ErrorType.INDEX_ACCESS,
this,
- "Cannot delete document " + docId, "",
th);
- } finally {
- if (indexReader != null)
- try {
- indexReader.close();
- } catch (Throwable th) {/* ignore */
- }
- }
+ logger.info("Deleting existing document "+docId+"
from index");
+ indexWriter.deleteDocuments(idTerm);
} else {
- logger
- .info("Skipping indexing of "
- + docId
- + ", the document already exists in
the index.");
+ logger.info("Skipping indexing of "+docId+
+ ", the document already exists in the
index.");
return;
}
}
if (repoChange.type == RepositoryChange.Type.DELETED) {
- logger.finest("Skipping indexing of " + docId
- + ", a deleted document.");
+ logger.finest("Skipping indexing of "+docId+", a deleted
document.");
} else {
monitor.resumeTimer("lucene.indexing.time");
Document luceneDocument = new Document();
StringBuffer sb = new StringBuffer();
Map<String, String> metadata = data.documentMetadata;
- ByteArrayInputStream input = new ByteArrayInputStream(
- data.documentContent);
- AllFieldsReader reader = new AllFieldsReader(input,
- metadata);
+ ByteArrayInputStream input = new
ByteArrayInputStream(data.documentContent);
+ AllFieldsReader reader = new AllFieldsReader(input,
metadata);
// Create string for indexed content field value
- // WARNING: Only the first 10000 tokens are analysed by
+ // WARNING: Only the first 10000 tokens are analyzed by
// default
- StringWriter stringWriter = new StringWriter(
- data.documentContent.length + 1024);
+ StringWriter stringWriter = new
StringWriter(data.documentContent.length+1024);
char[] charBuf = new char[1024 * 1024];
int amountRead;
do {
@@ -261,9 +238,8 @@
reader.close();
input.close();
charBuf = null;
- luceneDocument.add(new Field("content", stringWriter
- .toString(), Field.Store.COMPRESS,
- Field.Index.ANALYZED));
+ luceneDocument.add(new Field("content",
stringWriter.toString(),
+ Field.Store.COMPRESS, Field.Index.ANALYZED));
stringWriter.close();
sb.append("Added field: content\n");
String version = metadata.get(ch.idok.common.config.Setup
@@ -340,25 +316,26 @@
* The originator of the pipline signal.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage
originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
try {
- logger.finest("Received signal " + signal + " from " +
originator);
- if (signal == PipelineSignal.START) {
+ logger.finest("Received signal " + signal + " from " +
signal.originator);
+ if (signal.type == Signal.START) {
IndexWriter indexWriter = new IndexWriter(indexDirectory,
- analyzer, !indexDirectory.isDirectory(),
IndexWriter.MaxFieldLength.UNLIMITED);
+ analyzer, !indexDirectory.isDirectory(),
+ IndexWriter.MaxFieldLength.UNLIMITED);
indexWriter.close();
if (!indexDirectory.isDirectory())
- logger
- .warning("Sanity check failed, indexWriter did
not create an empty directory!");
+ logger.warning("Sanity check failed, indexWriter did not
create an"+
+ "empty directory!");
operationStopped = false;
- } else if (signal == PipelineSignal.STOP) {
+ } else if (signal.type == Signal.STOP) {
stopOperation(true);
- } else if (signal == PipelineSignal.COMMIT) {
+ } else if (signal.type == Signal.COMMIT) {
commitOperation(true);
- } else if (signal == PipelineSignal.DOWNSTREAM_PROBLEM) {
+ } else if (signal.type == Signal.DOWNSTREAM_PROBLEM) {
stopOperation(true);
- } else if (signal == PipelineSignal.GET_COUNTERS) {
+ } else if (signal.type == Signal.GET_COUNTERS) {
config.getMonitoringManager().handleMonitoringData(monitor);
}
} catch (IOException ex) {

Modified: trunk/java/ch/idok/dmsd/impl/loader/ContentLoader.java
==============================================================================
--- trunk/java/ch/idok/dmsd/impl/loader/ContentLoader.java (original)
+++ trunk/java/ch/idok/dmsd/impl/loader/ContentLoader.java Mon Mar 2
10:58:39 2009
@@ -44,6 +44,7 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;

/**
* @brief Pipeline stage for loading the raw document content and meta data
from
@@ -201,10 +202,10 @@
* The originator of the signal.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage
originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
- logger.finest("Received signal " + signal + " from " + originator);
- if (signal == PipelineSignal.GET_COUNTERS)
+ logger.finest("Received signal " + signal + " from " +
signal.originator);
+ if (signal.type == Signal.GET_COUNTERS)
config.getMonitoringManager().handleMonitoringData(monitor);
}


Modified:
trunk/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
==============================================================================
--- trunk/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
(original)
+++ trunk/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
Mon Mar 2 10:58:39 2009
@@ -40,6 +40,7 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;

/**
* @brief Pipeline stage for assembling the list of updated documents.
@@ -92,7 +93,7 @@
try {
logger.finest("Stop operation");
if (signalFinished)
- controller.signal(PipelineSignal.FINISHED, this);
+ controller.signal(new PipelineSignal(Signal.FINISHED, this));
} catch (Throwable th) {
// ignore
} finally {
@@ -153,6 +154,7 @@
if (updateListIterator == null)
setUpdateListIterator();
if (updateListIterator == null) {
+ // executed when the last indexed version = last repository
version
stopOperation(true);
logger.fine("Index for " + repositoryId + " is complete.");
return;
@@ -163,8 +165,7 @@
getOut().push(this, data);
} else {
updateListIterator = null;
- getOut().signal(PipelineSignal.COMMIT, this); // May call
- // stopOperation(true)
+ getOut().signal(new PipelineSignal(Signal.COMMIT, this)); //
May call
if (repository != null) {
stopOperation(true);
config.setLastVersion(repositoryId, lastVersion);
@@ -180,9 +181,7 @@
"Bug Detected", "", th);
}
controller.handlePipelineError(data);
- logger
- .warning("Stopping operation due to exception: "
- + data.exception);
+ logger.warning("Stopping operation due to exception:
"+data.exception);
stopOperation(true);
}

@@ -213,23 +212,23 @@
* The originator of the pipeline signal.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage
originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
DmsException exception;
try {
- logger.finest("Received signal " + signal + " from " +
originator);
- if (signal == PipelineSignal.START) {
+ logger.finest("Received signal " + signal + " from " +
signal.originator);
+ if (signal.type == Signal.START) {
updateListIterator = null;
Repository repo =
repositoryManager.getRepository(repositoryId);
latestVersion = repo.getLatestVersion();
repository = repo;
- } else if (signal == PipelineSignal.STOP) {
+ } else if (signal.type == Signal.STOP) {
if (updateListIterator != null)
logger.warning("Received stop during operation cycle!");
stopOperation(false);
- } else if (signal == PipelineSignal.DOWNSTREAM_PROBLEM) {
+ } else if (signal.type == Signal.DOWNSTREAM_PROBLEM) {
stopOperation(true);
- } else if (signal == PipelineSignal.GET_COUNTERS) {
+ } else if (signal.type == Signal.GET_COUNTERS) {
config.getMonitoringManager().handleMonitoringData(monitor);
}
return;

Modified: trunk/java/ch/idok/dmsd/pipeline/NoOp.java
==============================================================================
--- trunk/java/ch/idok/dmsd/pipeline/NoOp.java (original)
+++ trunk/java/ch/idok/dmsd/pipeline/NoOp.java Mon Mar 2 10:58:39 2009
@@ -45,7 +45,7 @@
public void push(PipelineData data) {
try {
data.stage = this;
- controller.signal(PipelineSignal.FINISHED, this);
+ controller.signal(new PipelineSignal(Signal.FINISHED, this));
return;
} catch (Throwable th) {
data.exception = new DmsException(ErrorType.INTERNAL, this,
@@ -62,7 +62,7 @@

/** @brief Do nothing. */
@Override
- public void processSignal(PipelineSignal signal, PipelineStage
originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
}


Modified: trunk/java/ch/idok/dmsd/pipeline/PipelineController.java
==============================================================================
--- trunk/java/ch/idok/dmsd/pipeline/PipelineController.java (original)
+++ trunk/java/ch/idok/dmsd/pipeline/PipelineController.java Mon Mar 2
10:58:39 2009
@@ -45,9 +45,9 @@
}

@Override
- public void signal(PipelineSignal signal, PipelineStage originator)
+ public void signal(PipelineSignal signal)
throws DmsException {
- processSignal(signal, originator);
+ processSignal(signal);
}

/**

Modified: trunk/java/ch/idok/dmsd/pipeline/PipelineData.java
==============================================================================
--- trunk/java/ch/idok/dmsd/pipeline/PipelineData.java (original)
+++ trunk/java/ch/idok/dmsd/pipeline/PipelineData.java Mon Mar 2 10:58:39
2009
@@ -86,12 +86,30 @@
/**
* @brief Create a pipeline data instance.
*
- * This method in unioson with the dispose() method might be enhanced to
+ * This method in unison with the dispose() method might be enhanced to
* provide smart memory management.
*/
public static PipelineData getInstance() {
return new PipelineData();
}
+
+ /**
+ * @brief Create a pipeline data instance from another one.
+ *
+ * This method creates a new instance with fields that
+ * reference the same values the other instance does.
+ */
+ public static PipelineData getInstance(PipelineData other) {
+ PipelineData newobj = new PipelineData();
+ newobj.documentContent = other.documentContent;
+ newobj.documentMetadata = other.documentMetadata;
+ newobj.exception = other.exception;
+ newobj.monitoringData = other.monitoringData;
+ newobj.repositoryChange = other.repositoryChange;
+ newobj.retryCounter = other.retryCounter;
+ newobj.stage = other.stage;
+ return newobj;
+ }

/**
* @brief Dispose a pipeline data instance.

Modified: trunk/java/ch/idok/dmsd/pipeline/PipelineQueue.java
==============================================================================
--- trunk/java/ch/idok/dmsd/pipeline/PipelineQueue.java (original)
+++ trunk/java/ch/idok/dmsd/pipeline/PipelineQueue.java Mon Mar 2 10:58:39
2009
@@ -61,7 +61,7 @@
* stage.
*
* The stage argument to push might seem strange at first, but they are
- * actually usefull if the queue implementation allows connections to
+ * actually useful if the queue implementation allows connections to
* multiple input and output stages. The stage argument lets the queue
know
* which stage is performing operations.
*/
@@ -152,13 +152,13 @@
* @throws DmsException
* If the signal cannot be distributed cleanly.
*/
- public void signal(PipelineSignal signal, PipelineStage originator)
+ public void signal(PipelineSignal signal)
throws DmsException {
- if (signal.direction == PipelineSignal.Direction.UP) // upstream
+ if (signal.direction() == Signal.Direction.UP) // upstream
for (PipelineStage stage : in)
- stage.signal(signal, originator);
- else if (signal.direction == PipelineSignal.Direction.DOWN) //
downstream
+ stage.signal(signal);
+ else if (signal.direction() == Signal.Direction.DOWN) // downstream
for (PipelineStage stage : out)
- stage.signal(signal, originator);
+ stage.signal(signal);
}
}

Modified: trunk/java/ch/idok/dmsd/pipeline/PipelineSignal.java
==============================================================================
--- trunk/java/ch/idok/dmsd/pipeline/PipelineSignal.java (original)
+++ trunk/java/ch/idok/dmsd/pipeline/PipelineSignal.java Mon Mar 2
10:58:39 2009
@@ -20,58 +20,49 @@
package ch.idok.dmsd.pipeline;

/**
- * @brief The different pipeline signals. - Upstream signals have direction
UP. -
- * Downstream signals have direction DOWN. - Nonpropagating signals
have
- * direction NONE.
+ * @brief Pipeline signals.
+ * - Upstream signals have direction UP.
+ * - Downstream signals have direction DOWN.
+ * - Non propagating signals have direction NONE.
*/
-public enum PipelineSignal {
- /** @name Downstream signals */
- // @{
- /** Start operation cycle. */
- START(Direction.DOWN),
- /** Stop operation cycle. */
- STOP(Direction.DOWN),
- /** Commit operation. */
- COMMIT(Direction.DOWN),
- /** Get monitoring counters. */
- GET_COUNTERS(Direction.DOWN),
+public class PipelineSignal {
+
/**
- * Signal unrecoverable upstream problems with last upstream signal or
- * operation.
+ * @brief Return the direction of the emitted signal
+ *
+ * @return Direction of the emitted signal
*/
- UPSTREAM_PROBLEM(Direction.DOWN),
- // @}
- /** @name Upstream signals */
- // @{
- /** Request stop of the operation cycle. */
- STOP_REQUEST(Direction.UP),
+ public Signal.Direction direction() {
+ return type.direction;
+ }
+
/**
- * Signal unrecoverable downstream problems with last downstream signal
or
- * operation.
+ * @brief The emitted signal type
*/
- DOWNSTREAM_PROBLEM(Direction.UP),
- // @}
- /** @name Nonpropagating signals */
- // @{
- /** Announce job accomplishment. */
- FINISHED(Direction.NONE);
- // @}
-
- /** Signal propagation directions. */
- public enum Direction {
- UP, DOWN, NONE
- }
-
- /** The propagation direction of this signal. */
- public Direction direction;
+ public Signal type;
+
+ /**
+ * @brief The pipeline stage that emitted this signal originally
+ */
+ public PipelineStage originator;

/**
- * Create a pipeline signal.
+ * @brief Construct a pipeline signal
*
- * @param dir
- * The propagation direction of the signal.
+ * @param sig The emitted signal
+ * @param orig The original emitter of this signal
+ */
+ public PipelineSignal(Signal sig, PipelineStage orig) {
+ type = sig;
+ originator = orig;
+ }
+
+ /**
+ * @brief Return a description of this object
+ * @return String describing this object
*/
- PipelineSignal(Direction dir) {
- direction = dir;
+ @Override
+ public String toString() {
+ return this.getClass().toString()+"("+type+", "+originator+")";
}
}

Modified: trunk/java/ch/idok/dmsd/pipeline/PipelineSignalException.java
==============================================================================
--- trunk/java/ch/idok/dmsd/pipeline/PipelineSignalException.java
(original)
+++ trunk/java/ch/idok/dmsd/pipeline/PipelineSignalException.java Mon
Mar 2 10:58:39 2009
@@ -57,11 +57,11 @@
* The originator of the signal that caused the failure.
*/
public PipelineSignalException(DmsException cause, PipelineStage failed,
- PipelineSignal sig, PipelineStage origin) {
+ PipelineSignal sig) {
super(cause);
exception = cause;
signal = sig;
- signalOriginator = origin;
+ signalOriginator = sig.originator;
failedStage = failed;
}


Modified: trunk/java/ch/idok/dmsd/pipeline/PipelineStage.java
==============================================================================
--- trunk/java/ch/idok/dmsd/pipeline/PipelineStage.java (original)
+++ trunk/java/ch/idok/dmsd/pipeline/PipelineStage.java Mon Mar 2 10:58:39
2009
@@ -150,11 +150,11 @@
* @throws DmsException
* If the signal cannot be distributed cleanly.
*/
- public void signal(PipelineSignal signal, PipelineStage originator)
+ public void signal(PipelineSignal signal)
throws DmsException {
DmsException exception = null;
try {
- processSignal(signal, originator);
+ processSignal(signal);
} catch (DmsException ex) {
exception = ex;
} catch (Throwable th) {
@@ -164,18 +164,18 @@

if (exception != null) {
PipelineSignalException ex = new
PipelineSignalException(exception,
- this, signal, originator);
+ this, signal);

Setup.getConfig().getPipelineController().handleSignalingError(ex);
return;
}

PipelineQueue q = null;
- if (signal.direction == PipelineSignal.Direction.UP) // upstream
+ if (signal.direction() == Signal.Direction.UP) // upstream
q = in;
- else if (signal.direction == PipelineSignal.Direction.DOWN) //
downstream
+ else if (signal.direction() == Signal.Direction.DOWN) // downstream
q = out;
if (q != null)
- q.signal(signal, originator);
+ q.signal(signal);
}

/**
@@ -214,8 +214,7 @@
* @throws DmsException
* If the pipeline signal cannot be handled.
*/
- public abstract void processSignal(PipelineSignal signal,
- PipelineStage originator) throws DmsException;
+ public abstract void processSignal(PipelineSignal signal) throws
DmsException;

/**
* @brief Retrieve the name of this pipeline stage.



  • [idok-commit] idok commit r332 - in trunk/java/ch/idok: common/errorhandling dmsd/impl/config/svnlucene dmsd/impl/controller dmsd/impl/extractor dmsd/impl/indexer/lucene dmsd/impl/loader dmsd/impl/queue dmsd/impl/updatelist dmsd/pipeline, AFS account Stadler Hans Christian, 03/02/2009

Archive powered by MHonArc 2.6.19.

Top of Page