Mercurial Hosting > luan
diff src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1465:5e3870618377
lucene.logging dir
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 12 Apr 2020 15:59:57 -0600 |
parents | e5d48b85351c |
children | 6c6ce14db6a8 |
line wrap: on
line diff
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java Fri Apr 03 10:04:52 2020 -0600 +++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java Sun Apr 12 15:59:57 2020 -0600 @@ -1,94 +1,364 @@ package goodjava.lucene.logging; +import java.io.File; +import java.io.RandomAccessFile; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.DataInputStream; +import java.io.FileInputStream; import java.io.IOException; import java.util.Map; +import java.util.Set; +import java.util.HashSet; +import java.util.List; +import java.util.ArrayList; +import java.util.Random; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; import goodjava.lucene.api.GoodIndexWriter; +import goodjava.lucene.api.LuceneIndexWriter; +import goodjava.lucene.api.GoodCollector; +import goodjava.lucene.api.LuceneUtils; +import goodjava.logging.Logger; +import goodjava.logging.LoggerFactory; -public class LoggingIndexWriter implements GoodIndexWriter { +public final class LoggingIndexWriter implements GoodIndexWriter { + private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class); + private static final int version = 1; private static final int OP_DELETE_ALL = 1; private static final int OP_DELETE_DOCUMENTS = 2; private static final int OP_ADD_DOCUMENT = 3; private static final int OP_UPDATE_DOCUMENT = 4; + private static final Random rnd = new Random(); - public final GoodIndexWriter indexWriter; - private final LogFile logFile; + public final LuceneIndexWriter indexWriter; + private final File logDir; + private final List<LogFile> logs = new ArrayList<LogFile>(); + private final File index; + private boolean isMerging = false; - public LoggingIndexWriter(GoodIndexWriter indexWriter) throws IOException { + public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException { this.indexWriter = indexWriter; - logFile = new LogFile("lucene.log","rw"); - logFile.gotoStart(); // for now + this.logDir = logDir; + logDir.mkdirs(); + if( !logDir.isDirectory() ) + throw new RuntimeException(); + index = new File(logDir,"index"); + if( index.exists() ) { + DataInputStream dis = new DataInputStream(new FileInputStream(index)); + try { + if( dis.readInt() == version ) { + final int n = dis.readInt(); + for( int i=0; i<n; i++ ) { + File file = new File( logDir, dis.readUTF() ); + logs.add( new LogFile(file,"rwd") ); + } + deleteUnusedFiles(); + log().gotoEnd(); + return; + } + } finally { + dis.close(); + } + } + for( int i=0; i<2; i++ ) { + logs.add( newLogFile() ); + } + isMerging = true; + new Thread(new Runnable(){public void run(){ + try { + logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); + synchronized(LoggingIndexWriter.this) { + writeIndex(); + } + } catch(IOException e) { + throw new RuntimeException(e); + } finally { + synchronized(LoggingIndexWriter.this) { + isMerging = false; + } + } + }}).start(); } - public void close() throws IOException { - indexWriter.close(); - logFile.commit(); + private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException { + IndexReader reader = indexWriter.openReader(); + final IndexSearcher searcher = new IndexSearcher(reader); + Query query = new MatchAllDocsQuery(); + searcher.search( query, new GoodCollector(){ + public void collectDoc(int iDoc) throws IOException { + Document doc = searcher.doc(iDoc); + Map<String,Object> storedFields = LuceneUtils.toMap(doc); + log.writeLong(time); + log.writeByte(OP_ADD_DOCUMENT); + log.writeMap(storedFields); + } + }); + reader.close(); + log.commit(); + } + + private LogFile newLogFile() throws IOException { + File file; + do { + file = new File(logDir,"_"+rnd.nextInt(100)+".log"); + } while( file.exists() ); + return new LogFile(file,"rwd"); } - public void commit() throws IOException { - indexWriter.commit(); - logFile.commit(); + private void deleteUnusedFiles() { + Set<String> used = new HashSet<String>(); + used.add( index.getName() ); + for( LogFile lf : logs ) { + used.add( lf.file.getName() ); + } + for( File f : logDir.listFiles() ) { + if( !used.contains(f.getName()) ) { + deleteFile(f); + } + } } - public void rollback() throws IOException { - indexWriter.rollback(); - logFile.gotoEnd(); + private static void deleteFile(File file) { + if( file.isDirectory() ) { + for( File f : file.listFiles() ) { + deleteFile(f); + } + } + if( !file.delete() ) + throw new RuntimeException(file.getName()); + } + + private void writeIndex() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeInt(version); + dos.writeInt(logs.size()); + for( LogFile lf : logs ) { + String fileName = lf.file.getName(); + dos.writeUTF(fileName); + } + dos.close(); + RandomAccessFile raf = new RandomAccessFile( index, "rwd" ); + raf.write( baos.toByteArray() ); + raf.close(); + deleteUnusedFiles(); + logger.info("writeIndex "+logs.toString()); } - public void deleteAll() throws IOException { - indexWriter.deleteAll(); - logFile.writeByte(OP_DELETE_ALL); + private void mergeLogs() throws IOException { + logger.info("merge"); + LogFile first = logs.get(0); + LogFile second = logs.get(1); + second.gotoEnd(); + long lastTime = second.readLong(); + File dirFile = new File(logDir,"merge"); + if( dirFile.exists() ) + throw new RuntimeException(); + Directory dir = FSDirectory.open(dirFile); + LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); + playLog(first,mergeWriter); + playLog(second,mergeWriter); + mergeWriter.commit(); + LogFile merge = newLogFile(); + logLucene( lastTime, merge, mergeWriter ); + mergeWriter.close(); + synchronized(this) { + check(); + logs.remove(0); + logs.set(0,merge); + writeIndex(); + check(); + } } + private final Runnable mergeLogs = new Runnable() { public void run() { + try { + mergeLogs(); +/* + } catch(IOException e) { + throw new RuntimeException(e); +*/ + } catch(Exception e) { + e.printStackTrace(); + System.exit(-1); + } finally { + synchronized(LoggingIndexWriter.this) { + isMerging = false; + } + } + } }; - public void deleteDocuments(Query query) throws IOException { - indexWriter.deleteDocuments(query); - logFile.writeByte(OP_DELETE_DOCUMENTS); - logFile.writeQuery(query); + private void check() throws IOException { + File dirFile = new File(logDir,"check"); + if( dirFile.exists() ) + throw new RuntimeException(); + Directory dir = FSDirectory.open(dirFile); + LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); + playLog(checkWriter); + int nCheck = numDocs(checkWriter); + int nOrig = numDocs(indexWriter); + if( nCheck != nOrig ) { + logger.error("nCheck = "+nCheck); + logger.error("nOrig = "+nOrig); + //new Exception().printStackTrace(); + Thread.dumpStack(); + System.out.println(); + System.out.println("indexWriter"); + dump(indexWriter); + System.out.println("checkWriter"); + dump(checkWriter); + System.exit(-1); + } + checkWriter.close(); + deleteFile(dirFile); } - public void addDocument(Map<String,Object> storedFields) throws IOException { - indexWriter.addDocument(storedFields); - logFile.writeByte(OP_ADD_DOCUMENT); - logFile.writeMap(storedFields); + private LogFile log() { + return logs.get(logs.size()-1); + } + + public synchronized void close() throws IOException { + indexWriter.close(); + LogFile log = log(); + log.commit(); + } + + public synchronized void commit() throws IOException { + indexWriter.commit(); + LogFile log = log(); + log.commit(); + if( isMerging ) + return; + if( log.length() > logs.get(0).length() ) { + log.writeLong( System.currentTimeMillis() ); + logs.add( newLogFile() ); + writeIndex(); + } + if( logs.size() > 3 ) { + isMerging = true; +// new Thread(mergeLogs).start(); + mergeLogs.run(); + } } - public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException { - indexWriter.updateDocument(keyFieldName,storedFields); - logFile.writeByte(OP_UPDATE_DOCUMENT); - logFile.writeUTF(keyFieldName); - logFile.writeMap(storedFields); + public synchronized void rollback() throws IOException { + indexWriter.rollback(); + LogFile log = log(); + log.gotoEnd(); + } + + public synchronized void deleteAll() throws IOException { + indexWriter.deleteAll(); + LogFile log = log(); + writeOp(log,OP_DELETE_ALL); } - public void reindexDocuments(String keyFieldName,Query query) throws IOException { + public synchronized void deleteDocuments(Query query) throws IOException { + indexWriter.deleteDocuments(query); + LogFile log = log(); + writeOp(log,OP_DELETE_DOCUMENTS); + log.writeQuery(query); + } + + public synchronized void addDocument(Map<String,Object> storedFields) throws IOException { + indexWriter.addDocument(storedFields); + LogFile log = log(); + writeOp(log,OP_ADD_DOCUMENT); + log.writeMap(storedFields); + } + + public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException { + indexWriter.updateDocument(keyFieldName,storedFields); + LogFile log = log(); + writeOp(log,OP_UPDATE_DOCUMENT); + log.writeUTF(keyFieldName); + log.writeMap(storedFields); + } + + public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException { indexWriter.reindexDocuments(keyFieldName,query); } - private void playOp() throws IOException { - int op = logFile.readByte(); + private void writeOp(LogFile log,int op) throws IOException { + log.writeLong(System.currentTimeMillis()); + log.writeByte(op); + } + + public synchronized void playLog() throws IOException { + playLog(indexWriter); + } + + private void playLog(LuceneIndexWriter indexWriter) throws IOException { + if( numDocs(indexWriter) != 0 ) + throw new RuntimeException ("not empty"); + for( LogFile log : logs ) { + playLog(log,indexWriter); + } + indexWriter.commit(); + } + + private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { + IndexReader reader = indexWriter.openReader(); + int n = reader.numDocs(); + reader.close(); + return n; + } + + private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException { + log.gotoStart(); + while( log.hasMore() ) { + playOp(log,indexWriter); + } + } + + private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException { + log.readLong(); // time + int op = log.readByte(); switch(op) { case OP_DELETE_ALL: indexWriter.deleteAll(); return; case OP_DELETE_DOCUMENTS: - indexWriter.deleteDocuments( logFile.readQuery() ); + indexWriter.deleteDocuments( log.readQuery() ); return; case OP_ADD_DOCUMENT: - indexWriter.addDocument( logFile.readMap() ); - return; + { + Map storedFields = log.readMap(); + indexWriter.addDocument(storedFields); + return; + } case OP_UPDATE_DOCUMENT: - indexWriter.updateDocument( logFile.readUTF(), logFile.readMap() ); - return; + { + String keyFieldName = log.readUTF(); + Map storedFields = log.readMap(); + indexWriter.updateDocument(keyFieldName,storedFields); + return; + } default: throw new RuntimeException("invalid op "+op); } } - public void playLog() throws IOException { - logFile.gotoStart(); - while( logFile.hasMore() ) { - playOp(); + private static void dump(LuceneIndexWriter indexWriter) throws IOException { + IndexReader reader = indexWriter.openReader(); + IndexSearcher searcher = new IndexSearcher(reader); + Query query = new MatchAllDocsQuery(); + TopDocs td = searcher.search(query,100); + System.out.println("totalHits = "+td.totalHits); + for( int i=0; i<td.scoreDocs.length; i++ ) { + Document doc = searcher.doc(td.scoreDocs[i].doc); + System.out.println(LuceneUtils.toMap(doc)); } - indexWriter.commit(); + System.out.println(); + reader.close(); } + }