Mercurial Hosting > luan
view src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1473:6c6ce14db6a8
add goodjava.io
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Fri, 17 Apr 2020 13:56:57 -0600 |
parents | 5e3870618377 |
children | c7b86342857f |
line wrap: on
line source
package goodjava.lucene.logging; import java.io.File; import java.io.RandomAccessFile; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.DataInputStream; import java.io.FileInputStream; import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.HashSet; import java.util.List; import java.util.ArrayList; import java.util.Random; import org.apache.lucene.document.Document; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import goodjava.io.IoUtils; import goodjava.lucene.api.GoodIndexWriter; import goodjava.lucene.api.LuceneIndexWriter; import goodjava.lucene.api.GoodCollector; import goodjava.lucene.api.LuceneUtils; import goodjava.logging.Logger; import goodjava.logging.LoggerFactory; public final class LoggingIndexWriter implements GoodIndexWriter { private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class); private static final int version = 1; private static final int OP_DELETE_ALL = 1; private static final int OP_DELETE_DOCUMENTS = 2; private static final int OP_ADD_DOCUMENT = 3; private static final int OP_UPDATE_DOCUMENT = 4; private static final Random rnd = new Random(); public final LuceneIndexWriter indexWriter; private final File logDir; private final List<LogFile> logs = new ArrayList<LogFile>(); private final File index; private boolean isMerging = false; public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException { this.indexWriter = indexWriter; this.logDir = logDir; logDir.mkdirs(); if( !logDir.isDirectory() ) throw new RuntimeException(); index = new File(logDir,"index"); if( index.exists() ) { DataInputStream dis = new DataInputStream(new FileInputStream(index)); try { if( dis.readInt() == version ) { final int n = dis.readInt(); for( int i=0; i<n; i++ ) { File file = new File( logDir, dis.readUTF() ); logs.add( new LogFile(file,"rwd") ); } deleteUnusedFiles(); log().gotoEnd(); return; } } finally { dis.close(); } } for( int i=0; i<2; i++ ) { logs.add( newLogFile() ); } isMerging = true; new Thread(new Runnable(){public void run(){ try { logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); synchronized(LoggingIndexWriter.this) { writeIndex(); } } catch(IOException e) { throw new RuntimeException(e); } finally { synchronized(LoggingIndexWriter.this) { isMerging = false; } } }}).start(); } private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException { IndexReader reader = indexWriter.openReader(); final IndexSearcher searcher = new IndexSearcher(reader); Query query = new MatchAllDocsQuery(); searcher.search( query, new GoodCollector(){ public void collectDoc(int iDoc) throws IOException { Document doc = searcher.doc(iDoc); Map<String,Object> storedFields = LuceneUtils.toMap(doc); log.writeLong(time); log.writeByte(OP_ADD_DOCUMENT); log.writeMap(storedFields); } }); reader.close(); log.commit(); } private LogFile newLogFile() throws IOException { File file; do { file = new File(logDir,"_"+rnd.nextInt(100)+".log"); } while( file.exists() ); return new LogFile(file,"rwd"); } private void deleteUnusedFiles() throws IOException { Set<String> used = new HashSet<String>(); used.add( index.getName() ); for( LogFile lf : logs ) { used.add( lf.file.getName() ); } for( File f : logDir.listFiles() ) { if( !used.contains(f.getName()) ) { deleteFile(f); } } } private static void deleteFile(File file) throws IOException { if( file.isDirectory() ) { for( File f : file.listFiles() ) { deleteFile(f); } } IoUtils.delete(file); } private void writeIndex() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); dos.writeInt(version); dos.writeInt(logs.size()); for( LogFile lf : logs ) { String fileName = lf.file.getName(); dos.writeUTF(fileName); } dos.close(); RandomAccessFile raf = new RandomAccessFile( index, "rwd" ); raf.write( baos.toByteArray() ); raf.close(); deleteUnusedFiles(); logger.info("writeIndex "+logs.toString()); } private void mergeLogs() throws IOException { logger.info("merge"); LogFile first = logs.get(0); LogFile second = logs.get(1); second.gotoEnd(); long lastTime = second.readLong(); File dirFile = new File(logDir,"merge"); if( dirFile.exists() ) throw new RuntimeException(); Directory dir = FSDirectory.open(dirFile); LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); playLog(first,mergeWriter); playLog(second,mergeWriter); mergeWriter.commit(); LogFile merge = newLogFile(); logLucene( lastTime, merge, mergeWriter ); mergeWriter.close(); synchronized(this) { check(); logs.remove(0); logs.set(0,merge); writeIndex(); check(); } } private final Runnable mergeLogs = new Runnable() { public void run() { try { mergeLogs(); /* } catch(IOException e) { throw new RuntimeException(e); */ } catch(Exception e) { e.printStackTrace(); System.exit(-1); } finally { synchronized(LoggingIndexWriter.this) { isMerging = false; } } } }; private void check() throws IOException { File dirFile = new File(logDir,"check"); if( dirFile.exists() ) throw new RuntimeException(); Directory dir = FSDirectory.open(dirFile); LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); playLog(checkWriter); int nCheck = numDocs(checkWriter); int nOrig = numDocs(indexWriter); if( nCheck != nOrig ) { logger.error("nCheck = "+nCheck); logger.error("nOrig = "+nOrig); //new Exception().printStackTrace(); Thread.dumpStack(); System.out.println(); System.out.println("indexWriter"); dump(indexWriter); System.out.println("checkWriter"); dump(checkWriter); System.exit(-1); } checkWriter.close(); deleteFile(dirFile); } private LogFile log() { return logs.get(logs.size()-1); } public synchronized void close() throws IOException { indexWriter.close(); LogFile log = log(); log.commit(); } public synchronized void commit() throws IOException { indexWriter.commit(); LogFile log = log(); log.commit(); if( isMerging ) return; if( log.length() > logs.get(0).length() ) { log.writeLong( System.currentTimeMillis() ); logs.add( newLogFile() ); writeIndex(); } if( logs.size() > 3 ) { isMerging = true; // new Thread(mergeLogs).start(); mergeLogs.run(); } } public synchronized void rollback() throws IOException { indexWriter.rollback(); LogFile log = log(); log.gotoEnd(); } public synchronized void deleteAll() throws IOException { indexWriter.deleteAll(); LogFile log = log(); writeOp(log,OP_DELETE_ALL); } public synchronized void deleteDocuments(Query query) throws IOException { indexWriter.deleteDocuments(query); LogFile log = log(); writeOp(log,OP_DELETE_DOCUMENTS); log.writeQuery(query); } public synchronized void addDocument(Map<String,Object> storedFields) throws IOException { indexWriter.addDocument(storedFields); LogFile log = log(); writeOp(log,OP_ADD_DOCUMENT); log.writeMap(storedFields); } public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException { indexWriter.updateDocument(keyFieldName,storedFields); LogFile log = log(); writeOp(log,OP_UPDATE_DOCUMENT); log.writeUTF(keyFieldName); log.writeMap(storedFields); } public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException { indexWriter.reindexDocuments(keyFieldName,query); } private void writeOp(LogFile log,int op) throws IOException { log.writeLong(System.currentTimeMillis()); log.writeByte(op); } public synchronized void playLog() throws IOException { playLog(indexWriter); } private void playLog(LuceneIndexWriter indexWriter) throws IOException { if( numDocs(indexWriter) != 0 ) throw new RuntimeException ("not empty"); for( LogFile log : logs ) { playLog(log,indexWriter); } indexWriter.commit(); } private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { IndexReader reader = indexWriter.openReader(); int n = reader.numDocs(); reader.close(); return n; } private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException { log.gotoStart(); while( log.hasMore() ) { playOp(log,indexWriter); } } private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException { log.readLong(); // time int op = log.readByte(); switch(op) { case OP_DELETE_ALL: indexWriter.deleteAll(); return; case OP_DELETE_DOCUMENTS: indexWriter.deleteDocuments( log.readQuery() ); return; case OP_ADD_DOCUMENT: { Map storedFields = log.readMap(); indexWriter.addDocument(storedFields); return; } case OP_UPDATE_DOCUMENT: { String keyFieldName = log.readUTF(); Map storedFields = log.readMap(); indexWriter.updateDocument(keyFieldName,storedFields); return; } default: throw new RuntimeException("invalid op "+op); } } private static void dump(LuceneIndexWriter indexWriter) throws IOException { IndexReader reader = indexWriter.openReader(); IndexSearcher searcher = new IndexSearcher(reader); Query query = new MatchAllDocsQuery(); TopDocs td = searcher.search(query,100); System.out.println("totalHits = "+td.totalHits); for( int i=0; i<td.scoreDocs.length; i++ ) { Document doc = searcher.doc(td.scoreDocs[i].doc); System.out.println(LuceneUtils.toMap(doc)); } System.out.println(); reader.close(); } }