root / sqs-reader / src / main / java / net / sqs2 / omr / session / producer / SessionSourceScannerTaskProducer.java @ 1855
History | View | Annotate | Download (6.8 KB)
| 1 | /**
|
|---|---|
| 2 | * SessionSourceScannerTaskProducer.java |
| 3 | |
| 4 | Copyright 2007 KUBO Hiroya (hiroya@cuc.ac.jp). |
| 5 | |
| 6 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | you may not use this file except in compliance with the License. |
| 8 | You may obtain a copy of the License at |
| 9 | |
| 10 | http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | |
| 12 | Unless required by applicable law or agreed to in writing, software |
| 13 | distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | See the License for the specific language governing permissions and |
| 16 | limitations under the License. |
| 17 | |
| 18 | Created on 2007/01/31 |
| 19 | Author hiroya |
| 20 | */ |
| 21 | package net.sqs2.omr.session.producer;
|
| 22 | |
| 23 | import java.io.IOException; |
| 24 | |
| 25 | import java.util.Calendar; |
| 26 | import java.util.logging.Logger; |
| 27 | |
| 28 | import net.sqs2.omr.MarkReaderConstants; |
| 29 | import net.sqs2.omr.model.AbstractPageTask; |
| 30 | import net.sqs2.omr.model.AbstractTask; |
| 31 | import net.sqs2.omr.model.PageID; |
| 32 | import net.sqs2.omr.model.PageTask; |
| 33 | import net.sqs2.omr.model.PageTaskAccessor; |
| 34 | import net.sqs2.omr.model.PageTaskFactory; |
| 35 | import net.sqs2.omr.model.PageTaskHolder; |
| 36 | import net.sqs2.omr.model.SourceDirectory; |
| 37 | import net.sqs2.omr.session.monitor.MarkReaderSessionMonitorImpl; |
| 38 | import net.sqs2.omr.session.source.SessionSource; |
| 39 | import net.sqs2.omr.session.source.ContentIndexer; |
| 40 | import net.sqs2.omr.session.source.SessionStopException; |
| 41 | |
| 42 | public class SessionSourceScannerTaskProducer extends SessionSourceScanner implements Runnable { |
| 43 | |
| 44 | private MarkReaderSessionMonitorImpl monitor;
|
| 45 | private PageTaskHolder taskHolder;
|
| 46 | |
| 47 | public SessionSourceScannerTaskProducer(SessionSource sessionSource, MarkReaderSessionMonitorImpl monitor,
|
| 48 | PageTaskHolder taskHolder) throws IOException { |
| 49 | super(sessionSource);
|
| 50 | this.monitor = monitor;
|
| 51 | this.taskHolder = taskHolder;
|
| 52 | } |
| 53 | |
| 54 | @Override
|
| 55 | public AbstractSessionSourceScannerWorker createWorker() throws IOException { |
| 56 | return new SessionSourceScannerWorker(this.sessionSource, this.taskHolder, this.monitor, |
| 57 | MarkReaderConstants.SESSION_SOURCE_NEWFILE_IGNORE_SEC_THRESHOLD_IN_SEC); |
| 58 | } |
| 59 | |
| 60 | @Override
|
| 61 | public void run() { |
| 62 | |
| 63 | super.run();
|
| 64 | |
| 65 | } |
| 66 | |
| 67 | static protected class SessionSourceScannerWorker extends AbstractSessionSourceScannerWorker { |
| 68 | |
| 69 | private static final boolean INFO = false; |
| 70 | |
| 71 | int numAdded = 0; |
| 72 | int numReused = 0; |
| 73 | int numRetry = 0; |
| 74 | |
| 75 | private SessionSource sessionSource;
|
| 76 | private PageTaskHolder taskHolder;
|
| 77 | private MarkReaderSessionMonitorImpl monitor;
|
| 78 | private long newFileIgnoreSecThreshold; |
| 79 | |
| 80 | private PageTaskAccessor taskAccessor;
|
| 81 | private long now; |
| 82 | |
| 83 | SessionSourceScannerWorker(SessionSource sessionSource, PageTaskHolder taskHolder, |
| 84 | MarkReaderSessionMonitorImpl monitor, long newFileIgnoreSecThreshold) throws IOException { |
| 85 | this.now = Calendar.getInstance().getTimeInMillis(); |
| 86 | this.sessionSource = sessionSource;
|
| 87 | this.taskHolder = taskHolder;
|
| 88 | this.monitor = monitor;
|
| 89 | this.newFileIgnoreSecThreshold = newFileIgnoreSecThreshold;
|
| 90 | this.taskAccessor = sessionSource.getContentAccessor().getPageTaskAccessor();
|
| 91 | } |
| 92 | |
| 93 | @Override
|
| 94 | void work(SourceDirectory sourceDirectory, int pageNumber, PageID pageID, int rowIndex) throws SessionStopException { |
| 95 | PageTask task = PageTaskFactory.createPageTask(sourceDirectory, pageNumber, pageID, this.sessionSource.getSessionID());
|
| 96 | if(isIgnorableTask(task)){
|
| 97 | return;
|
| 98 | } |
| 99 | |
| 100 | PageTask preparedTask = prepareTask(task); |
| 101 | if (preparedTask != null) { |
| 102 | this.taskHolder.incrementNumTargetTasks(1); |
| 103 | this.taskAccessor.put(preparedTask);
|
| 104 | this.taskHolder.addPreparedTask(preparedTask);
|
| 105 | this.monitor.notifyPageTaskProduced(preparedTask);
|
| 106 | } |
| 107 | ContentIndexer contentIndexer = this.sessionSource.getContentIndexer();
|
| 108 | contentIndexer.putRowIndex(pageID, rowIndex); |
| 109 | } |
| 110 | |
| 111 | @Override
|
| 112 | void startScanningSourceDirectory(SourceDirectory sourceDirectory) {
|
| 113 | } |
| 114 | |
| 115 | private PageTask prepareTask(PageTask task) throws SessionStopException { |
| 116 | PageTask storedTask = null;
|
| 117 | try {
|
| 118 | storedTask = this.taskAccessor.get(task.toString());
|
| 119 | } catch (Exception ignore) { |
| 120 | } |
| 121 | |
| 122 | if (isOnceExecutedTask(storedTask)) {
|
| 123 | if (hasError(storedTask)) {
|
| 124 | |
| 125 | this.monitor.notifyErrorPageTaskReproduced(storedTask);
|
| 126 | |
| 127 | if (!isExecutionRequiredTaskWithError(task, storedTask)) {
|
| 128 | return null; |
| 129 | } |
| 130 | |
| 131 | } else if (storedTask.getTaskResult() != null) { |
| 132 | if (!isExecutionRequiredTask(task, storedTask)) {
|
| 133 | return null; |
| 134 | } |
| 135 | } else {
|
| 136 | this.numAdded++;
|
| 137 | Logger.getLogger("session").info("==========ADD\t" + task); |
| 138 | } |
| 139 | } else {
|
| 140 | this.numAdded++;
|
| 141 | Logger.getLogger("session").info("==========ADD\t" + task); |
| 142 | } |
| 143 | return task;
|
| 144 | } |
| 145 | |
| 146 | private boolean isOnceExecutedTask(AbstractTask storedTask) { |
| 147 | return storedTask != null; |
| 148 | } |
| 149 | |
| 150 | private boolean isIgnorableTask(PageTask task) { |
| 151 | if (isConcurrentFileModificationSuspected(task)) {
|
| 152 | if (INFO) {
|
| 153 | Logger.getLogger("source").info("IGNORE\t" + task); |
| 154 | } |
| 155 | return true; |
| 156 | } |
| 157 | /*
|
| 158 | * if(isPrepareTaskd(task)){ if(INFO){
|
| 159 | * Logger.getLogger("source").info("PREPARED\t"+task); } return
|
| 160 | * true; } if(isLeasedTask(task)){ if(INFO){
|
| 161 | * Logger.getLogger("source").info("LEASED\t"+task); } return true;
|
| 162 | * } |
| 163 | */ |
| 164 | return false; |
| 165 | } |
| 166 | |
| 167 | private boolean isExecutionRequiredTaskWithError(AbstractTask task, AbstractTask storedTask) { |
| 168 | if (this.sessionSource.getSessionID() == storedTask.getSessionID()) { |
| 169 | if (INFO) {
|
| 170 | Logger.getLogger("source").info("IGNORE ERROR\t" + task); |
| 171 | } |
| 172 | return false; |
| 173 | } else {
|
| 174 | this.numRetry++;
|
| 175 | if (INFO) {
|
| 176 | Logger.getLogger("source").info("==========RETRY ERROR\t" + task); |
| 177 | } |
| 178 | return true; |
| 179 | } |
| 180 | } |
| 181 | |
| 182 | private boolean isExecutionRequiredTask(AbstractTask task, AbstractTask storedTask) { |
| 183 | if (this.sessionSource.getSessionID() == storedTask.getSessionID()) { |
| 184 | if (INFO) {
|
| 185 | // Logger.getLogger("source").info("IGNORE\t" + task);
|
| 186 | } |
| 187 | return false; |
| 188 | } else {
|
| 189 | if (INFO) {
|
| 190 | // Logger.getLogger("source").info("REUSE\t" + task);
|
| 191 | } |
| 192 | this.numReused++;
|
| 193 | this.taskHolder.setNumReusedTasks(this.numReused); |
| 194 | return false; |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | /*
|
| 199 | |
| 200 | private boolean isPreparedTask(AbstractTask task) {
|
| 201 | return this.taskHolder.isPreparedTask(task); |
| 202 | } |
| 203 | |
| 204 | private boolean isLeasedTask(AbstractTask task) {
|
| 205 | return this.taskHolder.isLeasedTask(task); |
| 206 | } |
| 207 | |
| 208 | */ |
| 209 | |
| 210 | private boolean hasError(AbstractPageTask task) { |
| 211 | return task.getTaskError() != null; |
| 212 | } |
| 213 | |
| 214 | private boolean isConcurrentFileModificationSuspected(PageTask task) { |
| 215 | return this.newFileIgnoreSecThreshold != -1 && (this.now - task.getPageID().getFileResourceID().getLastModified() |
| 216 | <= this.newFileIgnoreSecThreshold);
|
| 217 | } |
| 218 | |
| 219 | @Override
|
| 220 | void finishScan() {
|
| 221 | StringBuilder sb = new StringBuilder(64); |
| 222 | sb.append("\nnumReused = " + this.numReused); |
| 223 | sb.append("\nnumAdded = " + this.numAdded); |
| 224 | sb.append("\nnumRetry = " + this.numRetry); |
| 225 | Logger.getLogger("session").info("TaskProducer\n\t" + sb.toString()); |
| 226 | } |
| 227 | } |
| 228 | } |

SourceEditor2.0(2010/08/18)
