comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1476:7d145095cc0b
lucene.logging check
| author | Franklin Schmidt <fschmidt@gmail.com> |
|---|---|
| date | Sun, 19 Apr 2020 20:42:26 -0600 |
| parents | c7b86342857f |
| children | 1fa6e8ec2d53 |
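This changeset reworks `LoggingIndexWriter`: `newLogs()` now builds the initial pair of log files synchronously, the old private `check()` becomes a public `check(SortField)` that replays a snapshot of the logs into a scratch index and compares it against the live Lucene index, and `commit()` once again starts the log merge in a background thread. As a rough caller-side sketch only (the `LoggingIndexWriter` constructor and the `"id"` key field below are assumptions, not part of this diff), the new check might be driven like this:

```java
import java.io.IOException;
import org.apache.lucene.search.SortField;
import goodjava.lucene.logging.LoggingIndexWriter;

public class IndexCheckTask implements Runnable {
	private final LoggingIndexWriter writer;  // assumed to be constructed elsewhere; the constructor is not shown in this diff

	public IndexCheckTask(LoggingIndexWriter writer) {
		this.writer = writer;
	}

	public void run() {
		try {
			// With a sort field, check() walks both indexes in key order and logs any
			// per-document mismatch; with null it only compares numDocs() and a hash
			// of all stored fields. "id" is a hypothetical key field name.
			writer.check( new SortField("id",SortField.Type.STRING) );
		} catch(IOException e) {
			throw new RuntimeException(e);
		}
	}
}
```

As the diff below shows, `check()` warns and returns if a merge is already in progress, so a caller does not need to consult `isMerging()` first.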
| 1475:c7b86342857f | 1476:7d145095cc0b |
|---|---|
| 14 import java.util.ArrayList; | 14 import java.util.ArrayList; |
| 15 import java.util.Random; | 15 import java.util.Random; |
| 16 import org.apache.lucene.document.Document; | 16 import org.apache.lucene.document.Document; |
| 17 import org.apache.lucene.index.DirectoryReader; | 17 import org.apache.lucene.index.DirectoryReader; |
| 18 import org.apache.lucene.index.IndexReader; | 18 import org.apache.lucene.index.IndexReader; |
| | 19 import org.apache.lucene.index.Term; |
| 19 import org.apache.lucene.search.IndexSearcher; | 20 import org.apache.lucene.search.IndexSearcher; |
| 20 import org.apache.lucene.search.Query; | 21 import org.apache.lucene.search.Query; |
| 21 import org.apache.lucene.search.MatchAllDocsQuery; | 22 import org.apache.lucene.search.MatchAllDocsQuery; |
| 22 import org.apache.lucene.search.TopDocs; | 23 import org.apache.lucene.search.TopDocs; |
| | 24 import org.apache.lucene.search.PrefixQuery; |
| | 25 import org.apache.lucene.search.SortField; |
| | 26 import org.apache.lucene.search.Sort; |
| 23 import org.apache.lucene.store.Directory; | 27 import org.apache.lucene.store.Directory; |
| 24 import org.apache.lucene.store.FSDirectory; | 28 import org.apache.lucene.store.FSDirectory; |
| 25 import goodjava.io.IoUtils; | 29 import goodjava.io.IoUtils; |
| 26 import goodjava.lucene.api.GoodIndexWriter; | 30 import goodjava.lucene.api.GoodIndexWriter; |
| 27 import goodjava.lucene.api.LuceneIndexWriter; | 31 import goodjava.lucene.api.LuceneIndexWriter; |
| 61 for( int i=0; i<n; i++ ) { | 65 for( int i=0; i<n; i++ ) { |
| 62 File file = new File( logDir, dis.readUTF() ); | 66 File file = new File( logDir, dis.readUTF() ); |
| 63 logs.add( new LogFile(file,"rwd") ); | 67 logs.add( new LogFile(file,"rwd") ); |
| 64 } | 68 } |
| 65 deleteUnusedFiles(); | 69 deleteUnusedFiles(); |
| 66 log().gotoEnd(); | |
| 67 return; | 70 return; |
| 68 } | 71 } |
| 69 } finally { | 72 } finally { |
| 70 dis.close(); | 73 dis.close(); |
| 71 } | 74 } |
| 72 } | 75 } |
| | 76 newLogs(); |
| | 77 } |
| | 78 |
| | 79 public synchronized boolean isMerging() { |
| | 80 return isMerging; |
| | 81 } |
| | 82 |
| | 83 private synchronized void isNotMerging() { |
| | 84 isMerging = false; |
| | 85 } |
| | 86 |
| | 87 public synchronized void newLogs() throws IOException { |
| | 88 if( isMerging ) |
| | 89 throw new RuntimeException("merging"); |
| | 90 logger.info("building new logs"); |
| | 91 logs.clear(); |
| 73 for( int i=0; i<2; i++ ) { | 92 for( int i=0; i<2; i++ ) { |
| 74 logs.add( newLogFile() ); | 93 logs.add( newLogFile() ); |
| 75 } | 94 } |
| 76 isMerging = true; | 95 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); |
| 77 new Thread(new Runnable(){public void run(){ | 96 writeIndex(); |
| 78 try { | 97 logger.info("done building new logs"); |
| 79 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); | |
| 80 synchronized(LoggingIndexWriter.this) { | |
| 81 writeIndex(); | |
| 82 } | |
| 83 } catch(IOException e) { | |
| 84 throw new RuntimeException(e); | |
| 85 } finally { | |
| 86 synchronized(LoggingIndexWriter.this) { | |
| 87 isMerging = false; | |
| 88 } | |
| 89 } | |
| 90 }}).start(); | |
| 91 } | 98 } |
| 92 | 99 |
| 93 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException { | 100 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException { |
| 94 IndexReader reader = indexWriter.openReader(); | 101 IndexReader reader = indexWriter.openReader(); |
| 95 final IndexSearcher searcher = new IndexSearcher(reader); | 102 final IndexSearcher searcher = new IndexSearcher(reader); |
| 147 | 154 |
| 148 private void mergeLogs() throws IOException { | 155 private void mergeLogs() throws IOException { |
| 149 logger.info("merge"); | 156 logger.info("merge"); |
| 150 LogFile first = logs.get(0); | 157 LogFile first = logs.get(0); |
| 151 LogFile second = logs.get(1); | 158 LogFile second = logs.get(1); |
| 152 second.gotoEnd(); | 159 long lastTime = second.file.lastModified(); |
| 153 long lastTime = second.readLong(); | |
| 154 File dirFile = new File(logDir,"merge"); | 160 File dirFile = new File(logDir,"merge"); |
| 155 if( dirFile.exists() ) | 161 if( dirFile.exists() ) |
| 156 throw new RuntimeException(); | 162 throw new RuntimeException(); |
| 157 Directory dir = FSDirectory.open(dirFile); | 163 Directory dir = FSDirectory.open(dirFile); |
| 158 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); | 164 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); |
| 161 mergeWriter.commit(); | 167 mergeWriter.commit(); |
| 162 LogFile merge = newLogFile(); | 168 LogFile merge = newLogFile(); |
| 163 logLucene( lastTime, merge, mergeWriter ); | 169 logLucene( lastTime, merge, mergeWriter ); |
| 164 mergeWriter.close(); | 170 mergeWriter.close(); |
| 165 synchronized(this) { | 171 synchronized(this) { |
| 166 check(); | 172 //check(); |
| 167 logs.remove(0); | 173 logs.remove(0); |
| 168 logs.set(0,merge); | 174 logs.set(0,merge); |
| 169 writeIndex(); | 175 writeIndex(); |
| 170 check(); | 176 //check(null); |
| 171 } | 177 } |
| 172 } | 178 } |
| 173 private final Runnable mergeLogs = new Runnable() { public void run() { | 179 private final Runnable mergeLogs = new Runnable() { public void run() { |
| 174 try { | 180 try { |
| 175 mergeLogs(); | 181 mergeLogs(); |
| 176 /* | |
| 177 } catch(IOException e) { | 182 } catch(IOException e) { |
| 178 throw new RuntimeException(e); | 183 throw new RuntimeException(e); |
| 179 */ | |
| 180 } catch(Exception e) { | |
| 181 e.printStackTrace(); | |
| 182 System.exit(-1); | |
| 183 } finally { | 184 } finally { |
| 184 synchronized(LoggingIndexWriter.this) { | 185 isNotMerging(); |
| 185 isMerging = false; | |
| 186 } | |
| 187 } | 186 } |
| 188 } }; | 187 } }; |
| 189 | 188 |
| 190 private void check() throws IOException { | 189 private static class DocIter { |
| 191 File dirFile = new File(logDir,"check"); | 190 final IndexReader reader; |
| 192 if( dirFile.exists() ) | 191 final TopDocs td; |
| 193 throw new RuntimeException(); | 192 final int n; |
| 194 Directory dir = FSDirectory.open(dirFile); | 193 int i = 0; |
| 195 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); | 194 |
| 196 playLog(checkWriter); | 195 DocIter(IndexReader reader,Query query,Sort sort) throws IOException { |
| 197 int nCheck = numDocs(checkWriter); | 196 this.reader = reader; |
| 198 int nOrig = numDocs(indexWriter); | 197 IndexSearcher searcher = new IndexSearcher(reader); |
| 199 if( nCheck != nOrig ) { | 198 this.td = searcher.search(query,10000000,sort); |
| 200 logger.error("nCheck = "+nCheck); | 199 this.n = td.scoreDocs.length; |
| 201 logger.error("nOrig = "+nOrig); | 200 if( td.totalHits != n ) |
| 202 //new Exception().printStackTrace(); | 201 throw new RuntimeException(); |
| 203 Thread.dumpStack(); | 202 } |
| 204 System.out.println(); | 203 |
| 205 System.out.println("indexWriter"); | 204 Document next() throws IOException { |
| 206 dump(indexWriter); | 205 return i < n ? reader.document(td.scoreDocs[i++].doc) : null; |
| 207 System.out.println("checkWriter"); | 206 } |
| 208 dump(checkWriter); | 207 } |
| 209 System.exit(-1); | 208 |
| 210 } | 209 public void check(SortField sortField) throws IOException { |
| 211 checkWriter.close(); | 210 IndexReader indexReader; |
| 212 IoUtils.deleteRecursively(dirFile); | 211 List<LogFile> logs; |
| | 212 synchronized(this) { |
| | 213 if( isMerging ) { |
| | 214 logger.warn("is merging, check aborted"); |
| | 215 return; |
| | 216 } |
| | 217 isMerging = true; |
| | 218 indexReader = indexWriter.openReader(); |
| | 219 logs = new ArrayList<LogFile>(this.logs); |
| | 220 int i = logs.size() - 1; |
| | 221 LogFile last = logs.get(i); |
| | 222 logs.set(i,last.snapshot()); |
| | 223 } |
| | 224 try { |
| | 225 logger.info("check start"); |
| | 226 indexWriter.check(); |
| | 227 File dirFile = new File(logDir,"check"); |
| | 228 IoUtils.deleteRecursively(dirFile); |
| | 229 Directory dir = FSDirectory.open(dirFile); |
| | 230 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); |
| | 231 playLogs(logs,checkWriter); |
| | 232 logger.info("check lucene"); |
| | 233 IndexReader checkReader = checkWriter.openReader(); |
| | 234 if( sortField == null ) { |
| | 235 int nCheck = checkReader.numDocs(); |
| | 236 int nOrig = indexReader.numDocs(); |
| | 237 if( nCheck != nOrig ) { |
| | 238 logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck); |
| | 239 } |
| | 240 logger.info("numDocs="+nOrig); |
| | 241 if( hash(indexReader) != hash(checkReader) ) { |
| | 242 logger.error("hash mismatch"); |
| | 243 } |
| | 244 } else { |
| | 245 Sort sort = new Sort(sortField); |
| | 246 String sortFieldName = sortField.getField(); |
| | 247 Query query = new PrefixQuery(new Term(sortFieldName)); |
| | 248 DocIter origIter = new DocIter(indexReader,query,sort); |
| | 249 DocIter checkIter = new DocIter(checkReader,query,sort); |
| | 250 Map<String,Object> origFields = LuceneUtils.toMap(origIter.next()); |
| | 251 Map<String,Object> checkFields = LuceneUtils.toMap(checkIter.next()); |
| | 252 while( origFields!=null && checkFields!=null ) { |
| | 253 Comparable origFld = (Comparable)origFields.get(sortFieldName); |
| | 254 Comparable checkFld = (Comparable)checkFields.get(sortFieldName); |
| | 255 int cmp = origFld.compareTo(checkFld); |
| | 256 if( cmp==0 ) { |
| | 257 if( !origFields.equals(checkFields) ) { |
| | 258 logger.error(sortFieldName+" "+origFld+" not equal"); |
| | 259 logger.error("lucene = "+origFields); |
| | 260 logger.error("logs = "+checkFields); |
| | 261 } |
| | 262 origFields = LuceneUtils.toMap(origIter.next()); |
| | 263 checkFields = LuceneUtils.toMap(checkIter.next()); |
| | 264 } else if( cmp < 0 ) { |
| | 265 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs"); |
| | 266 origFields = LuceneUtils.toMap(origIter.next()); |
| | 267 } else { // > |
| | 268 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene"); |
| | 269 checkFields = LuceneUtils.toMap(checkIter.next()); |
| | 270 } |
| | 271 } |
| | 272 while( origFields!=null ) { |
| | 273 Comparable origFld = (Comparable)origFields.get(sortFieldName); |
| | 274 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs"); |
| | 275 origFields = LuceneUtils.toMap(origIter.next()); |
| | 276 } |
| | 277 while( checkFields!=null ) { |
| | 278 Comparable checkFld = (Comparable)checkFields.get(sortFieldName); |
| | 279 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene"); |
| | 280 checkFields = LuceneUtils.toMap(checkIter.next()); |
| | 281 } |
| | 282 //logger.info("check done"); |
| | 283 } |
| | 284 checkReader.close(); |
| | 285 checkWriter.close(); |
| | 286 IoUtils.deleteRecursively(dirFile); |
| | 287 logger.info("check done"); |
| | 288 } finally { |
| | 289 indexReader.close(); |
| | 290 isNotMerging(); |
| | 291 } |
| | 292 } |
| | 293 |
| | 294 private static abstract class HashCollector extends GoodCollector { |
| | 295 int total = 0; |
| | 296 } |
| | 297 |
| | 298 private static int hash(IndexReader reader) throws IOException { |
| | 299 final IndexSearcher searcher = new IndexSearcher(reader); |
| | 300 Query query = new MatchAllDocsQuery(); |
| | 301 HashCollector col = new HashCollector() { |
| | 302 public void collectDoc(int iDoc) throws IOException { |
| | 303 Document doc = searcher.doc(iDoc); |
| | 304 Map<String,Object> storedFields = LuceneUtils.toMap(doc); |
| | 305 total += storedFields.hashCode(); |
| | 306 } |
| | 307 }; |
| | 308 searcher.search(query,col); |
| | 309 return col.total; |
| 213 } | 310 } |
| 214 | 311 |
| 215 private LogFile log() { | 312 private LogFile log() { |
| 216 return logs.get(logs.size()-1); | 313 return logs.get(logs.size()-1); |
| 217 } | 314 } |
| 226 indexWriter.commit(); | 323 indexWriter.commit(); |
| 227 LogFile log = log(); | 324 LogFile log = log(); |
| 228 log.commit(); | 325 log.commit(); |
| 229 if( isMerging ) | 326 if( isMerging ) |
| 230 return; | 327 return; |
| 231 if( log.length() > logs.get(0).length() ) { | 328 if( log.end() > logs.get(0).end() ) { |
| 232 log.writeLong( System.currentTimeMillis() ); | |
| 233 logs.add( newLogFile() ); | 329 logs.add( newLogFile() ); |
| 234 writeIndex(); | 330 writeIndex(); |
| 235 } | 331 } |
| 236 if( logs.size() > 3 ) { | 332 if( logs.size() > 3 ) { |
| 237 isMerging = true; | 333 isMerging = true; |
| 238 // new Thread(mergeLogs).start(); | 334 new Thread(mergeLogs).start(); |
| 239 mergeLogs.run(); | 335 // mergeLogs.run(); |
| 240 } | 336 } |
| 241 } | 337 } |
| 242 | 338 |
| 243 public synchronized void rollback() throws IOException { | 339 public synchronized void rollback() throws IOException { |
| 244 indexWriter.rollback(); | 340 indexWriter.rollback(); |
| 245 LogFile log = log(); | 341 LogFile log = log(); |
| 246 log.gotoEnd(); | 342 log.rollback(); |
| 247 } | 343 } |
| 248 | 344 |
| 249 public synchronized void deleteAll() throws IOException { | 345 public synchronized void deleteAll() throws IOException { |
| 250 indexWriter.deleteAll(); | 346 indexWriter.deleteAll(); |
| 251 LogFile log = log(); | 347 LogFile log = log(); |
| 281 private void writeOp(LogFile log,int op) throws IOException { | 377 private void writeOp(LogFile log,int op) throws IOException { |
| 282 log.writeLong(System.currentTimeMillis()); | 378 log.writeLong(System.currentTimeMillis()); |
| 283 log.writeByte(op); | 379 log.writeByte(op); |
| 284 } | 380 } |
| 285 | 381 |
| 286 public synchronized void playLog() throws IOException { | 382 public synchronized void playLogs() throws IOException { |
| 287 playLog(indexWriter); | 383 playLogs(logs,indexWriter); |
| 288 } | 384 } |
| 289 | 385 |
| 290 private void playLog(LuceneIndexWriter indexWriter) throws IOException { | 386 private static void playLogs(List<LogFile> logs,LuceneIndexWriter indexWriter) throws IOException { |
| 291 if( numDocs(indexWriter) != 0 ) | 387 if( numDocs(indexWriter) != 0 ) |
| 292 throw new RuntimeException ("not empty"); | 388 throw new RuntimeException ("not empty"); |
| 293 for( LogFile log : logs ) { | 389 for( LogFile log : logs ) { |
| 294 playLog(log,indexWriter); | 390 playLog(log,indexWriter); |
| 295 } | 391 } |
| 302 reader.close(); | 398 reader.close(); |
| 303 return n; | 399 return n; |
| 304 } | 400 } |
| 305 | 401 |
| 306 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException { | 402 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException { |
| 307 log.gotoStart(); | 403 LogInputStream in = log.input(); |
| 308 while( log.hasMore() ) { | 404 while( in.available() > 0 ) { |
| 309 playOp(log,indexWriter); | 405 playOp(in,indexWriter); |
| 310 } | 406 } |
| 311 } | 407 } |
| 312 | 408 |
| 313 private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException { | 409 private static void playOp(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException { |
| 314 log.readLong(); // time | 410 in.readLong(); // time |
| 315 int op = log.readByte(); | 411 int op = in.readByte(); |
| 316 switch(op) { | 412 switch(op) { |
| 317 case OP_DELETE_ALL: | 413 case OP_DELETE_ALL: |
| 318 indexWriter.deleteAll(); | 414 indexWriter.deleteAll(); |
| 319 return; | 415 return; |
| 320 case OP_DELETE_DOCUMENTS: | 416 case OP_DELETE_DOCUMENTS: |
| 321 indexWriter.deleteDocuments( log.readQuery() ); | 417 indexWriter.deleteDocuments( in.readQuery() ); |
| 322 return; | 418 return; |
| 323 case OP_ADD_DOCUMENT: | 419 case OP_ADD_DOCUMENT: |
| 324 { | 420 { |
| 325 Map storedFields = log.readMap(); | 421 Map storedFields = in.readMap(); |
| 326 indexWriter.addDocument(storedFields); | 422 indexWriter.addDocument(storedFields); |
| 327 return; | 423 return; |
| 328 } | 424 } |
| 329 case OP_UPDATE_DOCUMENT: | 425 case OP_UPDATE_DOCUMENT: |
| 330 { | 426 { |
| 331 String keyFieldName = log.readUTF(); | 427 String keyFieldName = in.readUTF(); |
| 332 Map storedFields = log.readMap(); | 428 Map storedFields = in.readMap(); |
| 333 indexWriter.updateDocument(keyFieldName,storedFields); | 429 indexWriter.updateDocument(keyFieldName,storedFields); |
| 334 return; | 430 return; |
| 335 } | 431 } |
| 336 default: | 432 default: |
| 337 throw new RuntimeException("invalid op "+op); | 433 throw new RuntimeException("invalid op "+op); |
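For reference, the replay format consumed by `playOp` above is: a `long` timestamp, a one-byte op code, then an op-specific payload (nothing for `OP_DELETE_ALL`, a `Query` for `OP_DELETE_DOCUMENTS`, a stored-field `Map` for `OP_ADD_DOCUMENT`, and a key field name plus `Map` for `OP_UPDATE_DOCUMENT`). A small debugging helper along these lines could sit inside `LoggingIndexWriter`; it is a sketch that mirrors `playOp`, not part of the commit, and it relies on the `OP_*` constants and `LogInputStream` read methods defined elsewhere in this file:

```java
// Sketch only (not part of this changeset): walk one log file and print each
// operation, consuming payloads exactly as playOp() does.
private static void dumpLog(LogFile log) throws IOException {
	LogInputStream in = log.input();
	while( in.available() > 0 ) {
		long time = in.readLong();
		int op = in.readByte();
		System.out.println( new java.util.Date(time) + " op=" + op );
		switch(op) {
		case OP_DELETE_ALL:
			break;
		case OP_DELETE_DOCUMENTS:
			in.readQuery();  // query payload, read and discarded
			break;
		case OP_ADD_DOCUMENT:
			in.readMap();  // stored fields
			break;
		case OP_UPDATE_DOCUMENT:
			in.readUTF();  // key field name
			in.readMap();  // stored fields
			break;
		default:
			throw new RuntimeException("invalid op "+op);
		}
	}
}
```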
