Mercurial Hosting > luan
view src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1807:ffa28e2155bb
add line_diff
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 12 May 2024 15:25:29 -0600 |
parents | 39287902fb0c |
children |
line wrap: on
line source
package goodjava.lucene.logging;

import java.io.File;
import java.io.RandomAccessFile;
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.FileReader;
import java.io.Writer;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import goodjava.io.IoUtils;
import goodjava.json.JsonParser;
import goodjava.json.JsonToString;
import goodjava.parser.ParseException;
import goodjava.lucene.api.GoodIndexWriter;
import goodjava.lucene.api.LuceneIndexWriter;
import goodjava.lucene.api.GoodCollector;
import goodjava.lucene.api.LuceneUtils;
import goodjava.logging.Logger;
import goodjava.logging.LoggerFactory;


/**
 * A {@link GoodIndexWriter} that forwards each write operation to a wrapped
 * {@link LuceneIndexWriter} and then appends a timestamped record of that
 * operation to a log file in {@code logDir}.
 *
 * <p>Three log files are tracked in {@link #logs}; their names are recorded in
 * {@code index.json} inside {@code logDir}.  New operations are appended to
 * {@code logs[2]}.  On {@link #commit()}, when {@code logs[1]} was last modified
 * more than {@code logTime} milliseconds ago, a background thread replays
 * {@code logs[0]} and {@code logs[1]} into a scratch index, dumps that index into
 * a fresh log which becomes the new {@code logs[0]}, shifts {@code logs[2]} to
 * {@code logs[1]} and starts a new empty {@code logs[2]}.
 *
 * <p>The mutating methods are {@code synchronized} on this instance.
 */
public class LoggingIndexWriter implements GoodIndexWriter {
	private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);

	/** Value stored under "version" in index.json; any other value triggers a rebuild of the logs. */
	private static final int version = 2;

	// Operation codes written to the log after each timestamp.
	private static final int OP_DELETE_ALL = 1;
	private static final int OP_DELETE_DOCUMENTS = 2;
	private static final int OP_ADD_DOCUMENT = 3;
	private static final int OP_UPDATE_DOCUMENT = 4;
	private static final int OP_TAG = 5;

	private static final Random rnd = new Random();

	public final LuceneIndexWriter indexWriter;

	/** True when the constructor had to build new logs; false when an existing index.json was loaded. */
	public boolean wasCreated;

	private final File logDir;
	private final long logTime;
	protected final LogFile[] logs = new LogFile[3];

	/** Output stream of {@code logs[2]}; replaced by {@link #setLog()}. */
	private LogOutputStream log;

	private final File indexFile;
	private final SemaphoreLock mergeLock = new SemaphoreLock();

	/**
	 * Opens the log set in {@code logDir}, creating the directory and building new
	 * logs from the current Lucene index when no usable {@code index.json} exists.
	 *
	 * @param indexWriter the writer that all operations are forwarded to
	 * @param logDir directory holding index.json and the log files
	 * @param logTime age threshold, compared against {@code System.currentTimeMillis()},
	 *        beyond which {@link #commit()} starts a merge
	 * @throws IOException on I/O failure
	 * @throws RuntimeException if {@code logDir} is not a directory after creation
	 */
	public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime)
		throws IOException
	{
		this.indexWriter = indexWriter;
		this.logDir = logDir;
		this.logTime = logTime;
		IoUtils.mkdirs(logDir);
		if( !logDir.isDirectory() )
			throw new RuntimeException("not a directory: "+logDir);
		indexFile = new File(logDir,"index.json");
		if( indexFile.exists() ) {
			try {
				Map map = (Map)JsonParser.parse( IoUtils.readAll(new FileReader(indexFile)) );
				if( ((Number)map.get("version")).intValue() == version ) {
					List fileNames = (List)map.get("files");
					for( int i=0; i<logs.length; i++ ) {
						File file = new File( logDir, (String)fileNames.get(i) );
						logs[i] = new LogFile(file);
					}
					deleteUnusedFiles();
					setLog();
					wasCreated = false;
					return;
				}
				// version mismatch: fall through and rebuild
			} catch(ParseException e) {
				logger.error("bad index.json",e);
			}
		}
		logger.info("building new logs");
		for( int i=0; i<logs.length; i++ ) {
			logs[i] = newLogFile();
		}
		// Seed the first log with a full dump of the current index.
		LogOutputStream firstLog = logs[0].output();
		logLucene( System.currentTimeMillis(), firstLog, indexWriter );
		firstLog.close();
		writeIndex();
		setLog();
		logger.info("done building new logs");
		wasCreated = true;
	}

	/** Opens a reader on the wrapped index writer. */
	public IndexReader openReader() throws IOException {
		return indexWriter.openReader();
	}

	/** Returns the underlying Lucene {@link IndexWriter} of the wrapped writer. */
	public IndexWriter getLuceneIndexWriter() {
		return indexWriter.getLuceneIndexWriter();
	}

	/** Closes the current log output, if any, and opens the output of {@code logs[2]}. */
	private void setLog() throws IOException {
		if( log != null )
			log.close();
		log = logs[2].output();
	}
/*
	public synchronized boolean isMerging() {
		return mergeLock.isLocked();
	}
*/
	/**
	 * Acquires the merge lock, waiting up to one minute.
	 *
	 * @throws RuntimeException if the lock is not obtained in time or the wait is interrupted
	 */
	private void getMergeLock() {
		try {
			if( !mergeLock.tryLock(1,TimeUnit.MINUTES) )
				throw new RuntimeException("failed to acquire lock");
		} catch(InterruptedException e) {
			// keep the interrupt visible to callers further up the stack
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}
	}

	/** Appends a delete-all followed by one add per document of the current index to the active log. */
	public synchronized void logLucene() throws IOException {
		logLucene( System.currentTimeMillis(), log, indexWriter );
	}

	/**
	 * Writes a delete-all record and then an add-document record for every document
	 * currently in {@code indexWriter}, all stamped with {@code time}, and commits the log.
	 */
	private static void logLucene(long time,LogOutputStream log,LuceneIndexWriter indexWriter)
		throws IOException
	{
		log.writeLong(time);
		log.writeByte(OP_DELETE_ALL);
		IndexReader reader = indexWriter.openReader();
		try {
			final IndexSearcher searcher = new IndexSearcher(reader);
			Query query = new MatchAllDocsQuery();
			searcher.search( query, new GoodCollector(){
				public void collectDoc(int iDoc) throws IOException {
					Document doc = searcher.doc(iDoc);
					Map<String,Object> storedFields = LuceneUtils.toMap(doc);
					log.writeLong(time);
					log.writeByte(OP_ADD_DOCUMENT);
					log.writeMap(storedFields);
				}
			});
		} finally {
			// release the reader even when the search or a log write fails
			reader.close();
		}
		log.commit();
	}

	/** Creates a {@link LogFile} for a not-yet-existing file named {@code _<n>.log} in {@code logDir}. */
	private LogFile newLogFile() throws IOException {
		File file;
		do {
			file = new File(logDir,"_"+rnd.nextInt(100)+".log");
		} while( file.exists() );
		return new LogFile(file);
	}

	private void deleteUnusedFiles() throws IOException {
		deleteUnusedFiles(logs,indexFile);
	}

	/**
	 * Deletes, recursively, every entry in the directory of {@code indexFile} that is
	 * neither {@code indexFile} itself nor one of the files in {@code logs}.
	 *
	 * @throws IOException if the directory cannot be listed or a delete fails
	 */
	private static void deleteUnusedFiles(LogFile[] logs,File indexFile) throws IOException {
		Set<String> used = new HashSet<String>();
		used.add( indexFile.getName() );
		for( LogFile lf : logs ) {
			used.add( lf.file.getName() );
		}
		File dir = indexFile.getParentFile();
		File[] entries = dir.listFiles();
		if( entries == null )
			throw new IOException("cannot list directory "+dir);
		for( File f : entries ) {
			if( !used.contains(f.getName()) ) {
				IoUtils.deleteRecursively(f);
			}
		}
	}

	private void writeIndex() throws IOException {
		writeIndex(logs,indexFile);
	}

	/**
	 * Writes the version and the names of the three log files to {@code indexFile}
	 * as one line of JSON, then deletes every other entry in that directory.
	 *
	 * @param logs exactly three log files, in order
	 * @param indexFile the index.json file to (over)write
	 * @throws RuntimeException if {@code logs.length != 3}
	 * @throws IOException on I/O failure
	 */
	public static void writeIndex(LogFile[] logs,File indexFile) throws IOException {
		if( logs.length != 3 )
			throw new RuntimeException("expected 3 logs but got "+logs.length);
		Map<String,Object> map = new LinkedHashMap<String,Object>();
		map.put("version",version);
		List<String> fileNames = new ArrayList<String>();
		for( LogFile lf : logs ) {
			fileNames.add( lf.file.getName() );
		}
		map.put("files",fileNames);
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		Writer writer = new OutputStreamWriter(baos);
		writer.write( new JsonToString().toString(map) );
		writer.write( '\n' );
		writer.close();
		byte[] bytes = baos.toByteArray();
		try( RandomAccessFile raf = new RandomAccessFile( indexFile, "rwd" ) ) {
			raf.write(bytes);
			// Log names vary in length ("_5.log" vs "_55.log"), so an earlier index may be
			// longer than this one; cut off its leftover tail or the JSON becomes unparseable.
			raf.setLength(bytes.length);
		}
		deleteUnusedFiles(logs,indexFile);
	}

	/**
	 * Replaces {@code logs[0]} and {@code logs[1]} with a single log holding their net result.
	 * Both logs are replayed into a scratch Lucene index in {@code logDir/merge}, which is then
	 * dumped into a new log file stamped with the last-modified time of {@code logs[1]}.
	 * Does nothing except log an error when the merge lock is not held.
	 */
	private void mergeLogs() throws IOException {
		logger.info("merge");
		if( !mergeLock.isLocked() ) {
			logger.error("merge without lock");
			return;
		}
		LogFile first = logs[0];
		LogFile second = logs[1];
		long lastTime = second.file.lastModified();
		File dirFile = new File(logDir,"merge");
		if( dirFile.exists() )
			throw new RuntimeException("merge directory already exists: "+dirFile);
		Directory dir = FSDirectory.open(dirFile);
		LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
		OpDoer opDoer = new BasicOpDoer(mergeWriter);
		playLog( first.input(), opDoer );
		playLog( second.input(), opDoer );
		mergeWriter.commit();
		LogFile merge = newLogFile();
		LogOutputStream mergeLog = merge.output();
		logLucene( lastTime, mergeLog, mergeWriter );
		mergeLog.close();
		mergeWriter.close();
		synchronized(this) {
			// Rotate: merged log first, the log being written becomes the middle one,
			// and a fresh log takes over as the write target.  writeIndex() also removes
			// the old log files and the scratch merge directory, as they are no longer listed.
			logs[0] = merge;
			logs[1] = logs[2];
			logs[2] = newLogFile();
			writeIndex();
			setLog();
		}
	}

	/** Task run on the merge thread; always releases the merge lock taken by {@link #commit()}. */
	private final Runnable mergeLogs = new Runnable() {
		@Override
		public void run() {
			try {
				mergeLogs();
			} catch(IOException e) {
				throw new RuntimeException(e);
			} finally {
				mergeLock.unlock();
			}
		}
	};

	/** Walks the documents matching a query, in sort order, one at a time. */
	private static class DocIter {
		final IndexReader reader;
		final TopDocs td;
		final int n;
		int i = 0;

		DocIter(IndexReader reader,Query query,Sort sort) throws IOException {
			this.reader = reader;
			IndexSearcher searcher = new IndexSearcher(reader);
			this.td = searcher.search(query,10000000,sort);
			this.n = td.scoreDocs.length;
			if( td.totalHits != n )
				throw new RuntimeException("totalHits="+td.totalHits+" but only "+n+" docs returned");
		}

		/** Returns the next document, or null when all have been returned. */
		Document next() throws IOException {
			return i < n ? reader.document(td.scoreDocs[i++].doc) : null;
		}
	}

	// NOTE(review): the test-then-set on this flag in check() is not atomic, so two
	// threads entering at the same instant could both proceed -- confirm whether that matters.
	private volatile boolean isChecking = false;

	/**
	 * Compares the Lucene index with the result of replaying the logs.
	 *
	 * @param sortField field used to pair up documents for a per-document comparison,
	 *        or null to compare only document counts and a hash of the stored fields
	 * @return true when no difference was found; differences are logged as errors
	 * @throws RuntimeException if another check is already in progress
	 */
	public boolean check(SortField sortField) throws IOException {
		if( isChecking )
			throw new RuntimeException("another check is running");
		isChecking = true;
		try {
			return doCheck(sortField);
		} finally {
			isChecking = false;
		}
	}

	/**
	 * Does the work of {@link #check(SortField)}: replays the logs into a scratch index
	 * in {@code logDir/check} and compares it with the live index.
	 */
	protected boolean doCheck(SortField sortField) throws IOException {
		boolean ok = true;
		IndexReader indexReader;
		LogInputStream[] logReaders;
		synchronized(this) {
			// take the index snapshot and the log inputs under the same lock
			indexReader = indexWriter.openReader();
			logReaders = logReaders(logs);
		}
		try {
			indexWriter.check();
			File dirFile = new File(logDir,"check");
			IoUtils.deleteRecursively(dirFile);
			Directory dir = FSDirectory.open(dirFile);
			LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
			try {
				playLogs(logReaders,new BasicOpDoer(checkWriter));
				IndexReader checkReader = checkWriter.openReader();
				try {
					int nCheck = checkReader.numDocs();
					int nOrig = indexReader.numDocs();
					if( nCheck != nOrig ) {
						logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck);
						ok = false;
					}
					if( sortField == null ) {
						if( ok && hash(indexReader) != hash(checkReader) ) {
							logger.error("hash mismatch");
							ok = false;
						}
					} else {
						// Walk both indexes in sort order, like merging two sorted lists.
						Sort sort = new Sort(sortField);
						String sortFieldName = sortField.getField();
						Query query = new PrefixQuery(new Term(sortFieldName));
						DocIter origIter = new DocIter(indexReader,query,sort);
						DocIter checkIter = new DocIter(checkReader,query,sort);
						Map<String,Object> origFields = LuceneUtils.toMap(origIter.next());
						Map<String,Object> checkFields = LuceneUtils.toMap(checkIter.next());
						while( origFields!=null && checkFields!=null ) {
							Comparable origFld = (Comparable)origFields.get(sortFieldName);
							Comparable checkFld = (Comparable)checkFields.get(sortFieldName);
							int cmp = origFld.compareTo(checkFld);
							if( cmp==0 ) {
								if( !origFields.equals(checkFields) ) {
									logger.error(sortFieldName+" "+origFld+" not equal");
									logger.error("lucene = "+origFields);
									logger.error("logs = "+checkFields);
									ok = false;
								}
								origFields = LuceneUtils.toMap(origIter.next());
								checkFields = LuceneUtils.toMap(checkIter.next());
							} else if( cmp < 0 ) {
								logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
								ok = false;
								origFields = LuceneUtils.toMap(origIter.next());
							} else {  // >
								logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
								ok = false;
								checkFields = LuceneUtils.toMap(checkIter.next());
							}
						}
						// whatever is left on either side has no counterpart
						while( origFields!=null ) {
							Comparable origFld = (Comparable)origFields.get(sortFieldName);
							logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
							ok = false;
							origFields = LuceneUtils.toMap(origIter.next());
						}
						while( checkFields!=null ) {
							Comparable checkFld = (Comparable)checkFields.get(sortFieldName);
							logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
							ok = false;
							checkFields = LuceneUtils.toMap(checkIter.next());
						}
					}
				} finally {
					checkReader.close();
				}
			} finally {
				// always release the scratch writer so a failed check does not block the next one
				checkWriter.close();
			}
			IoUtils.deleteRecursively(dirFile);
		} finally {
			indexReader.close();
		}
		return ok;
	}

	/** Collector that accumulates a running total. */
	private static abstract class HashCollector extends GoodCollector {
		int total = 0;
	}

	/** Sums the {@code hashCode()} of the stored-field map of every document in {@code reader}. */
	private static int hash(IndexReader reader) throws IOException {
		final IndexSearcher searcher = new IndexSearcher(reader);
		Query query = new MatchAllDocsQuery();
		HashCollector col = new HashCollector() {
			public void collectDoc(int iDoc) throws IOException {
				Document doc = searcher.doc(iDoc);
				Map<String,Object> storedFields = LuceneUtils.toMap(doc);
				total += storedFields.hashCode();
			}
		};
		searcher.search(query,col);
		return col.total;
	}

	/** Closes the wrapped writer, then commits and closes the active log. */
	public synchronized void close() throws IOException {
		indexWriter.close();
		log.commit();
		log.close();
	}

	/**
	 * Commits the wrapped writer and the active log.  Afterwards, unless a merge is
	 * already running, starts a merge on a new thread when {@code logs[1]} was last
	 * modified more than {@code logTime} milliseconds ago.
	 */
	public synchronized void commit() throws IOException {
		indexWriter.commit();
		log.commit();
		if( mergeLock.isLocked() )
			return;
		if( logs[1].file.lastModified() < System.currentTimeMillis() - logTime ) {
			getMergeLock();
			// the merge thread releases the lock when it finishes
			new Thread(mergeLogs).start();
		}
	}

	/** Rolls back the wrapped writer and the active log. */
	public synchronized void rollback() throws IOException {
		indexWriter.rollback();
		log.rollback();
	}

	public synchronized void deleteAll() throws IOException {
		indexWriter.deleteAll();
		writeOp(OP_DELETE_ALL);
	}

	public synchronized void deleteDocuments(Query query) throws IOException {
		indexWriter.deleteDocuments(query);
		writeOp(OP_DELETE_DOCUMENTS);
		log.writeQuery(query);
	}

	public synchronized void addDocument(Map<String,Object> storedFields) throws IOException {
		indexWriter.addDocument(storedFields);
		writeOp(OP_ADD_DOCUMENT);
		log.writeMap(storedFields);
	}

	public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields)
		throws IOException
	{
		indexWriter.updateDocument(keyFieldName,storedFields);
		writeOp(OP_UPDATE_DOCUMENT);
		log.writeUTF(keyFieldName);
		log.writeMap(storedFields);
	}

	/** Writes a tag record to the log; the index itself is not touched. */
	public synchronized void tag(String tag) throws IOException {
		writeOp(OP_TAG);
		log.writeUTF(tag);
	}

	/** Forwards to the wrapped writer; nothing is written to the log. */
	public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException {
		indexWriter.reindexDocuments(keyFieldName,query);
	}

	/** Writes the record header: current time in milliseconds followed by the op code. */
	private void writeOp(int op) throws IOException {
		log.writeLong(System.currentTimeMillis());
		log.writeByte(op);
	}

	/**
	 * Replays all three logs, in order, through {@code opDoer} and then commits it.
	 *
	 * @param opDoer receiver of the operations; when null, a {@link BasicOpDoer}
	 *        on the wrapped writer is used
	 */
	public synchronized void playLogs(OpDoer opDoer) throws IOException {
		if( opDoer == null )
			opDoer = new BasicOpDoer(indexWriter);
		playLogs( logReaders(logs), opDoer );
	}

	/** Opens an input stream on each log, preserving order. */
	private static LogInputStream[] logReaders(LogFile[] logs) throws IOException {
		LogInputStream[] logReaders = new LogInputStream[logs.length];
		for( int i=0; i<logs.length; i++ ) {
			logReaders[i] = logs[i].input();
		}
		return logReaders;
	}

	private static void playLogs(LogInputStream[] logReaders,OpDoer opDoer) throws IOException {
		for( LogInputStream reader : logReaders ) {
			playLog(reader,opDoer);
		}
		opDoer.commit();
	}

	/** Replays every record remaining in {@code in}, then closes it. */
	private static void playLog(LogInputStream in,OpDoer opDoer) throws IOException {
		while( in.available() > 0 ) {
			playOp(in,opDoer);
		}
		in.close();
	}

	/**
	 * Reads one record (timestamp, op code, op-specific payload) and hands it to {@code opDoer}.
	 *
	 * @throws RuntimeException if the op code is not one of the {@code OP_*} constants
	 */
	private static void playOp(LogInputStream in,OpDoer opDoer) throws IOException {
		long time = in.readLong();
		int op = in.readByte();
		switch(op) {
		case OP_DELETE_ALL:
			opDoer.deleteAll(time);
			return;
		case OP_DELETE_DOCUMENTS:
			{
				Query query = in.readQuery();
				opDoer.deleteDocuments(time,query);
				return;
			}
		case OP_ADD_DOCUMENT:
			{
				Map storedFields = in.readMap();
				opDoer.addDocument(time,storedFields);
				return;
			}
		case OP_UPDATE_DOCUMENT:
			{
				String keyFieldName = in.readUTF();
				Map storedFields = in.readMap();
				opDoer.updateDocument(time,keyFieldName,storedFields);
				return;
			}
		case OP_TAG:
			{
				String tag = in.readUTF();
				opDoer.tag(time,tag);
				return;
			}
		default:
			throw new RuntimeException("invalid op "+op);
		}
	}

	/** Debugging aid: prints the hit count and up to 100 documents of the index to stdout. */
	private static void dump(LuceneIndexWriter indexWriter) throws IOException {
		IndexReader reader = indexWriter.openReader();
		try {
			IndexSearcher searcher = new IndexSearcher(reader);
			Query query = new MatchAllDocsQuery();
			TopDocs td = searcher.search(query,100);
			System.out.println("totalHits = "+td.totalHits);
			for( int i=0; i<td.scoreDocs.length; i++ ) {
				Document doc = searcher.doc(td.scoreDocs[i].doc);
				System.out.println(LuceneUtils.toMap(doc));
			}
			System.out.println();
		} finally {
			reader.close();
		}
	}

}