comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1544:35601f15ecc3

add lucene log tag and restore_from_log
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 20 Sep 2020 20:36:55 -0600
parents c27dc6af87ca
children 736ec76bbf42
comparison
equal deleted inserted replaced
1543:1db694d98003 1544:35601f15ecc3
42 private static final int version = 1; 42 private static final int version = 1;
43 private static final int OP_DELETE_ALL = 1; 43 private static final int OP_DELETE_ALL = 1;
44 private static final int OP_DELETE_DOCUMENTS = 2; 44 private static final int OP_DELETE_DOCUMENTS = 2;
45 private static final int OP_ADD_DOCUMENT = 3; 45 private static final int OP_ADD_DOCUMENT = 3;
46 private static final int OP_UPDATE_DOCUMENT = 4; 46 private static final int OP_UPDATE_DOCUMENT = 4;
47 private static final int OP_TAG = 5;
47 private static final Random rnd = new Random(); 48 private static final Random rnd = new Random();
48 49
49 public final LuceneIndexWriter indexWriter; 50 public final LuceneIndexWriter indexWriter;
51 public boolean wasCreated;
50 private final File logDir; 52 private final File logDir;
51 protected final List<LogFile> logs = new ArrayList<LogFile>(); 53 protected final List<LogFile> logs = new ArrayList<LogFile>();
52 private LogOutputStream log; 54 private LogOutputStream log;
53 private final File index; 55 private final File index;
54 private final SemaphoreLock mergeLock = new SemaphoreLock(); 56 private final SemaphoreLock mergeLock = new SemaphoreLock();
69 File file = new File( logDir, dis.readUTF() ); 71 File file = new File( logDir, dis.readUTF() );
70 logs.add( new LogFile(file) ); 72 logs.add( new LogFile(file) );
71 } 73 }
72 deleteUnusedFiles(); 74 deleteUnusedFiles();
73 setLog(); 75 setLog();
76 wasCreated = false;
74 return; 77 return;
75 } 78 }
76 } finally { 79 } finally {
77 dis.close(); 80 dis.close();
78 } 81 }
79 } 82 }
80 newLogs(); 83 newLogs();
84 wasCreated = true;
81 } 85 }
82 86
83 public IndexWriter getLuceneIndexWriter() { 87 public IndexWriter getLuceneIndexWriter() {
84 return indexWriter.getLuceneIndexWriter(); 88 return indexWriter.getLuceneIndexWriter();
85 } 89 }
199 File dirFile = new File(logDir,"merge"); 203 File dirFile = new File(logDir,"merge");
200 if( dirFile.exists() ) 204 if( dirFile.exists() )
201 throw new RuntimeException(); 205 throw new RuntimeException();
202 Directory dir = FSDirectory.open(dirFile); 206 Directory dir = FSDirectory.open(dirFile);
203 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); 207 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
204 playLog( first.input(), mergeWriter ); 208 playLog( first.input(), mergeWriter, null );
205 playLog( second.input(), mergeWriter ); 209 playLog( second.input(), mergeWriter, null );
206 mergeWriter.commit(); 210 mergeWriter.commit();
207 LogFile merge = newLogFile(); 211 LogFile merge = newLogFile();
208 logLucene( lastTime, merge, mergeWriter ); 212 logLucene( lastTime, merge, mergeWriter );
209 mergeWriter.close(); 213 mergeWriter.close();
210 synchronized(this) { 214 synchronized(this) {
271 indexWriter.check(); 275 indexWriter.check();
272 File dirFile = new File(logDir,"check"); 276 File dirFile = new File(logDir,"check");
273 IoUtils.deleteRecursively(dirFile); 277 IoUtils.deleteRecursively(dirFile);
274 Directory dir = FSDirectory.open(dirFile); 278 Directory dir = FSDirectory.open(dirFile);
275 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); 279 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
276 playLogs(logReaders,checkWriter); 280 playLogs(logReaders,checkWriter,null);
277 //logger.info("check lucene"); 281 //logger.info("check lucene");
278 IndexReader checkReader = checkWriter.openReader(); 282 IndexReader checkReader = checkWriter.openReader();
283 int nCheck = checkReader.numDocs();
284 int nOrig = indexReader.numDocs();
285 if( nCheck != nOrig ) {
286 logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck);
287 ok = false;
288 }
279 if( sortField == null ) { 289 if( sortField == null ) {
280 int nCheck = checkReader.numDocs(); 290 if( ok && hash(indexReader) != hash(checkReader) ) {
281 int nOrig = indexReader.numDocs();
282 if( nCheck != nOrig ) {
283 logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck);
284 ok = false;
285 }
286 //logger.info("numDocs="+nOrig);
287 if( hash(indexReader) != hash(checkReader) ) {
288 logger.error("hash mismatch"); 291 logger.error("hash mismatch");
289 ok = false; 292 ok = false;
290 } 293 }
291 } else { 294 } else {
292 Sort sort = new Sort(sortField); 295 Sort sort = new Sort(sortField);
411 writeOp(OP_UPDATE_DOCUMENT); 414 writeOp(OP_UPDATE_DOCUMENT);
412 log.writeUTF(keyFieldName); 415 log.writeUTF(keyFieldName);
413 log.writeMap(storedFields); 416 log.writeMap(storedFields);
414 } 417 }
415 418
419 public synchronized void tag(String tag) throws IOException {
420 writeOp(OP_TAG);
421 log.writeUTF(tag);
422 }
423
416 public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException { 424 public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException {
417 indexWriter.reindexDocuments(keyFieldName,query); 425 indexWriter.reindexDocuments(keyFieldName,query);
418 } 426 }
419 427
420 private void writeOp(int op) throws IOException { 428 private void writeOp(int op) throws IOException {
421 log.writeLong(System.currentTimeMillis()); 429 log.writeLong(System.currentTimeMillis());
422 log.writeByte(op); 430 log.writeByte(op);
423 } 431 }
424 432
425 public synchronized void playLogs() throws IOException { 433 // return whether stopped at tag
426 playLogs( logReaders(logs), indexWriter ); 434 public synchronized boolean playLogs(String upToTag) throws IOException {
435 return playLogs( logReaders(logs), indexWriter, upToTag );
427 } 436 }
428 437
429 private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException { 438 private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException {
430 List<LogInputStream> logReaders = new ArrayList<LogInputStream>(); 439 List<LogInputStream> logReaders = new ArrayList<LogInputStream>();
431 for( LogFile log : logs ) { 440 for( LogFile log : logs ) {
432 logReaders.add( log.input() ); 441 logReaders.add( log.input() );
433 } 442 }
434 return logReaders; 443 return logReaders;
435 } 444 }
436 445
437 private static void playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter) throws IOException { 446 private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag)
447 throws IOException
448 {
438 if( numDocs(indexWriter) != 0 ) 449 if( numDocs(indexWriter) != 0 )
439 throw new RuntimeException ("not empty"); 450 throw new RuntimeException ("not empty");
451 boolean rtn = false;
440 for( LogInputStream reader : logReaders ) { 452 for( LogInputStream reader : logReaders ) {
441 playLog(reader,indexWriter); 453 if( playLog(reader,indexWriter,upToTag) ) {
454 rtn = true;
455 break;
456 }
442 } 457 }
443 indexWriter.commit(); 458 indexWriter.commit();
459 return rtn;
444 } 460 }
445 461
446 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { 462 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
447 IndexReader reader = indexWriter.openReader(); 463 IndexReader reader = indexWriter.openReader();
448 int n = reader.numDocs(); 464 int n = reader.numDocs();
449 reader.close(); 465 reader.close();
450 return n; 466 return n;
451 } 467 }
452 468
453 private static void playLog(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException { 469 private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag)
470 throws IOException
471 {
472 boolean rtn = false;
454 while( in.available() > 0 ) { 473 while( in.available() > 0 ) {
455 playOp(in,indexWriter); 474 if( playOp(in,indexWriter,upToTag) ) {
475 rtn = true;
476 break;
477 }
456 } 478 }
457 in.close(); 479 in.close();
458 } 480 return rtn;
459 481 }
460 private static void playOp(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException { 482
483 private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException {
461 in.readLong(); // time 484 in.readLong(); // time
462 int op = in.readByte(); 485 int op = in.readByte();
463 switch(op) { 486 switch(op) {
464 case OP_DELETE_ALL: 487 case OP_DELETE_ALL:
465 indexWriter.deleteAll(); 488 indexWriter.deleteAll();
466 return; 489 return false;
467 case OP_DELETE_DOCUMENTS: 490 case OP_DELETE_DOCUMENTS:
468 indexWriter.deleteDocuments( in.readQuery() ); 491 {
469 return; 492 Query query = in.readQuery();
493 //System.out.println("OP_DELETE_DOCUMENTS "+query);
494 indexWriter.deleteDocuments(query);
495 return false;
496 }
470 case OP_ADD_DOCUMENT: 497 case OP_ADD_DOCUMENT:
471 { 498 {
472 Map storedFields = in.readMap(); 499 Map storedFields = in.readMap();
473 indexWriter.addDocument(storedFields); 500 indexWriter.addDocument(storedFields);
474 return; 501 return false;
475 } 502 }
476 case OP_UPDATE_DOCUMENT: 503 case OP_UPDATE_DOCUMENT:
477 { 504 {
478 String keyFieldName = in.readUTF(); 505 String keyFieldName = in.readUTF();
479 Map storedFields = in.readMap(); 506 Map storedFields = in.readMap();
480 indexWriter.updateDocument(keyFieldName,storedFields); 507 indexWriter.updateDocument(keyFieldName,storedFields);
481 return; 508 return false;
509 }
510 case OP_TAG:
511 {
512 String tag = in.readUTF();
513 return tag.equals(upToTag);
482 } 514 }
483 default: 515 default:
484 throw new RuntimeException("invalid op "+op); 516 throw new RuntimeException("invalid op "+op);
485 } 517 }
486 } 518 }