comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1549:41c32da4cbd1

lucene log work
author Franklin Schmidt <fschmidt@gmail.com>
date Sat, 03 Oct 2020 20:55:26 -0600
parents 736ec76bbf42
children 9cc4cee39b8b
comparison
equal deleted inserted replaced
1548:736ec76bbf42 1549:41c32da4cbd1
25 import org.apache.lucene.search.SortField; 25 import org.apache.lucene.search.SortField;
26 import org.apache.lucene.search.Sort; 26 import org.apache.lucene.search.Sort;
27 import org.apache.lucene.store.Directory; 27 import org.apache.lucene.store.Directory;
28 import org.apache.lucene.store.FSDirectory; 28 import org.apache.lucene.store.FSDirectory;
29 import goodjava.io.IoUtils; 29 import goodjava.io.IoUtils;
30 import goodjava.lucene.api.GoodWriter;
31 import goodjava.lucene.api.GoodIndexWriter; 30 import goodjava.lucene.api.GoodIndexWriter;
32 import goodjava.lucene.api.LuceneIndexWriter; 31 import goodjava.lucene.api.LuceneIndexWriter;
33 import goodjava.lucene.api.GoodCollector; 32 import goodjava.lucene.api.GoodCollector;
34 import goodjava.lucene.api.LuceneUtils; 33 import goodjava.lucene.api.LuceneUtils;
35 import goodjava.logging.Logger; 34 import goodjava.logging.Logger;
222 File dirFile = new File(logDir,"merge"); 221 File dirFile = new File(logDir,"merge");
223 if( dirFile.exists() ) 222 if( dirFile.exists() )
224 throw new RuntimeException(); 223 throw new RuntimeException();
225 Directory dir = FSDirectory.open(dirFile); 224 Directory dir = FSDirectory.open(dirFile);
226 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); 225 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
227 playLog( first.input(), mergeWriter ); 226 OpDoer opDoer = new OpDoer(mergeWriter);
228 playLog( second.input(), mergeWriter ); 227 playLog( first.input(), opDoer );
228 playLog( second.input(), opDoer );
229 mergeWriter.commit(); 229 mergeWriter.commit();
230 LogFile merge = newLogFile(); 230 LogFile merge = newLogFile();
231 LogOutputStream log = merge.output(); 231 LogOutputStream log = merge.output();
232 logLucene( lastTime, log, mergeWriter ); 232 logLucene( lastTime, log, mergeWriter );
233 log.close(); 233 log.close();
298 indexWriter.check(); 298 indexWriter.check();
299 File dirFile = new File(logDir,"check"); 299 File dirFile = new File(logDir,"check");
300 IoUtils.deleteRecursively(dirFile); 300 IoUtils.deleteRecursively(dirFile);
301 Directory dir = FSDirectory.open(dirFile); 301 Directory dir = FSDirectory.open(dirFile);
302 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); 302 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
303 playLogs(logReaders,checkWriter); 303 playLogs(logReaders,new OpDoer(checkWriter));
304 //logger.info("check lucene"); 304 //logger.info("check lucene");
305 IndexReader checkReader = checkWriter.openReader(); 305 IndexReader checkReader = checkWriter.openReader();
306 int nCheck = checkReader.numDocs(); 306 int nCheck = checkReader.numDocs();
307 int nOrig = indexReader.numDocs(); 307 int nOrig = indexReader.numDocs();
308 if( nCheck != nOrig ) { 308 if( nCheck != nOrig ) {
447 log.writeLong(System.currentTimeMillis()); 447 log.writeLong(System.currentTimeMillis());
448 log.writeByte(op); 448 log.writeByte(op);
449 } 449 }
450 450
451 // return whether stopped at tag 451 // return whether stopped at tag
452 public synchronized void playLogs(GoodWriter writer) throws IOException { 452 public synchronized void playLogs(OpDoer opDoer) throws IOException {
453 if( writer == null ) 453 if( opDoer == null )
454 writer = indexWriter; 454 opDoer = new OpDoer(indexWriter);
455 playLogs( logReaders(logs), writer ); 455 playLogs( logReaders(logs), opDoer );
456 } 456 }
457 457
458 private static LogInputStream[] logReaders(LogFile[] logs) throws IOException { 458 private static LogInputStream[] logReaders(LogFile[] logs) throws IOException {
459 LogInputStream[] logReaders = new LogInputStream[logs.length]; 459 LogInputStream[] logReaders = new LogInputStream[logs.length];
460 for( int i=0; i<logs.length; i++ ) { 460 for( int i=0; i<logs.length; i++ ) {
461 logReaders[i] = logs[i].input(); 461 logReaders[i] = logs[i].input();
462 } 462 }
463 return logReaders; 463 return logReaders;
464 } 464 }
465 465
466 private static void playLogs(LogInputStream[] logReaders,GoodWriter indexWriter) 466 private static void playLogs(LogInputStream[] logReaders,OpDoer opDoer)
467 throws IOException 467 throws IOException
468 { 468 {
469 if( numDocs(indexWriter) != 0 ) 469 if( numDocs(opDoer.writer) != 0 )
470 throw new RuntimeException ("not empty"); 470 throw new RuntimeException ("not empty");
471 for( LogInputStream reader : logReaders ) { 471 for( LogInputStream reader : logReaders ) {
472 playLog(reader,indexWriter); 472 playLog(reader,opDoer);
473 } 473 }
474 indexWriter.commit(); 474 opDoer.commit();
475 } 475 }
476 476
477 private static int numDocs(GoodWriter indexWriter) throws IOException { 477 private static int numDocs(GoodIndexWriter indexWriter) throws IOException {
478 IndexReader reader = indexWriter.openReader(); 478 IndexReader reader = indexWriter.openReader();
479 int n = reader.numDocs(); 479 int n = reader.numDocs();
480 reader.close(); 480 reader.close();
481 return n; 481 return n;
482 } 482 }
483 483
484 private static void playLog(LogInputStream in,GoodWriter indexWriter) 484 private static void playLog(LogInputStream in,OpDoer opDoer)
485 throws IOException 485 throws IOException
486 { 486 {
487 while( in.available() > 0 ) { 487 while( in.available() > 0 ) {
488 playOp(in,indexWriter); 488 playOp(in,opDoer);
489 } 489 }
490 in.close(); 490 in.close();
491 } 491 }
492 492
493 private static void playOp(LogInputStream in,GoodWriter indexWriter) 493 private static void playOp(LogInputStream in,OpDoer opDoer)
494 throws IOException 494 throws IOException
495 { 495 {
496 in.readLong(); // time 496 long time = in.readLong(); // time
497 int op = in.readByte(); 497 int op = in.readByte();
498 switch(op) { 498 switch(op) {
499 case OP_DELETE_ALL: 499 case OP_DELETE_ALL:
500 indexWriter.deleteAll(); 500 opDoer.deleteAll(time);
501 return; 501 return;
502 case OP_DELETE_DOCUMENTS: 502 case OP_DELETE_DOCUMENTS:
503 { 503 {
504 Query query = in.readQuery(); 504 Query query = in.readQuery();
505 //System.out.println("OP_DELETE_DOCUMENTS "+query); 505 //System.out.println("OP_DELETE_DOCUMENTS "+query);
506 indexWriter.deleteDocuments(query); 506 opDoer.deleteDocuments(time,query);
507 return; 507 return;
508 } 508 }
509 case OP_ADD_DOCUMENT: 509 case OP_ADD_DOCUMENT:
510 { 510 {
511 Map storedFields = in.readMap(); 511 Map storedFields = in.readMap();
512 indexWriter.addDocument(storedFields); 512 opDoer.addDocument(time,storedFields);
513 return; 513 return;
514 } 514 }
515 case OP_UPDATE_DOCUMENT: 515 case OP_UPDATE_DOCUMENT:
516 { 516 {
517 String keyFieldName = in.readUTF(); 517 String keyFieldName = in.readUTF();
518 Map storedFields = in.readMap(); 518 Map storedFields = in.readMap();
519 indexWriter.updateDocument(keyFieldName,storedFields); 519 opDoer.updateDocument(time,keyFieldName,storedFields);
520 return; 520 return;
521 } 521 }
522 case OP_TAG: 522 case OP_TAG:
523 { 523 {
524 String tag = in.readUTF(); 524 String tag = in.readUTF();
525 indexWriter.tag(tag); 525 opDoer.tag(time,tag);
526 return; 526 return;
527 } 527 }
528 default: 528 default:
529 throw new RuntimeException("invalid op "+op); 529 throw new RuntimeException("invalid op "+op);
530 } 530 }