Statistics
| Revision:

root / sqs-reader / src / main / java / net / sqs2 / omr / session / producer / SessionSourceScannerTaskProducer.java @ 1855

History | View | Annotate | Download (6.8 KB)

1
/**
2
 *  SessionSourceScannerTaskProducer.java
3
4
 Copyright 2007 KUBO Hiroya (hiroya@cuc.ac.jp).
5
6
 Licensed under the Apache License, Version 2.0 (the "License");
7
 you may not use this file except in compliance with the License.
8
 You may obtain a copy of the License at
9
10
 http://www.apache.org/licenses/LICENSE-2.0
11
12
 Unless required by applicable law or agreed to in writing, software
13
 distributed under the License is distributed on an "AS IS" BASIS,
14
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 See the License for the specific language governing permissions and
16
 limitations under the License.
17
18
 Created on 2007/01/31
19
 Author hiroya
20
 */
21
package net.sqs2.omr.session.producer;
22
23
import java.io.IOException;
24
25
import java.util.Calendar;
26
import java.util.logging.Logger;
27
28
import net.sqs2.omr.MarkReaderConstants;
29
import net.sqs2.omr.model.AbstractPageTask;
30
import net.sqs2.omr.model.AbstractTask;
31
import net.sqs2.omr.model.PageID;
32
import net.sqs2.omr.model.PageTask;
33
import net.sqs2.omr.model.PageTaskAccessor;
34
import net.sqs2.omr.model.PageTaskFactory;
35
import net.sqs2.omr.model.PageTaskHolder;
36
import net.sqs2.omr.model.SourceDirectory;
37
import net.sqs2.omr.session.monitor.MarkReaderSessionMonitorImpl;
38
import net.sqs2.omr.session.source.SessionSource;
39
import net.sqs2.omr.session.source.ContentIndexer;
40
import net.sqs2.omr.session.source.SessionStopException;
41
42
public class SessionSourceScannerTaskProducer extends SessionSourceScanner implements Runnable {
43
44
        private MarkReaderSessionMonitorImpl monitor;
45
        private PageTaskHolder taskHolder;
46
47
        /**
         * Creates a task producer for the given session source.
         *
         * @param sessionSource session source passed to the superclass constructor
         * @param monitor       session monitor handed to every worker built by {@link #createWorker()}
         * @param taskHolder    task holder handed to every worker built by {@link #createWorker()}
         * @throws IOException declared by this constructor; the superclass constructor is the only
         *                     call here that could raise it
         */
        public SessionSourceScannerTaskProducer(SessionSource sessionSource, MarkReaderSessionMonitorImpl monitor,
                        PageTaskHolder taskHolder) throws IOException {
                super(sessionSource);
                // The two collaborators are only stored here; they are used when a worker is created.
                this.taskHolder = taskHolder;
                this.monitor = monitor;
        }
53
54
        @Override
55
        public AbstractSessionSourceScannerWorker createWorker() throws IOException {
56
                return new SessionSourceScannerWorker(this.sessionSource, this.taskHolder, this.monitor,
57
                                MarkReaderConstants.SESSION_SOURCE_NEWFILE_IGNORE_SEC_THRESHOLD_IN_SEC);
58
        }
59
60
        @Override
61
        public void run() {
62
63
                super.run();
64
65
        }
66
67
        static protected class SessionSourceScannerWorker extends AbstractSessionSourceScannerWorker {
68
69
                private static final boolean INFO = false;
70
71
                int numAdded = 0;
72
                int numReused = 0;
73
                int numRetry = 0;
74
75
                private SessionSource sessionSource;
76
                private PageTaskHolder taskHolder;
77
                private MarkReaderSessionMonitorImpl monitor;
78
                private long newFileIgnoreSecThreshold;
79
80
                private PageTaskAccessor taskAccessor;
81
                private long now;
82
83
                /**
                 * Creates a worker for one scan of the given session source.
                 *
                 * @param sessionSource             session source whose session ID is used for created tasks
                 *                                  and whose content accessor supplies the page task accessor
                 * @param taskHolder                holder that receives prepared tasks and task counters
                 * @param monitor                   monitor notified about produced and reproduced tasks
                 * @param newFileIgnoreSecThreshold threshold used by the recent-modification check;
                 *                                  {@code -1} disables that check
                 * @throws IOException declared by this constructor; presumably raised while obtaining the
                 *                     page task accessor (confirm against the accessor API)
                 */
                SessionSourceScannerWorker(SessionSource sessionSource, PageTaskHolder taskHolder,
                                MarkReaderSessionMonitorImpl monitor, long newFileIgnoreSecThreshold) throws IOException {
                        // Captured once so that every file in this scan is compared against the same instant.
                        this.now = System.currentTimeMillis();

                        this.newFileIgnoreSecThreshold = newFileIgnoreSecThreshold;
                        this.monitor = monitor;
                        this.taskHolder = taskHolder;
                        this.sessionSource = sessionSource;
                        this.taskAccessor = sessionSource.getContentAccessor().getPageTaskAccessor();
                }
92
93
                @Override
94
                void work(SourceDirectory sourceDirectory, int pageNumber, PageID pageID, int rowIndex) throws SessionStopException {
95
                        PageTask task = PageTaskFactory.createPageTask(sourceDirectory, pageNumber, pageID, this.sessionSource.getSessionID());
96
                        if(isIgnorableTask(task)){
97
                                return;
98
                        }
99
100
                        PageTask preparedTask = prepareTask(task);
101
                        if (preparedTask != null) {
102
                                this.taskHolder.incrementNumTargetTasks(1);
103
                                this.taskAccessor.put(preparedTask);
104
                                this.taskHolder.addPreparedTask(preparedTask);
105
                                this.monitor.notifyPageTaskProduced(preparedTask);
106
                        }
107
                        ContentIndexer contentIndexer = this.sessionSource.getContentIndexer();
108
                        contentIndexer.putRowIndex(pageID, rowIndex);
109
                }
110
111
                @Override
112
                void startScanningSourceDirectory(SourceDirectory sourceDirectory) {
113
                }
114
115
                private PageTask prepareTask(PageTask task) throws SessionStopException {
116
                        PageTask storedTask = null;
117
                        try {
118
                                storedTask = this.taskAccessor.get(task.toString());
119
                        } catch (Exception ignore) {
120
                        }
121
122
                        if (isOnceExecutedTask(storedTask)) {
123
                                if (hasError(storedTask)) {
124
125
                                        this.monitor.notifyErrorPageTaskReproduced(storedTask);
126
127
                                        if (!isExecutionRequiredTaskWithError(task, storedTask)) {
128
                                                return null;
129
                                        }
130
131
                                } else if (storedTask.getTaskResult() != null) {
132
                                        if (!isExecutionRequiredTask(task, storedTask)) {
133
                                                return null;
134
                                        }
135
                                } else {
136
                                        this.numAdded++;
137
                                        Logger.getLogger("session").info("==========ADD\t" + task);
138
                                }
139
                        } else {
140
                                this.numAdded++;
141
                                Logger.getLogger("session").info("==========ADD\t" + task);
142
                        }
143
                        return task;
144
                }
145
146
                private boolean isOnceExecutedTask(AbstractTask storedTask) {
147
                        return storedTask != null;
148
                }
149
150
                private boolean isIgnorableTask(PageTask task) {
151
                        if (isConcurrentFileModificationSuspected(task)) {
152
                                if (INFO) {
153
                                        Logger.getLogger("source").info("IGNORE\t" + task);
154
                                }
155
                                return true;
156
                        }
157
                        /*
158
                         * if(isPrepareTaskd(task)){ if(INFO){
159
                         * Logger.getLogger("source").info("PREPARED\t"+task); } return
160
                         * true; } if(isLeasedTask(task)){ if(INFO){
161
                         * Logger.getLogger("source").info("LEASED\t"+task); } return true;
162
                         * }
163
                         */
164
                        return false;
165
                }
166
167
                private boolean isExecutionRequiredTaskWithError(AbstractTask task, AbstractTask storedTask) {
168
                        if (this.sessionSource.getSessionID() == storedTask.getSessionID()) {
169
                                if (INFO) {
170
                                        Logger.getLogger("source").info("IGNORE ERROR\t" + task);
171
                                }
172
                                return false;
173
                        } else {
174
                                this.numRetry++;
175
                                if (INFO) {
176
                                        Logger.getLogger("source").info("==========RETRY ERROR\t" + task);
177
                                }
178
                                return true;
179
                        }
180
                }
181
182
                private boolean isExecutionRequiredTask(AbstractTask task, AbstractTask storedTask) {
183
                        if (this.sessionSource.getSessionID() == storedTask.getSessionID()) {
184
                                if (INFO) {
185
                                        // Logger.getLogger("source").info("IGNORE\t" + task);
186
                                }
187
                                return false;
188
                        } else {
189
                                if (INFO) {
190
                                        // Logger.getLogger("source").info("REUSE\t" + task);
191
                                }
192
                                this.numReused++;
193
                                this.taskHolder.setNumReusedTasks(this.numReused);
194
                                return false;
195
                        }
196
                }
197
198
                /*
199
200
                private boolean isPreparedTask(AbstractTask task) {
201
                        return this.taskHolder.isPreparedTask(task);
202
                }
203
204
                private boolean isLeasedTask(AbstractTask task) {
205
                        return this.taskHolder.isLeasedTask(task);
206
                }
207
208
                 */
209
210
                private boolean hasError(AbstractPageTask task) {
211
                        return task.getTaskError() != null;
212
                }
213
214
                private boolean isConcurrentFileModificationSuspected(PageTask task) {
215
                        return this.newFileIgnoreSecThreshold != -1 && (this.now - task.getPageID().getFileResourceID().getLastModified()
216
                                        <= this.newFileIgnoreSecThreshold);
217
                }
218
219
                @Override
220
                void finishScan() {
221
                        StringBuilder sb = new StringBuilder(64);
222
                        sb.append("\nnumReused = " + this.numReused);
223
                        sb.append("\nnumAdded = " + this.numAdded);
224
                        sb.append("\nnumRetry = " + this.numRetry);
225
                        Logger.getLogger("session").info("TaskProducer\n\t" + sb.toString());
226
                }
227
        }
228
}