Mercurial Hosting > luan
diff src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1548:736ec76bbf42
lucene log work
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 27 Sep 2020 22:07:18 -0600 |
parents | 35601f15ecc3 |
children | 41c32da4cbd1 |
line wrap: on
line diff
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java Thu Sep 24 15:33:56 2020 -0600 +++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java Sun Sep 27 22:07:18 2020 -0600 @@ -10,8 +10,6 @@ import java.util.Map; import java.util.Set; import java.util.HashSet; -import java.util.List; -import java.util.ArrayList; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.lucene.document.Document; @@ -29,6 +27,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import goodjava.io.IoUtils; +import goodjava.lucene.api.GoodWriter; import goodjava.lucene.api.GoodIndexWriter; import goodjava.lucene.api.LuceneIndexWriter; import goodjava.lucene.api.GoodCollector; @@ -39,7 +38,7 @@ public class LoggingIndexWriter implements GoodIndexWriter { private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class); - private static final int version = 1; + private static final int version = 2; private static final int OP_DELETE_ALL = 1; private static final int OP_DELETE_DOCUMENTS = 2; private static final int OP_ADD_DOCUMENT = 3; @@ -50,14 +49,18 @@ public final LuceneIndexWriter indexWriter; public boolean wasCreated; private final File logDir; - protected final List<LogFile> logs = new ArrayList<LogFile>(); + private final long logTime; + protected final LogFile[] logs = new LogFile[3]; private LogOutputStream log; private final File index; private final SemaphoreLock mergeLock = new SemaphoreLock(); - public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException { + public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime) + throws IOException + { this.indexWriter = indexWriter; this.logDir = logDir; + this.logTime = logTime; IoUtils.mkdirs(logDir); if( !logDir.isDirectory() ) throw new RuntimeException(); @@ -66,10 +69,9 @@ DataInputStream dis = new DataInputStream(new FileInputStream(index)); try { if( dis.readInt() == version ) { - final int n = dis.readInt(); - for( int i=0; i<n; i++ ) { + for( int i=0; i<logs.length; i++ ) { File file = new File( logDir, dis.readUTF() ); - logs.add( new LogFile(file) ); + logs[i] = new LogFile(file); } deleteUnusedFiles(); setLog(); @@ -84,6 +86,10 @@ wasCreated = true; } + public IndexReader openReader() throws IOException { + return indexWriter.openReader(); + } + public IndexWriter getLuceneIndexWriter() { return indexWriter.getLuceneIndexWriter(); } @@ -91,7 +97,7 @@ private void setLog() throws IOException { if( log != null ) log.close(); - log = logs.get(logs.size()-1).output(); + log = logs[2].output(); } /* public synchronized boolean isMerging() { @@ -118,18 +124,29 @@ private void newLogs2() throws IOException { logger.info("building new logs"); - logs.clear(); - for( int i=0; i<2; i++ ) { - logs.add( newLogFile() ); + for( int i=0; i<logs.length; i++ ) { + logs[i] = newLogFile(); } - logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); + LogOutputStream log = logs[0].output(); + logLucene( System.currentTimeMillis(), log, indexWriter ); + log.close(); writeIndex(); setLog(); logger.info("done building new logs"); } - private static void logLucene(long time,LogFile logLucene,LuceneIndexWriter indexWriter) throws IOException { - LogOutputStream log = logLucene.output(); + public synchronized void logLucene() + throws IOException + { + //log.rollback(); ? + logLucene( System.currentTimeMillis(), log, indexWriter ); + } + + private static void logLucene(long time,LogOutputStream log,LuceneIndexWriter indexWriter) + throws IOException + { + log.writeLong(time); + log.writeByte(OP_DELETE_ALL); IndexReader reader = indexWriter.openReader(); final IndexSearcher searcher = new IndexSearcher(reader); Query query = new MatchAllDocsQuery(); @@ -144,7 +161,6 @@ }); reader.close(); log.commit(); - log.close(); } private LogFile newLogFile() throws IOException { @@ -159,7 +175,7 @@ deleteUnusedFiles(logs,index); } - private static void deleteUnusedFiles(List<LogFile> logs,File index) throws IOException { + private static void deleteUnusedFiles(LogFile[] logs,File index) throws IOException { Set<String> used = new HashSet<String>(); used.add( index.getName() ); for( LogFile lf : logs ) { @@ -176,11 +192,12 @@ writeIndex(logs,index); } - public static void writeIndex(List<LogFile> logs,File index) throws IOException { + public static void writeIndex(LogFile[] logs,File index) throws IOException { + if( logs.length != 3 ) + throw new RuntimeException(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); dos.writeInt(version); - dos.writeInt(logs.size()); for( LogFile lf : logs ) { String fileName = lf.file.getName(); dos.writeUTF(fileName); @@ -194,28 +211,34 @@ } private void mergeLogs() throws IOException { - //logger.info("merge"); - if( logs.size() <= 3 ) + logger.info("merge"); + if( !mergeLock.isLocked() ) { + logger.error("merge without lock"); return; - LogFile first = logs.get(0); - LogFile second = logs.get(1); + } + LogFile first = logs[0]; + LogFile second = logs[1]; long lastTime = second.file.lastModified(); File dirFile = new File(logDir,"merge"); if( dirFile.exists() ) throw new RuntimeException(); Directory dir = FSDirectory.open(dirFile); LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); - playLog( first.input(), mergeWriter, null ); - playLog( second.input(), mergeWriter, null ); + playLog( first.input(), mergeWriter ); + playLog( second.input(), mergeWriter ); mergeWriter.commit(); LogFile merge = newLogFile(); - logLucene( lastTime, merge, mergeWriter ); + LogOutputStream log = merge.output(); + logLucene( lastTime, log, mergeWriter ); + log.close(); mergeWriter.close(); synchronized(this) { //check(); - logs.remove(0); - logs.set(0,merge); + logs[0] = merge; + logs[1] = logs[2]; + logs[2] = newLogFile(); writeIndex(); + setLog(); //check(null); } } @@ -265,7 +288,7 @@ protected boolean doCheck(SortField sortField) throws IOException { boolean ok = true; IndexReader indexReader; - List<LogInputStream> logReaders; + LogInputStream[] logReaders; synchronized(this) { indexReader = indexWriter.openReader(); logReaders = logReaders(logs); @@ -277,7 +300,7 @@ IoUtils.deleteRecursively(dirFile); Directory dir = FSDirectory.open(dirFile); LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); - playLogs(logReaders,checkWriter,null); + playLogs(logReaders,checkWriter); //logger.info("check lucene"); IndexReader checkReader = checkWriter.openReader(); int nCheck = checkReader.numDocs(); @@ -375,12 +398,7 @@ log.commit(); if( mergeLock.isLocked() ) return; - if( log.logFile.end() > logs.get(0).end() ) { - logs.add( newLogFile() ); - writeIndex(); - setLog(); - } - if( logs.size() > 3 ) { + if( logs[1].file.lastModified() < System.currentTimeMillis() - logTime ) { getMergeLock(); new Thread(mergeLogs).start(); // mergeLogs.run(); @@ -431,86 +449,81 @@ } // return whether stopped at tag - public synchronized boolean playLogs(String upToTag) throws IOException { - return playLogs( logReaders(logs), indexWriter, upToTag ); + public synchronized void playLogs(GoodWriter writer) throws IOException { + if( writer == null ) + writer = indexWriter; + playLogs( logReaders(logs), writer ); } - private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException { - List<LogInputStream> logReaders = new ArrayList<LogInputStream>(); - for( LogFile log : logs ) { - logReaders.add( log.input() ); + private static LogInputStream[] logReaders(LogFile[] logs) throws IOException { + LogInputStream[] logReaders = new LogInputStream[logs.length]; + for( int i=0; i<logs.length; i++ ) { + logReaders[i] = logs[i].input(); } return logReaders; } - private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag) + private static void playLogs(LogInputStream[] logReaders,GoodWriter indexWriter) throws IOException { if( numDocs(indexWriter) != 0 ) throw new RuntimeException ("not empty"); - boolean rtn = false; for( LogInputStream reader : logReaders ) { - if( playLog(reader,indexWriter,upToTag) ) { - rtn = true; - break; - } + playLog(reader,indexWriter); } indexWriter.commit(); - return rtn; } - private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { + private static int numDocs(GoodWriter indexWriter) throws IOException { IndexReader reader = indexWriter.openReader(); int n = reader.numDocs(); reader.close(); return n; } - private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) + private static void playLog(LogInputStream in,GoodWriter indexWriter) throws IOException { - boolean rtn = false; while( in.available() > 0 ) { - if( playOp(in,indexWriter,upToTag) ) { - rtn = true; - break; - } + playOp(in,indexWriter); } in.close(); - return rtn; } - private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException { + private static void playOp(LogInputStream in,GoodWriter indexWriter) + throws IOException + { in.readLong(); // time int op = in.readByte(); switch(op) { case OP_DELETE_ALL: indexWriter.deleteAll(); - return false; + return; case OP_DELETE_DOCUMENTS: { Query query = in.readQuery(); //System.out.println("OP_DELETE_DOCUMENTS "+query); indexWriter.deleteDocuments(query); - return false; + return; } case OP_ADD_DOCUMENT: { Map storedFields = in.readMap(); indexWriter.addDocument(storedFields); - return false; + return; } case OP_UPDATE_DOCUMENT: { String keyFieldName = in.readUTF(); Map storedFields = in.readMap(); indexWriter.updateDocument(keyFieldName,storedFields); - return false; + return; } case OP_TAG: { String tag = in.readUTF(); - return tag.equals(upToTag); + indexWriter.tag(tag); + return; } default: throw new RuntimeException("invalid op "+op);