Mercurial Hosting > luan
comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1544:35601f15ecc3
add lucene log tag and restore_from_log
| author | Franklin Schmidt <fschmidt@gmail.com> |
|---|---|
| date | Sun, 20 Sep 2020 20:36:55 -0600 |
| parents | c27dc6af87ca |
| children | 736ec76bbf42 |
comparison
equal
deleted
inserted
replaced
| 1543:1db694d98003 | 1544:35601f15ecc3 |
|---|---|
| 42 private static final int version = 1; | 42 private static final int version = 1; |
| 43 private static final int OP_DELETE_ALL = 1; | 43 private static final int OP_DELETE_ALL = 1; |
| 44 private static final int OP_DELETE_DOCUMENTS = 2; | 44 private static final int OP_DELETE_DOCUMENTS = 2; |
| 45 private static final int OP_ADD_DOCUMENT = 3; | 45 private static final int OP_ADD_DOCUMENT = 3; |
| 46 private static final int OP_UPDATE_DOCUMENT = 4; | 46 private static final int OP_UPDATE_DOCUMENT = 4; |
| | 47 private static final int OP_TAG = 5; |
| 47 private static final Random rnd = new Random(); | 48 private static final Random rnd = new Random(); |
| 48 | 49 |
| 49 public final LuceneIndexWriter indexWriter; | 50 public final LuceneIndexWriter indexWriter; |
| | 51 public boolean wasCreated; |
| 50 private final File logDir; | 52 private final File logDir; |
| 51 protected final List<LogFile> logs = new ArrayList<LogFile>(); | 53 protected final List<LogFile> logs = new ArrayList<LogFile>(); |
| 52 private LogOutputStream log; | 54 private LogOutputStream log; |
| 53 private final File index; | 55 private final File index; |
| 54 private final SemaphoreLock mergeLock = new SemaphoreLock(); | 56 private final SemaphoreLock mergeLock = new SemaphoreLock(); |
| 69 File file = new File( logDir, dis.readUTF() ); | 71 File file = new File( logDir, dis.readUTF() ); |
| 70 logs.add( new LogFile(file) ); | 72 logs.add( new LogFile(file) ); |
| 71 } | 73 } |
| 72 deleteUnusedFiles(); | 74 deleteUnusedFiles(); |
| 73 setLog(); | 75 setLog(); |
| | 76 wasCreated = false; |
| 74 return; | 77 return; |
| 75 } | 78 } |
| 76 } finally { | 79 } finally { |
| 77 dis.close(); | 80 dis.close(); |
| 78 } | 81 } |
| 79 } | 82 } |
| 80 newLogs(); | 83 newLogs(); |
| | 84 wasCreated = true; |
| 81 } | 85 } |
| 82 | 86 |
| 83 public IndexWriter getLuceneIndexWriter() { | 87 public IndexWriter getLuceneIndexWriter() { |
| 84 return indexWriter.getLuceneIndexWriter(); | 88 return indexWriter.getLuceneIndexWriter(); |
| 85 } | 89 } |
| 199 File dirFile = new File(logDir,"merge"); | 203 File dirFile = new File(logDir,"merge"); |
| 200 if( dirFile.exists() ) | 204 if( dirFile.exists() ) |
| 201 throw new RuntimeException(); | 205 throw new RuntimeException(); |
| 202 Directory dir = FSDirectory.open(dirFile); | 206 Directory dir = FSDirectory.open(dirFile); |
| 203 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); | 207 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); |
| 204 playLog( first.input(), mergeWriter ); | 208 playLog( first.input(), mergeWriter, null ); |
| 205 playLog( second.input(), mergeWriter ); | 209 playLog( second.input(), mergeWriter, null ); |
| 206 mergeWriter.commit(); | 210 mergeWriter.commit(); |
| 207 LogFile merge = newLogFile(); | 211 LogFile merge = newLogFile(); |
| 208 logLucene( lastTime, merge, mergeWriter ); | 212 logLucene( lastTime, merge, mergeWriter ); |
| 209 mergeWriter.close(); | 213 mergeWriter.close(); |
| 210 synchronized(this) { | 214 synchronized(this) { |
| 271 indexWriter.check(); | 275 indexWriter.check(); |
| 272 File dirFile = new File(logDir,"check"); | 276 File dirFile = new File(logDir,"check"); |
| 273 IoUtils.deleteRecursively(dirFile); | 277 IoUtils.deleteRecursively(dirFile); |
| 274 Directory dir = FSDirectory.open(dirFile); | 278 Directory dir = FSDirectory.open(dirFile); |
| 275 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); | 279 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); |
| 276 playLogs(logReaders,checkWriter); | 280 playLogs(logReaders,checkWriter,null); |
| 277 //logger.info("check lucene"); | 281 //logger.info("check lucene"); |
| 278 IndexReader checkReader = checkWriter.openReader(); | 282 IndexReader checkReader = checkWriter.openReader(); |
| | 283 int nCheck = checkReader.numDocs(); |
| | 284 int nOrig = indexReader.numDocs(); |
| | 285 if( nCheck != nOrig ) { |
| | 286 logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck); |
| | 287 ok = false; |
| | 288 } |
| 279 if( sortField == null ) { | 289 if( sortField == null ) { |
| 280 int nCheck = checkReader.numDocs(); | 290 if( ok && hash(indexReader) != hash(checkReader) ) { |
| 281 int nOrig = indexReader.numDocs(); | |
| 282 if( nCheck != nOrig ) { | |
| 283 logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck); | |
| 284 ok = false; | |
| 285 } | |
| 286 //logger.info("numDocs="+nOrig); | |
| 287 if( hash(indexReader) != hash(checkReader) ) { | |
| 288 logger.error("hash mismatch"); | 291 logger.error("hash mismatch"); |
| 289 ok = false; | 292 ok = false; |
| 290 } | 293 } |
| 291 } else { | 294 } else { |
| 292 Sort sort = new Sort(sortField); | 295 Sort sort = new Sort(sortField); |
| 411 writeOp(OP_UPDATE_DOCUMENT); | 414 writeOp(OP_UPDATE_DOCUMENT); |
| 412 log.writeUTF(keyFieldName); | 415 log.writeUTF(keyFieldName); |
| 413 log.writeMap(storedFields); | 416 log.writeMap(storedFields); |
| 414 } | 417 } |
| 415 | 418 |
| | 419 public synchronized void tag(String tag) throws IOException { |
| | 420 writeOp(OP_TAG); |
| | 421 log.writeUTF(tag); |
| | 422 } |
| | 423 |
| 416 public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException { | 424 public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException { |
| 417 indexWriter.reindexDocuments(keyFieldName,query); | 425 indexWriter.reindexDocuments(keyFieldName,query); |
| 418 } | 426 } |
| 419 | 427 |
| 420 private void writeOp(int op) throws IOException { | 428 private void writeOp(int op) throws IOException { |
| 421 log.writeLong(System.currentTimeMillis()); | 429 log.writeLong(System.currentTimeMillis()); |
| 422 log.writeByte(op); | 430 log.writeByte(op); |
| 423 } | 431 } |
| 424 | 432 |
| 425 public synchronized void playLogs() throws IOException { | 433 // return whether stopped at tag |
| 426 playLogs( logReaders(logs), indexWriter ); | 434 public synchronized boolean playLogs(String upToTag) throws IOException { |
| | 435 return playLogs( logReaders(logs), indexWriter, upToTag ); |
| 427 } | 436 } |
| 428 | 437 |
| 429 private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException { | 438 private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException { |
| 430 List<LogInputStream> logReaders = new ArrayList<LogInputStream>(); | 439 List<LogInputStream> logReaders = new ArrayList<LogInputStream>(); |
| 431 for( LogFile log : logs ) { | 440 for( LogFile log : logs ) { |
| 432 logReaders.add( log.input() ); | 441 logReaders.add( log.input() ); |
| 433 } | 442 } |
| 434 return logReaders; | 443 return logReaders; |
| 435 } | 444 } |
| 436 | 445 |
| 437 private static void playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter) throws IOException { | 446 private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag) |
| | 447 throws IOException |
| | 448 { |
| 438 if( numDocs(indexWriter) != 0 ) | 449 if( numDocs(indexWriter) != 0 ) |
| 439 throw new RuntimeException ("not empty"); | 450 throw new RuntimeException ("not empty"); |
| | 451 boolean rtn = false; |
| 440 for( LogInputStream reader : logReaders ) { | 452 for( LogInputStream reader : logReaders ) { |
| 441 playLog(reader,indexWriter); | 453 if( playLog(reader,indexWriter,upToTag) ) { |
| | 454 rtn = true; |
| | 455 break; |
| | 456 } |
| 442 } | 457 } |
| 443 indexWriter.commit(); | 458 indexWriter.commit(); |
| | 459 return rtn; |
| 444 } | 460 } |
| 445 | 461 |
| 446 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { | 462 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { |
| 447 IndexReader reader = indexWriter.openReader(); | 463 IndexReader reader = indexWriter.openReader(); |
| 448 int n = reader.numDocs(); | 464 int n = reader.numDocs(); |
| 449 reader.close(); | 465 reader.close(); |
| 450 return n; | 466 return n; |
| 451 } | 467 } |
| 452 | 468 |
| 453 private static void playLog(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException { | 469 private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) |
| | 470 throws IOException |
| | 471 { |
| | 472 boolean rtn = false; |
| 454 while( in.available() > 0 ) { | 473 while( in.available() > 0 ) { |
| 455 playOp(in,indexWriter); | 474 if( playOp(in,indexWriter,upToTag) ) { |
| | 475 rtn = true; |
| | 476 break; |
| | 477 } |
| 456 } | 478 } |
| 457 in.close(); | 479 in.close(); |
| 458 } | 480 return rtn; |
| 459 | 481 } |
| 460 private static void playOp(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException { | 482 |
| | 483 private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException { |
| 461 in.readLong(); // time | 484 in.readLong(); // time |
| 462 int op = in.readByte(); | 485 int op = in.readByte(); |
| 463 switch(op) { | 486 switch(op) { |
| 464 case OP_DELETE_ALL: | 487 case OP_DELETE_ALL: |
| 465 indexWriter.deleteAll(); | 488 indexWriter.deleteAll(); |
| 466 return; | 489 return false; |
| 467 case OP_DELETE_DOCUMENTS: | 490 case OP_DELETE_DOCUMENTS: |
| 468 indexWriter.deleteDocuments( in.readQuery() ); | 491 { |
| 469 return; | 492 Query query = in.readQuery(); |
| | 493 //System.out.println("OP_DELETE_DOCUMENTS "+query); |
| | 494 indexWriter.deleteDocuments(query); |
| | 495 return false; |
| | 496 } |
| 470 case OP_ADD_DOCUMENT: | 497 case OP_ADD_DOCUMENT: |
| 471 { | 498 { |
| 472 Map storedFields = in.readMap(); | 499 Map storedFields = in.readMap(); |
| 473 indexWriter.addDocument(storedFields); | 500 indexWriter.addDocument(storedFields); |
| 474 return; | 501 return false; |
| 475 } | 502 } |
| 476 case OP_UPDATE_DOCUMENT: | 503 case OP_UPDATE_DOCUMENT: |
| 477 { | 504 { |
| 478 String keyFieldName = in.readUTF(); | 505 String keyFieldName = in.readUTF(); |
| 479 Map storedFields = in.readMap(); | 506 Map storedFields = in.readMap(); |
| 480 indexWriter.updateDocument(keyFieldName,storedFields); | 507 indexWriter.updateDocument(keyFieldName,storedFields); |
| 481 return; | 508 return false; |
| | 509 } |
| | 510 case OP_TAG: |
| | 511 { |
| | 512 String tag = in.readUTF(); |
| | 513 return tag.equals(upToTag); |
| 482 } | 514 } |
| 483 default: | 515 default: |
| 484 throw new RuntimeException("invalid op "+op); | 516 throw new RuntimeException("invalid op "+op); |
| 485 } | 517 } |
| 486 } | 518 } |
