idok-commit AT lists.psi.ch
Subject: Commit emails of the iDok project
List archive
[idok-commit] idok commit r326 - in branches/dmsd2/java/ch/idok/dmsd: impl/controller impl/extractor impl/indexer/lucene impl/loader impl/updatelist pipeline
Chronological Thread
- From: "AFS account Stadler Hans Christian" <stadler_h AT savannah.psi.ch>
- To: idok-commit AT lists.psi.ch
- Subject: [idok-commit] idok commit r326 - in branches/dmsd2/java/ch/idok/dmsd: impl/controller impl/extractor impl/indexer/lucene impl/loader impl/updatelist pipeline
- Date: Tue, 24 Feb 2009 13:48:48 +0100
- List-archive: <https://lists.web.psi.ch/pipermail/idok-commit/>
- List-id: Commit emails of the iDok project <idok-commit.lists.psi.ch>
Author: stadler_h
Date: Tue Feb 24 13:48:48 2009
New Revision: 326
Log:
Changed pipeline signal object to contain all necessary info
Modified:
branches/dmsd2/java/ch/idok/dmsd/impl/controller/SimpleController.java
branches/dmsd2/java/ch/idok/dmsd/impl/extractor/Extractor.java
branches/dmsd2/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java
branches/dmsd2/java/ch/idok/dmsd/impl/loader/ContentLoader.java
branches/dmsd2/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
branches/dmsd2/java/ch/idok/dmsd/pipeline/NoOp.java
branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineController.java
branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineQueue.java
branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineSignal.java
branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineSignalException.java
branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineStage.java
Modified:
branches/dmsd2/java/ch/idok/dmsd/impl/controller/SimpleController.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/impl/controller/SimpleController.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/controller/SimpleController.java
Tue Feb 24 13:48:48 2009
@@ -46,6 +46,7 @@
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineSignalException;
import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;
/**
* @brief Simple pipeline controller implementation.
@@ -258,8 +259,7 @@
if (ex.getErrorType() == ErrorType.INDEX_ACCESS) {
msg.append("\nAbandon indexing for the repository.");
logger.warning(msg.toString());
- data.stage
- .signal(PipelineSignal.DOWNSTREAM_PROBLEM, data.stage);
+ data.stage.signal(new PipelineSignal(Signal.DOWNSTREAM_PROBLEM, data.stage));
return;
}
if (ex.getErrorType() == ErrorType.INTERNAL) {
@@ -306,12 +306,12 @@
sb.append(exception.getLogMessage() + "\n");
if (exception.getErrorType() == ErrorType.INTERNAL)
Util.getStackTrace(sb, exception);
- if ((ex.signal == PipelineSignal.START)
- || (ex.signal == PipelineSignal.COMMIT)) {
+ if ((ex.signal.type == Signal.START)
+ || (ex.signal.type == Signal.COMMIT)) {
sb.append("Signalling DOWNSTREAM_PROBLEM\n");
try {
- ex.failedStage.signal(PipelineSignal.DOWNSTREAM_PROBLEM,
- ex.failedStage);
+ ex.failedStage.signal(new PipelineSignal(Signal.DOWNSTREAM_PROBLEM,
+ ex.failedStage));
} catch (Throwable th) {
level = Level.SEVERE;
sb
@@ -378,14 +378,14 @@
* The originator of the pipeline signal.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
- logger.fine("Received signal " + signal + " from " + originator);
- if (signal == PipelineSignal.FINISHED) {
+ logger.fine("Received signal " + signal + " from " + signal.originator);
+ if (signal.type == Signal.FINISHED) {
++numFinishedSignals;
int lastSignal = getOut().getOutSize();
logger.fine("Received signal FINISHED(" + numFinishedSignals + "/"
- + lastSignal + ") from " + originator);
+ + lastSignal + ") from " + signal.originator);
if (numFinishedSignals == lastSignal)
finished = true;
}
@@ -427,7 +427,7 @@
numFinishedSignals = 0;
finished = false;
logger.finest("Signalling START");
- getOut().signal(PipelineSignal.START, this);
+ getOut().signal(new PipelineSignal(Signal.START, this));
while (!(finished || stopnow)) {
logger.finest("Releasing token");
token.stage = this;
@@ -435,10 +435,10 @@
token.clear();
}
logger.finest("Signalling STOP");
- getOut().signal(PipelineSignal.STOP, this);
+ getOut().signal(new PipelineSignal(Signal.STOP, this));
monitor.suspendTimer("operation.time");
logger.finest("Signalling GET_COUNTERS");
- getOut().signal(PipelineSignal.GET_COUNTERS, this);
+ getOut().signal(new PipelineSignal(Signal.GET_COUNTERS, this));
// Handle our own monitoring data here to avoid multiple
// invocations
monitoringHandler.handleMonitoringData(monitor);
Modified: branches/dmsd2/java/ch/idok/dmsd/impl/extractor/Extractor.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/impl/extractor/Extractor.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/extractor/Extractor.java Tue
Feb 24 13:48:48 2009
@@ -38,6 +38,7 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;
/**
@@ -156,10 +157,10 @@
* manager with the local monitoring data.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
- logger.finest("Received signal " + signal + " from " + originator);
- if (signal == PipelineSignal.GET_COUNTERS)
+ logger.finest("Received signal " + signal + " from " + signal.originator);
+ if (signal.type == Signal.GET_COUNTERS)
config.getMonitoringManager().handleMonitoringData(monitor);
}
Modified: branches/dmsd2/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/indexer/lucene/Indexer.java Tue
Feb 24 13:48:48 2009
@@ -55,6 +55,7 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;
/**
* @brief Pipeline stage for creating and updating the Lucene index.
@@ -340,11 +341,11 @@
* The originator of the pipline signal.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
try {
- logger.finest("Received signal " + signal + " from " + originator);
- if (signal == PipelineSignal.START) {
+ logger.finest("Received signal " + signal + " from " + signal.originator);
+ if (signal.type == Signal.START) {
IndexWriter indexWriter = new IndexWriter(indexDirectory,
analyzer, !indexDirectory.isDirectory(),
IndexWriter.MaxFieldLength.UNLIMITED);
indexWriter.close();
@@ -352,13 +353,13 @@
logger
.warning("Sanity check failed, indexWriter did not create an empty directory!");
operationStopped = false;
- } else if (signal == PipelineSignal.STOP) {
+ } else if (signal.type == Signal.STOP) {
stopOperation(true);
- } else if (signal == PipelineSignal.COMMIT) {
+ } else if (signal.type == Signal.COMMIT) {
commitOperation(true);
- } else if (signal == PipelineSignal.DOWNSTREAM_PROBLEM) {
+ } else if (signal.type == Signal.DOWNSTREAM_PROBLEM) {
stopOperation(true);
- } else if (signal == PipelineSignal.GET_COUNTERS) {
+ } else if (signal.type == Signal.GET_COUNTERS) {
config.getMonitoringManager().handleMonitoringData(monitor);
}
} catch (IOException ex) {
Modified: branches/dmsd2/java/ch/idok/dmsd/impl/loader/ContentLoader.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/impl/loader/ContentLoader.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/impl/loader/ContentLoader.java Tue
Feb 24 13:48:48 2009
@@ -44,6 +44,7 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;
/**
* @brief Pipeline stage for loading the raw document content and meta data
from
@@ -201,10 +202,10 @@
* The originator of the signal.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
- logger.finest("Received signal " + signal + " from " + originator);
- if (signal == PipelineSignal.GET_COUNTERS)
+ logger.finest("Received signal " + signal + " from " + signal.originator);
+ if (signal.type == Signal.GET_COUNTERS)
config.getMonitoringManager().handleMonitoringData(monitor);
}
Modified:
branches/dmsd2/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
==============================================================================
---
branches/dmsd2/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
(original)
+++
branches/dmsd2/java/ch/idok/dmsd/impl/updatelist/SimpleUpdateListGenerator.java
Tue Feb 24 13:48:48 2009
@@ -40,6 +40,7 @@
import ch.idok.dmsd.pipeline.PipelineData;
import ch.idok.dmsd.pipeline.PipelineSignal;
import ch.idok.dmsd.pipeline.PipelineStage;
+import ch.idok.dmsd.pipeline.Signal;
/**
* @brief Pipeline stage for assembling the list of updated documents.
@@ -92,7 +93,7 @@
try {
logger.finest("Stop operation");
if (signalFinished)
- controller.signal(PipelineSignal.FINISHED, this);
+ controller.signal(new PipelineSignal(Signal.FINISHED, this));
} catch (Throwable th) {
// ignore
} finally {
@@ -163,7 +164,7 @@
getOut().push(this, data);
} else {
updateListIterator = null;
- getOut().signal(PipelineSignal.COMMIT, this); // May call
+ getOut().signal(new PipelineSignal(Signal.COMMIT, this)); // May call
// stopOperation(true)
if (repository != null) {
stopOperation(true);
@@ -213,23 +214,23 @@
* The originator of the pipeline signal.
*/
@Override
- public void processSignal(PipelineSignal signal, PipelineStage originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
DmsException exception;
try {
- logger.finest("Received signal " + signal + " from " + originator);
- if (signal == PipelineSignal.START) {
+ logger.finest("Received signal " + signal + " from " + signal.originator);
+ if (signal.type == Signal.START) {
updateListIterator = null;
Repository repo = repositoryManager.getRepository(repositoryId);
latestVersion = repo.getLatestVersion();
repository = repo;
- } else if (signal == PipelineSignal.STOP) {
+ } else if (signal.type == Signal.STOP) {
if (updateListIterator != null)
logger.warning("Received stop during operation cycle!");
stopOperation(false);
- } else if (signal == PipelineSignal.DOWNSTREAM_PROBLEM) {
+ } else if (signal.type == Signal.DOWNSTREAM_PROBLEM) {
stopOperation(true);
- } else if (signal == PipelineSignal.GET_COUNTERS) {
+ } else if (signal.type == Signal.GET_COUNTERS) {
config.getMonitoringManager().handleMonitoringData(monitor);
}
return;
Modified: branches/dmsd2/java/ch/idok/dmsd/pipeline/NoOp.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/pipeline/NoOp.java (original)
+++ branches/dmsd2/java/ch/idok/dmsd/pipeline/NoOp.java Tue Feb 24 13:48:48
2009
@@ -45,7 +45,7 @@
public void push(PipelineData data) {
try {
data.stage = this;
- controller.signal(PipelineSignal.FINISHED, this);
+ controller.signal(new PipelineSignal(Signal.FINISHED, this));
return;
} catch (Throwable th) {
data.exception = new DmsException(ErrorType.INTERNAL, this,
@@ -62,7 +62,7 @@
/** @brief Do nothing. */
@Override
- public void processSignal(PipelineSignal signal, PipelineStage originator)
+ public void processSignal(PipelineSignal signal)
throws DmsException {
}
Modified: branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineController.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineController.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineController.java Tue
Feb 24 13:48:48 2009
@@ -45,9 +45,9 @@
}
@Override
- public void signal(PipelineSignal signal, PipelineStage originator)
+ public void signal(PipelineSignal signal)
throws DmsException {
- processSignal(signal, originator);
+ processSignal(signal);
}
/**
Modified: branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineQueue.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineQueue.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineQueue.java Tue
Feb 24 13:48:48 2009
@@ -152,13 +152,13 @@
* @throws DmsException
* If the signal cannot be distributed cleanly.
*/
- public void signal(PipelineSignal signal, PipelineStage originator)
+ public void signal(PipelineSignal signal)
throws DmsException {
- if (signal.direction == PipelineSignal.Direction.UP) // upstream
+ if (signal.direction() == Signal.Direction.UP) // upstream
for (PipelineStage stage : in)
- stage.signal(signal, originator);
- else if (signal.direction == PipelineSignal.Direction.DOWN) // downstream
+ stage.signal(signal);
+ else if (signal.direction() == Signal.Direction.DOWN) // downstream
for (PipelineStage stage : out)
- stage.signal(signal, originator);
+ stage.signal(signal);
}
}
Modified: branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineSignal.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineSignal.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineSignal.java Tue
Feb 24 13:48:48 2009
@@ -20,58 +20,41 @@
package ch.idok.dmsd.pipeline;
/**
- * @brief The different pipeline signals. - Upstream signals have direction
UP. -
- * Downstream signals have direction DOWN. - Nonpropagating signals
have
- * direction NONE.
+ * @brief Pipeline signals.
+ * - Upstream signals have direction UP.
+ * - Downstream signals have direction DOWN.
+ * - Non propagating signals have direction NONE.
*/
-public enum PipelineSignal {
- /** @name Downstream signals */
- // @{
- /** Start operation cycle. */
- START(Direction.DOWN),
- /** Stop operation cycle. */
- STOP(Direction.DOWN),
- /** Commit operation. */
- COMMIT(Direction.DOWN),
- /** Get monitoring counters. */
- GET_COUNTERS(Direction.DOWN),
+public class PipelineSignal {
+
/**
- * Signal unrecoverable upstream problems with last upstream signal or
- * operation.
+ * @brief Return the direction of the emitted signal
+ *
+ * @return Direction of the emitted signal
*/
- UPSTREAM_PROBLEM(Direction.DOWN),
- // @}
- /** @name Upstream signals */
- // @{
- /** Request stop of the operation cycle. */
- STOP_REQUEST(Direction.UP),
+ public Signal.Direction direction() {
+ return type.direction;
+ }
+
/**
- * Signal unrecoverable downstream problems with last downstream signal
or
- * operation.
+ * @brief The emitted signal type
*/
- DOWNSTREAM_PROBLEM(Direction.UP),
- // @}
- /** @name Nonpropagating signals */
- // @{
- /** Announce job accomplishment. */
- FINISHED(Direction.NONE);
- // @}
-
- /** Signal propagation directions. */
- public enum Direction {
- UP, DOWN, NONE
- }
-
- /** The propagation direction of this signal. */
- public Direction direction;
+ public Signal type;
+
+ /**
+ * @brief The pipeline stage that emitted this signal originally
+ */
+ public PipelineStage originator;
/**
- * Create a pipeline signal.
+ * @brief Construct a pipeline signal
*
- * @param dir
- * The propagation direction of the signal.
+ * @param sig The emitted signal
+ * @param orig The original emitter of this signal
*/
- PipelineSignal(Direction dir) {
- direction = dir;
+ public PipelineSignal(Signal sig, PipelineStage orig) {
+ type = sig;
+ originator = orig;
}
+
}
Modified:
branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineSignalException.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineSignalException.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineSignalException.java
Tue Feb 24 13:48:48 2009
@@ -57,11 +57,11 @@
* The originator of the signal that caused the failure.
*/
public PipelineSignalException(DmsException cause, PipelineStage failed,
- PipelineSignal sig, PipelineStage origin) {
+ PipelineSignal sig) {
super(cause);
exception = cause;
signal = sig;
- signalOriginator = origin;
+ signalOriginator = sig.originator;
failedStage = failed;
}
Modified: branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineStage.java
==============================================================================
--- branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineStage.java
(original)
+++ branches/dmsd2/java/ch/idok/dmsd/pipeline/PipelineStage.java Tue
Feb 24 13:48:48 2009
@@ -150,11 +150,11 @@
* @throws DmsException
* If the signal cannot be distributed cleanly.
*/
- public void signal(PipelineSignal signal, PipelineStage originator)
+ public void signal(PipelineSignal signal)
throws DmsException {
DmsException exception = null;
try {
- processSignal(signal, originator);
+ processSignal(signal);
} catch (DmsException ex) {
exception = ex;
} catch (Throwable th) {
@@ -164,18 +164,18 @@
if (exception != null) {
PipelineSignalException ex = new
PipelineSignalException(exception,
- this, signal, originator);
+ this, signal);
Setup.getConfig().getPipelineController().handleSignalingError(ex);
return;
}
PipelineQueue q = null;
- if (signal.direction == PipelineSignal.Direction.UP) // upstream
+ if (signal.direction() == Signal.Direction.UP) // upstream
q = in;
- else if (signal.direction == PipelineSignal.Direction.DOWN) // downstream
+ else if (signal.direction() == Signal.Direction.DOWN) // downstream
q = out;
if (q != null)
- q.signal(signal, originator);
+ q.signal(signal);
}
/**
@@ -214,8 +214,7 @@
* @throws DmsException
* If the pipeline signal cannot be handled.
*/
- public abstract void processSignal(PipelineSignal signal,
- PipelineStage originator) throws DmsException;
+ public abstract void processSignal(PipelineSignal signal) throws DmsException;
/**
* @brief Retrieve the name of this pipeline stage.
- [idok-commit] idok commit r326 - in branches/dmsd2/java/ch/idok/dmsd: impl/controller impl/extractor impl/indexer/lucene impl/loader impl/updatelist pipeline, AFS account Stadler Hans Christian, 02/24/2009
Archive powered by MHonArc 2.6.19.