Mercurial Hosting > luan
comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1476:7d145095cc0b
lucene.logging check
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 19 Apr 2020 20:42:26 -0600 |
parents | c7b86342857f |
children | 1fa6e8ec2d53 |
comparison
equal
deleted
inserted
replaced
1475:c7b86342857f | 1476:7d145095cc0b |
---|---|
14 import java.util.ArrayList; | 14 import java.util.ArrayList; |
15 import java.util.Random; | 15 import java.util.Random; |
16 import org.apache.lucene.document.Document; | 16 import org.apache.lucene.document.Document; |
17 import org.apache.lucene.index.DirectoryReader; | 17 import org.apache.lucene.index.DirectoryReader; |
18 import org.apache.lucene.index.IndexReader; | 18 import org.apache.lucene.index.IndexReader; |
19 import org.apache.lucene.index.Term; | |
19 import org.apache.lucene.search.IndexSearcher; | 20 import org.apache.lucene.search.IndexSearcher; |
20 import org.apache.lucene.search.Query; | 21 import org.apache.lucene.search.Query; |
21 import org.apache.lucene.search.MatchAllDocsQuery; | 22 import org.apache.lucene.search.MatchAllDocsQuery; |
22 import org.apache.lucene.search.TopDocs; | 23 import org.apache.lucene.search.TopDocs; |
24 import org.apache.lucene.search.PrefixQuery; | |
25 import org.apache.lucene.search.SortField; | |
26 import org.apache.lucene.search.Sort; | |
23 import org.apache.lucene.store.Directory; | 27 import org.apache.lucene.store.Directory; |
24 import org.apache.lucene.store.FSDirectory; | 28 import org.apache.lucene.store.FSDirectory; |
25 import goodjava.io.IoUtils; | 29 import goodjava.io.IoUtils; |
26 import goodjava.lucene.api.GoodIndexWriter; | 30 import goodjava.lucene.api.GoodIndexWriter; |
27 import goodjava.lucene.api.LuceneIndexWriter; | 31 import goodjava.lucene.api.LuceneIndexWriter; |
61 for( int i=0; i<n; i++ ) { | 65 for( int i=0; i<n; i++ ) { |
62 File file = new File( logDir, dis.readUTF() ); | 66 File file = new File( logDir, dis.readUTF() ); |
63 logs.add( new LogFile(file,"rwd") ); | 67 logs.add( new LogFile(file,"rwd") ); |
64 } | 68 } |
65 deleteUnusedFiles(); | 69 deleteUnusedFiles(); |
66 log().gotoEnd(); | |
67 return; | 70 return; |
68 } | 71 } |
69 } finally { | 72 } finally { |
70 dis.close(); | 73 dis.close(); |
71 } | 74 } |
72 } | 75 } |
76 newLogs(); | |
77 } | |
78 | |
79 public synchronized boolean isMerging() { | |
80 return isMerging; | |
81 } | |
82 | |
83 private synchronized void isNotMerging() { | |
84 isMerging = false; | |
85 } | |
86 | |
87 public synchronized void newLogs() throws IOException { | |
88 if( isMerging ) | |
89 throw new RuntimeException("merging"); | |
90 logger.info("building new logs"); | |
91 logs.clear(); | |
73 for( int i=0; i<2; i++ ) { | 92 for( int i=0; i<2; i++ ) { |
74 logs.add( newLogFile() ); | 93 logs.add( newLogFile() ); |
75 } | 94 } |
76 isMerging = true; | 95 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); |
77 new Thread(new Runnable(){public void run(){ | 96 writeIndex(); |
78 try { | 97 logger.info("done building new logs"); |
79 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); | |
80 synchronized(LoggingIndexWriter.this) { | |
81 writeIndex(); | |
82 } | |
83 } catch(IOException e) { | |
84 throw new RuntimeException(e); | |
85 } finally { | |
86 synchronized(LoggingIndexWriter.this) { | |
87 isMerging = false; | |
88 } | |
89 } | |
90 }}).start(); | |
91 } | 98 } |
92 | 99 |
93 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException { | 100 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException { |
94 IndexReader reader = indexWriter.openReader(); | 101 IndexReader reader = indexWriter.openReader(); |
95 final IndexSearcher searcher = new IndexSearcher(reader); | 102 final IndexSearcher searcher = new IndexSearcher(reader); |
147 | 154 |
148 private void mergeLogs() throws IOException { | 155 private void mergeLogs() throws IOException { |
149 logger.info("merge"); | 156 logger.info("merge"); |
150 LogFile first = logs.get(0); | 157 LogFile first = logs.get(0); |
151 LogFile second = logs.get(1); | 158 LogFile second = logs.get(1); |
152 second.gotoEnd(); | 159 long lastTime = second.file.lastModified(); |
153 long lastTime = second.readLong(); | |
154 File dirFile = new File(logDir,"merge"); | 160 File dirFile = new File(logDir,"merge"); |
155 if( dirFile.exists() ) | 161 if( dirFile.exists() ) |
156 throw new RuntimeException(); | 162 throw new RuntimeException(); |
157 Directory dir = FSDirectory.open(dirFile); | 163 Directory dir = FSDirectory.open(dirFile); |
158 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); | 164 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); |
161 mergeWriter.commit(); | 167 mergeWriter.commit(); |
162 LogFile merge = newLogFile(); | 168 LogFile merge = newLogFile(); |
163 logLucene( lastTime, merge, mergeWriter ); | 169 logLucene( lastTime, merge, mergeWriter ); |
164 mergeWriter.close(); | 170 mergeWriter.close(); |
165 synchronized(this) { | 171 synchronized(this) { |
166 check(); | 172 //check(); |
167 logs.remove(0); | 173 logs.remove(0); |
168 logs.set(0,merge); | 174 logs.set(0,merge); |
169 writeIndex(); | 175 writeIndex(); |
170 check(); | 176 //check(null); |
171 } | 177 } |
172 } | 178 } |
173 private final Runnable mergeLogs = new Runnable() { public void run() { | 179 private final Runnable mergeLogs = new Runnable() { public void run() { |
174 try { | 180 try { |
175 mergeLogs(); | 181 mergeLogs(); |
176 /* | |
177 } catch(IOException e) { | 182 } catch(IOException e) { |
178 throw new RuntimeException(e); | 183 throw new RuntimeException(e); |
179 */ | |
180 } catch(Exception e) { | |
181 e.printStackTrace(); | |
182 System.exit(-1); | |
183 } finally { | 184 } finally { |
184 synchronized(LoggingIndexWriter.this) { | 185 isNotMerging(); |
185 isMerging = false; | |
186 } | |
187 } | 186 } |
188 } }; | 187 } }; |
189 | 188 |
190 private void check() throws IOException { | 189 private static class DocIter { |
191 File dirFile = new File(logDir,"check"); | 190 final IndexReader reader; |
192 if( dirFile.exists() ) | 191 final TopDocs td; |
193 throw new RuntimeException(); | 192 final int n; |
194 Directory dir = FSDirectory.open(dirFile); | 193 int i = 0; |
195 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); | 194 |
196 playLog(checkWriter); | 195 DocIter(IndexReader reader,Query query,Sort sort) throws IOException { |
197 int nCheck = numDocs(checkWriter); | 196 this.reader = reader; |
198 int nOrig = numDocs(indexWriter); | 197 IndexSearcher searcher = new IndexSearcher(reader); |
199 if( nCheck != nOrig ) { | 198 this.td = searcher.search(query,10000000,sort); |
200 logger.error("nCheck = "+nCheck); | 199 this.n = td.scoreDocs.length; |
201 logger.error("nOrig = "+nOrig); | 200 if( td.totalHits != n ) |
202 //new Exception().printStackTrace(); | 201 throw new RuntimeException(); |
203 Thread.dumpStack(); | 202 } |
204 System.out.println(); | 203 |
205 System.out.println("indexWriter"); | 204 Document next() throws IOException { |
206 dump(indexWriter); | 205 return i < n ? reader.document(td.scoreDocs[i++].doc) : null; |
207 System.out.println("checkWriter"); | 206 } |
208 dump(checkWriter); | 207 } |
209 System.exit(-1); | 208 |
210 } | 209 public void check(SortField sortField) throws IOException { |
211 checkWriter.close(); | 210 IndexReader indexReader; |
212 IoUtils.deleteRecursively(dirFile); | 211 List<LogFile> logs; |
212 synchronized(this) { | |
213 if( isMerging ) { | |
214 logger.warn("is merging, check aborted"); | |
215 return; | |
216 } | |
217 isMerging = true; | |
218 indexReader = indexWriter.openReader(); | |
219 logs = new ArrayList<LogFile>(this.logs); | |
220 int i = logs.size() - 1; | |
221 LogFile last = logs.get(i); | |
222 logs.set(i,last.snapshot()); | |
223 } | |
224 try { | |
225 logger.info("check start"); | |
226 indexWriter.check(); | |
227 File dirFile = new File(logDir,"check"); | |
228 IoUtils.deleteRecursively(dirFile); | |
229 Directory dir = FSDirectory.open(dirFile); | |
230 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); | |
231 playLogs(logs,checkWriter); | |
232 logger.info("check lucene"); | |
233 IndexReader checkReader = checkWriter.openReader(); | |
234 if( sortField == null ) { | |
235 int nCheck = checkReader.numDocs(); | |
236 int nOrig = indexReader.numDocs(); | |
237 if( nCheck != nOrig ) { | |
238 logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck); | |
239 } | |
240 logger.info("numDocs="+nOrig); | |
241 if( hash(indexReader) != hash(checkReader) ) { | |
242 logger.error("hash mismatch"); | |
243 } | |
244 } else { | |
245 Sort sort = new Sort(sortField); | |
246 String sortFieldName = sortField.getField(); | |
247 Query query = new PrefixQuery(new Term(sortFieldName)); | |
248 DocIter origIter = new DocIter(indexReader,query,sort); | |
249 DocIter checkIter = new DocIter(checkReader,query,sort); | |
250 Map<String,Object> origFields = LuceneUtils.toMap(origIter.next()); | |
251 Map<String,Object> checkFields = LuceneUtils.toMap(checkIter.next()); | |
252 while( origFields!=null && checkFields!=null ) { | |
253 Comparable origFld = (Comparable)origFields.get(sortFieldName); | |
254 Comparable checkFld = (Comparable)checkFields.get(sortFieldName); | |
255 int cmp = origFld.compareTo(checkFld); | |
256 if( cmp==0 ) { | |
257 if( !origFields.equals(checkFields) ) { | |
258 logger.error(sortFieldName+" "+origFld+" not equal"); | |
259 logger.error("lucene = "+origFields); | |
260 logger.error("logs = "+checkFields); | |
261 } | |
262 origFields = LuceneUtils.toMap(origIter.next()); | |
263 checkFields = LuceneUtils.toMap(checkIter.next()); | |
264 } else if( cmp < 0 ) { | |
265 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs"); | |
266 origFields = LuceneUtils.toMap(origIter.next()); | |
267 } else { // > | |
268 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene"); | |
269 checkFields = LuceneUtils.toMap(checkIter.next()); | |
270 } | |
271 } | |
272 while( origFields!=null ) { | |
273 Comparable origFld = (Comparable)origFields.get(sortFieldName); | |
274 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs"); | |
275 origFields = LuceneUtils.toMap(origIter.next()); | |
276 } | |
277 while( checkFields!=null ) { | |
278 Comparable checkFld = (Comparable)checkFields.get(sortFieldName); | |
279 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene"); | |
280 checkFields = LuceneUtils.toMap(checkIter.next()); | |
281 } | |
282 //logger.info("check done"); | |
283 } | |
284 checkReader.close(); | |
285 checkWriter.close(); | |
286 IoUtils.deleteRecursively(dirFile); | |
287 logger.info("check done"); | |
288 } finally { | |
289 indexReader.close(); | |
290 isNotMerging(); | |
291 } | |
292 } | |
293 | |
294 private static abstract class HashCollector extends GoodCollector { | |
295 int total = 0; | |
296 } | |
297 | |
298 private static int hash(IndexReader reader) throws IOException { | |
299 final IndexSearcher searcher = new IndexSearcher(reader); | |
300 Query query = new MatchAllDocsQuery(); | |
301 HashCollector col = new HashCollector() { | |
302 public void collectDoc(int iDoc) throws IOException { | |
303 Document doc = searcher.doc(iDoc); | |
304 Map<String,Object> storedFields = LuceneUtils.toMap(doc); | |
305 total += storedFields.hashCode(); | |
306 } | |
307 }; | |
308 searcher.search(query,col); | |
309 return col.total; | |
213 } | 310 } |
214 | 311 |
215 private LogFile log() { | 312 private LogFile log() { |
216 return logs.get(logs.size()-1); | 313 return logs.get(logs.size()-1); |
217 } | 314 } |
226 indexWriter.commit(); | 323 indexWriter.commit(); |
227 LogFile log = log(); | 324 LogFile log = log(); |
228 log.commit(); | 325 log.commit(); |
229 if( isMerging ) | 326 if( isMerging ) |
230 return; | 327 return; |
231 if( log.length() > logs.get(0).length() ) { | 328 if( log.end() > logs.get(0).end() ) { |
232 log.writeLong( System.currentTimeMillis() ); | |
233 logs.add( newLogFile() ); | 329 logs.add( newLogFile() ); |
234 writeIndex(); | 330 writeIndex(); |
235 } | 331 } |
236 if( logs.size() > 3 ) { | 332 if( logs.size() > 3 ) { |
237 isMerging = true; | 333 isMerging = true; |
238 // new Thread(mergeLogs).start(); | 334 new Thread(mergeLogs).start(); |
239 mergeLogs.run(); | 335 // mergeLogs.run(); |
240 } | 336 } |
241 } | 337 } |
242 | 338 |
243 public synchronized void rollback() throws IOException { | 339 public synchronized void rollback() throws IOException { |
244 indexWriter.rollback(); | 340 indexWriter.rollback(); |
245 LogFile log = log(); | 341 LogFile log = log(); |
246 log.gotoEnd(); | 342 log.rollback(); |
247 } | 343 } |
248 | 344 |
249 public synchronized void deleteAll() throws IOException { | 345 public synchronized void deleteAll() throws IOException { |
250 indexWriter.deleteAll(); | 346 indexWriter.deleteAll(); |
251 LogFile log = log(); | 347 LogFile log = log(); |
281 private void writeOp(LogFile log,int op) throws IOException { | 377 private void writeOp(LogFile log,int op) throws IOException { |
282 log.writeLong(System.currentTimeMillis()); | 378 log.writeLong(System.currentTimeMillis()); |
283 log.writeByte(op); | 379 log.writeByte(op); |
284 } | 380 } |
285 | 381 |
286 public synchronized void playLog() throws IOException { | 382 public synchronized void playLogs() throws IOException { |
287 playLog(indexWriter); | 383 playLogs(logs,indexWriter); |
288 } | 384 } |
289 | 385 |
290 private void playLog(LuceneIndexWriter indexWriter) throws IOException { | 386 private static void playLogs(List<LogFile> logs,LuceneIndexWriter indexWriter) throws IOException { |
291 if( numDocs(indexWriter) != 0 ) | 387 if( numDocs(indexWriter) != 0 ) |
292 throw new RuntimeException ("not empty"); | 388 throw new RuntimeException ("not empty"); |
293 for( LogFile log : logs ) { | 389 for( LogFile log : logs ) { |
294 playLog(log,indexWriter); | 390 playLog(log,indexWriter); |
295 } | 391 } |
302 reader.close(); | 398 reader.close(); |
303 return n; | 399 return n; |
304 } | 400 } |
305 | 401 |
306 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException { | 402 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException { |
307 log.gotoStart(); | 403 LogInputStream in = log.input(); |
308 while( log.hasMore() ) { | 404 while( in.available() > 0 ) { |
309 playOp(log,indexWriter); | 405 playOp(in,indexWriter); |
310 } | 406 } |
311 } | 407 } |
312 | 408 |
313 private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException { | 409 private static void playOp(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException { |
314 log.readLong(); // time | 410 in.readLong(); // time |
315 int op = log.readByte(); | 411 int op = in.readByte(); |
316 switch(op) { | 412 switch(op) { |
317 case OP_DELETE_ALL: | 413 case OP_DELETE_ALL: |
318 indexWriter.deleteAll(); | 414 indexWriter.deleteAll(); |
319 return; | 415 return; |
320 case OP_DELETE_DOCUMENTS: | 416 case OP_DELETE_DOCUMENTS: |
321 indexWriter.deleteDocuments( log.readQuery() ); | 417 indexWriter.deleteDocuments( in.readQuery() ); |
322 return; | 418 return; |
323 case OP_ADD_DOCUMENT: | 419 case OP_ADD_DOCUMENT: |
324 { | 420 { |
325 Map storedFields = log.readMap(); | 421 Map storedFields = in.readMap(); |
326 indexWriter.addDocument(storedFields); | 422 indexWriter.addDocument(storedFields); |
327 return; | 423 return; |
328 } | 424 } |
329 case OP_UPDATE_DOCUMENT: | 425 case OP_UPDATE_DOCUMENT: |
330 { | 426 { |
331 String keyFieldName = log.readUTF(); | 427 String keyFieldName = in.readUTF(); |
332 Map storedFields = log.readMap(); | 428 Map storedFields = in.readMap(); |
333 indexWriter.updateDocument(keyFieldName,storedFields); | 429 indexWriter.updateDocument(keyFieldName,storedFields); |
334 return; | 430 return; |
335 } | 431 } |
336 default: | 432 default: |
337 throw new RuntimeException("invalid op "+op); | 433 throw new RuntimeException("invalid op "+op); |