Mercurial Hosting > luan
changeset 1486:2469aa31f31b
LogOutputStream
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Fri, 01 May 2020 16:09:35 -0600 |
parents | 6a24c8b33d6b |
children | 044a360c2300 |
files | src/goodjava/lucene/logging/LogFile.java src/goodjava/lucene/logging/LogOutputStream.java src/goodjava/lucene/logging/LoggingIndexWriter.java |
diffstat | 3 files changed, 234 insertions(+), 208 deletions(-) [+] |
line wrap: on
line diff
```diff
--- a/src/goodjava/lucene/logging/LogFile.java	Fri May 01 11:23:29 2020 -0600
+++ b/src/goodjava/lucene/logging/LogFile.java	Fri May 01 16:09:35 2020 -0600
@@ -4,49 +4,24 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
 import java.io.RandomAccessFile;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.PrefixQuery;
-import org.apache.lucene.search.WildcardQuery;
-import org.apache.lucene.search.TermRangeQuery;
-import org.apache.lucene.search.PhraseQuery;
-import org.apache.lucene.search.NumericRangeQuery;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.util.BytesRef;
 import goodjava.logging.Logger;
 import goodjava.logging.LoggerFactory;
 import goodjava.io.LimitedInputStream;
 import goodjava.io.BufferedInputStream;
 
 
-public class LogFile extends DataOutputStream {
+public class LogFile {
 	private static final Logger logger = LoggerFactory.getLogger(LogFile.class);
 	public final File file;
-	private final RandomAccessFile raf;
-	private long end;
+	long end;
 
-	public static LogFile newLogFile(File file) throws IOException {
+	public LogFile(File file) throws IOException {
+		this.file = file;
 		RandomAccessFile raf = new RandomAccessFile(file,"rwd");
-		OutputStream out = new FileOutputStream(raf.getFD());
-		out = new BufferedOutputStream(out);
-		return new LogFile(file,raf,out);
-	}
-
-	protected LogFile(File file,RandomAccessFile raf,OutputStream out) throws IOException {
-		super(out);
-		this.file = file;
-		this.raf = raf;
-
 		if( raf.length() == 0 ) {
 			end = 8;
 			raf.writeLong(end);
@@ -55,6 +30,7 @@
 			end = raf.readLong();
 			raf.seek(end);
 		}
+		raf.close();
 	}
 
 	public String toString() {
@@ -65,6 +41,17 @@
 		return end;
 	}
 
+	public LogOutputStream output() throws IOException {
+		RandomAccessFile raf = new RandomAccessFile(file,"rwd");
+		OutputStream out = new FileOutputStream(raf.getFD());
+		out = new BufferedOutputStream(out);
+		return newLogOutputStream(raf,out);
+	}
+
+	protected LogOutputStream newLogOutputStream(RandomAccessFile raf,OutputStream out) throws IOException {
+		return new LogOutputStream(this,raf,out);
+	}
+
 	public LogInputStream input() throws IOException {
 		InputStream in = new FileInputStream(file);
 		in = new LimitedInputStream(in,end);
@@ -78,19 +65,6 @@
 		return new LogInputStream(in);
 	}
 
-	public void commit() throws IOException {
-		flush();
-		end = raf.getFilePointer();
-		raf.seek(0L);
-		raf.writeLong(end);
-		raf.seek(end);
-	}
-
-	public void rollback() throws IOException {
-		flush();
-		raf.seek(end);
-	}
-
 	static final int TYPE_NULL = 0;
 	static final int TYPE_STRING = 1;
 	static final int TYPE_INT = 2;
@@ -107,150 +81,4 @@
 	static final int TYPE_QUERY_PHRASE = 13;
 	static final int TYPE_QUERY_NUMERIC_RANGE = 14;
 	static final int TYPE_QUERY_BOOLEAN = 15;
-
-	public void writeObject(Object obj) throws IOException {
-		if( obj==null ) {
-			writeByte(TYPE_NULL);
-			return;
-		}
-		if( obj instanceof String ) {
-			writeByte(TYPE_STRING);
-			writeUTF((String)obj);
-			return;
-		}
-		if( obj instanceof Integer ) {
-			writeByte(TYPE_INT);
-			writeInt((Integer)obj);
-			return;
-		}
-		if( obj instanceof Long ) {
-			writeByte(TYPE_LONG);
-			writeLong((Long)obj);
-			return;
-		}
-		if( obj instanceof Float ) {
-			writeByte(TYPE_FLOAT);
-			writeFloat((Float)obj);
-			return;
-		}
-		if( obj instanceof Double ) {
-			writeByte(TYPE_DOUBLE);
-			writeDouble((Double)obj);
-			return;
-		}
-		if( obj instanceof byte[] ) {
-			writeByte(TYPE_BYTES);
-			writeByteArray((byte[])obj);
-			return;
-		}
-		if( obj instanceof List ) {
-			writeByte(TYPE_LIST);
-			writeList((List)obj);
-			return;
-		}
-		if( obj instanceof MatchAllDocsQuery ) {
-			writeByte(TYPE_QUERY_MATCH_ALL_DOCS);
-			return;
-		}
-		if( obj instanceof TermQuery ) {
-			writeByte(TYPE_QUERY_TERM);
-			TermQuery query = (TermQuery)obj;
-			writeTerm( query.getTerm() );
-			return;
-		}
-		if( obj instanceof PrefixQuery ) {
-			writeByte(TYPE_QUERY_PREFIX);
-			PrefixQuery query = (PrefixQuery)obj;
-			writeTerm( query.getPrefix() );
-			return;
-		}
-		if( obj instanceof WildcardQuery ) {
-			writeByte(TYPE_QUERY_TERM_RANGE);
-			WildcardQuery query = (WildcardQuery)obj;
-			writeTerm( query.getTerm() );
-			return;
-		}
-		if( obj instanceof TermRangeQuery ) {
-			writeByte(TYPE_QUERY_TERM_RANGE);
-			TermRangeQuery query = (TermRangeQuery)obj;
-			writeUTF( query.getField() );
-			writeBytesRef( query.getLowerTerm() );
-			writeBytesRef( query.getUpperTerm() );
-			writeBoolean( query.includesLower() );
-			writeBoolean( query.includesUpper() );
-			return;
-		}
-		if( obj instanceof PhraseQuery ) {
-			writeByte(TYPE_QUERY_PHRASE);
-			PhraseQuery query = (PhraseQuery)obj;
-			Term[] terms = query.getTerms();
-			int[] positions = query.getPositions();
-			if( terms.length != positions.length )
-				throw new RuntimeException();
-			writeInt( terms.length );
-			for( int i=0; i<terms.length; i++ ) {
-				writeTerm( terms[i] );
-				writeInt( positions[i] );
-			}
-			return;
-		}
-		if( obj instanceof NumericRangeQuery ) {
-			writeByte(TYPE_QUERY_NUMERIC_RANGE);
-			NumericRangeQuery query = (NumericRangeQuery)obj;
-			writeUTF( query.getField() );
-			writeObject( query.getMin() );
-			writeObject( query.getMax() );
-			writeBoolean( query.includesMin() );
-			writeBoolean( query.includesMax() );
-			return;
-		}
-		if( obj instanceof BooleanQuery ) {
-			writeByte(TYPE_QUERY_BOOLEAN);
-			BooleanQuery query = (BooleanQuery)obj;
-			BooleanClause[] a = query.getClauses();
-			writeInt(a.length);
-			for( BooleanClause bc : a ) {
-				writeQuery( bc.getQuery() );
-				writeUTF( bc.getOccur().name() );
-			}
-			return;
-		}
-		throw new IllegalArgumentException("invalid type for "+obj);
-	}
-
-	public void writeByteArray(byte[] bytes) throws IOException {
-		writeInt(bytes.length);
-		write(bytes);
-	}
-
-	public void writeList(List list) throws IOException {
-		writeInt(list.size());
-		for( Object obj : list ) {
-			writeObject(obj);
-		}
-	}
-
-	public void writeMap(Map map) throws IOException {
-		writeInt(map.size());
-		for( Object obj : map.entrySet() ) {
-			Map.Entry entry = (Map.Entry)obj;
-			writeObject( entry.getKey() );
-			writeObject( entry.getValue() );
-		}
-	}
-
-	public void writeQuery(Query query) throws IOException {
-		writeObject(query);
-	}
-
-	public void writeBytesRef(BytesRef br) throws IOException {
-		writeInt(br.length);
-		write(br.bytes,0,br.length);
-	}
-
-	public void writeTerm(Term term) throws IOException {
-		writeUTF(term.field());
-		writeBytesRef( term.bytes() );
-	}
-
 }
```
```diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/lucene/logging/LogOutputStream.java	Fri May 01 16:09:35 2020 -0600
@@ -0,0 +1,196 @@
+package goodjava.lucene.logging;
+
+import java.io.OutputStream;
+import java.io.DataOutputStream;
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.PrefixQuery;
+import org.apache.lucene.search.WildcardQuery;
+import org.apache.lucene.search.TermRangeQuery;
+import org.apache.lucene.search.PhraseQuery;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.util.BytesRef;
+import goodjava.logging.Logger;
+import goodjava.logging.LoggerFactory;
+
+
+public class LogOutputStream extends DataOutputStream {
+	private static final Logger logger = LoggerFactory.getLogger(LogOutputStream.class);
+	public final LogFile logFile;
+	private final RandomAccessFile raf;
+
+	protected LogOutputStream(LogFile logFile,RandomAccessFile raf,OutputStream out) throws IOException {
+		super(out);
+		this.logFile = logFile;
+		this.raf = raf;
+		raf.seek(logFile.end);
+	}
+
+	public void commit() throws IOException {
+		flush();
+		long end = raf.getFilePointer();
+		raf.seek(0L);
+		raf.writeLong(end);
+		logFile.end = end;
+		raf.seek(end);
+	}
+
+	public void rollback() throws IOException {
+		flush();
+		raf.seek(logFile.end);
+	}
+
+	public void writeObject(Object obj) throws IOException {
+		if( obj==null ) {
+			writeByte(LogFile.TYPE_NULL);
+			return;
+		}
+		if( obj instanceof String ) {
+			writeByte(LogFile.TYPE_STRING);
+			writeUTF((String)obj);
+			return;
+		}
+		if( obj instanceof Integer ) {
+			writeByte(LogFile.TYPE_INT);
+			writeInt((Integer)obj);
+			return;
+		}
+		if( obj instanceof Long ) {
+			writeByte(LogFile.TYPE_LONG);
+			writeLong((Long)obj);
+			return;
+		}
+		if( obj instanceof Float ) {
+			writeByte(LogFile.TYPE_FLOAT);
+			writeFloat((Float)obj);
+			return;
+		}
+		if( obj instanceof Double ) {
+			writeByte(LogFile.TYPE_DOUBLE);
+			writeDouble((Double)obj);
+			return;
+		}
+		if( obj instanceof byte[] ) {
+			writeByte(LogFile.TYPE_BYTES);
+			writeByteArray((byte[])obj);
+			return;
+		}
+		if( obj instanceof List ) {
+			writeByte(LogFile.TYPE_LIST);
+			writeList((List)obj);
+			return;
+		}
+		if( obj instanceof MatchAllDocsQuery ) {
+			writeByte(LogFile.TYPE_QUERY_MATCH_ALL_DOCS);
+			return;
+		}
+		if( obj instanceof TermQuery ) {
+			writeByte(LogFile.TYPE_QUERY_TERM);
+			TermQuery query = (TermQuery)obj;
+			writeTerm( query.getTerm() );
+			return;
+		}
+		if( obj instanceof PrefixQuery ) {
+			writeByte(LogFile.TYPE_QUERY_PREFIX);
+			PrefixQuery query = (PrefixQuery)obj;
+			writeTerm( query.getPrefix() );
+			return;
+		}
+		if( obj instanceof WildcardQuery ) {
+			writeByte(LogFile.TYPE_QUERY_TERM_RANGE);
+			WildcardQuery query = (WildcardQuery)obj;
+			writeTerm( query.getTerm() );
+			return;
+		}
+		if( obj instanceof TermRangeQuery ) {
+			writeByte(LogFile.TYPE_QUERY_TERM_RANGE);
+			TermRangeQuery query = (TermRangeQuery)obj;
+			writeUTF( query.getField() );
+			writeBytesRef( query.getLowerTerm() );
+			writeBytesRef( query.getUpperTerm() );
+			writeBoolean( query.includesLower() );
+			writeBoolean( query.includesUpper() );
+			return;
+		}
+		if( obj instanceof PhraseQuery ) {
+			writeByte(LogFile.TYPE_QUERY_PHRASE);
+			PhraseQuery query = (PhraseQuery)obj;
+			Term[] terms = query.getTerms();
+			int[] positions = query.getPositions();
+			if( terms.length != positions.length )
+				throw new RuntimeException();
+			writeInt( terms.length );
+			for( int i=0; i<terms.length; i++ ) {
+				writeTerm( terms[i] );
+				writeInt( positions[i] );
+			}
+			return;
+		}
+		if( obj instanceof NumericRangeQuery ) {
+			writeByte(LogFile.TYPE_QUERY_NUMERIC_RANGE);
+			NumericRangeQuery query = (NumericRangeQuery)obj;
+			writeUTF( query.getField() );
+			writeObject( query.getMin() );
+			writeObject( query.getMax() );
+			writeBoolean( query.includesMin() );
+			writeBoolean( query.includesMax() );
+			return;
+		}
+		if( obj instanceof BooleanQuery ) {
+			writeByte(LogFile.TYPE_QUERY_BOOLEAN);
+			BooleanQuery query = (BooleanQuery)obj;
+			BooleanClause[] a = query.getClauses();
+			writeInt(a.length);
+			for( BooleanClause bc : a ) {
+				writeQuery( bc.getQuery() );
+				writeUTF( bc.getOccur().name() );
+			}
+			return;
+		}
+		throw new IllegalArgumentException("invalid type for "+obj);
+	}
+
+	public void writeByteArray(byte[] bytes) throws IOException {
+		writeInt(bytes.length);
+		write(bytes);
+	}
+
+	public void writeList(List list) throws IOException {
+		writeInt(list.size());
+		for( Object obj : list ) {
+			writeObject(obj);
+		}
+	}
+
+	public void writeMap(Map map) throws IOException {
+		writeInt(map.size());
+		for( Object obj : map.entrySet() ) {
+			Map.Entry entry = (Map.Entry)obj;
+			writeObject( entry.getKey() );
+			writeObject( entry.getValue() );
+		}
+	}
+
+	public void writeQuery(Query query) throws IOException {
+		writeObject(query);
+	}
+
+	public void writeBytesRef(BytesRef br) throws IOException {
+		writeInt(br.length);
+		write(br.bytes,0,br.length);
+	}
+
+	public void writeTerm(Term term) throws IOException {
+		writeUTF(term.field());
+		writeBytesRef( term.bytes() );
+	}
+
+}
```
```diff
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java	Fri May 01 11:23:29 2020 -0600
+++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java	Fri May 01 16:09:35 2020 -0600
@@ -47,6 +47,7 @@
 	public final LuceneIndexWriter indexWriter;
 	private final File logDir;
 	private final List<LogFile> logs = new ArrayList<LogFile>();
+	private LogOutputStream log;
 	private final File index;
 	private boolean isMerging = false;
 
@@ -64,9 +65,10 @@
 					final int n = dis.readInt();
 					for( int i=0; i<n; i++ ) {
 						File file = new File( logDir, dis.readUTF() );
-						logs.add( LogFile.newLogFile(file) );
+						logs.add( new LogFile(file) );
 					}
 					deleteUnusedFiles();
+					setLog();
 					return;
 				}
 			} finally {
@@ -76,6 +78,12 @@
 		newLogs();
 	}
 
+	private void setLog() throws IOException {
+		if( log != null )
+			log.close();
+		log = logs.get(logs.size()-1).output();
+	}
+
 	public synchronized boolean isMerging() {
 		return isMerging;
 	}
@@ -94,10 +102,12 @@
 		}
 		logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
 		writeIndex();
+		setLog();
 		logger.info("done building new logs");
 	}
 
-	private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException {
+	private static void logLucene(long time,LogFile logLucene,LuceneIndexWriter indexWriter) throws IOException {
+		LogOutputStream log = logLucene.output();
 		IndexReader reader = indexWriter.openReader();
 		final IndexSearcher searcher = new IndexSearcher(reader);
 		Query query = new MatchAllDocsQuery();
@@ -112,6 +122,7 @@
 		});
 		reader.close();
 		log.commit();
+		log.close();
 	}
 
 	private LogFile newLogFile() throws IOException {
@@ -119,7 +130,7 @@
 		do {
 			file = new File(logDir,"_"+rnd.nextInt(100)+".log");
 		} while( file.exists() );
-		return LogFile.newLogFile(file);
+		return new LogFile(file);
 	}
 
 	private void deleteUnusedFiles() throws IOException {
@@ -306,25 +317,21 @@
 		return col.total;
 	}
 
-	private LogFile log() {
-		return logs.get(logs.size()-1);
-	}
-
 	public synchronized void close() throws IOException {
 		indexWriter.close();
-		LogFile log = log();
 		log.commit();
+		log.close();
 	}
 
 	public synchronized void commit() throws IOException {
 		indexWriter.commit();
-		LogFile log = log();
 		log.commit();
 		if( isMerging )
 			return;
-		if( log.end() > logs.get(0).end() ) {
+		if( log.logFile.end() > logs.get(0).end() ) {
 			logs.add( newLogFile() );
 			writeIndex();
+			setLog();
 		}
 		if( logs.size() > 3 ) {
 			isMerging = true;
@@ -335,34 +342,29 @@
 
 	public synchronized void rollback() throws IOException {
 		indexWriter.rollback();
-		LogFile log = log();
 		log.rollback();
 	}
 
 	public synchronized void deleteAll() throws IOException {
 		indexWriter.deleteAll();
-		LogFile log = log();
-		writeOp(log,OP_DELETE_ALL);
+		writeOp(OP_DELETE_ALL);
 	}
 
 	public synchronized void deleteDocuments(Query query) throws IOException {
 		indexWriter.deleteDocuments(query);
-		LogFile log = log();
-		writeOp(log,OP_DELETE_DOCUMENTS);
+		writeOp(OP_DELETE_DOCUMENTS);
 		log.writeQuery(query);
 	}
 
 	public synchronized void addDocument(Map<String,Object> storedFields) throws IOException {
 		indexWriter.addDocument(storedFields);
-		LogFile log = log();
-		writeOp(log,OP_ADD_DOCUMENT);
+		writeOp(OP_ADD_DOCUMENT);
 		log.writeMap(storedFields);
 	}
 
 	public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {
 		indexWriter.updateDocument(keyFieldName,storedFields);
-		LogFile log = log();
-		writeOp(log,OP_UPDATE_DOCUMENT);
+		writeOp(OP_UPDATE_DOCUMENT);
 		log.writeUTF(keyFieldName);
 		log.writeMap(storedFields);
 	}
@@ -371,7 +373,7 @@
 		indexWriter.reindexDocuments(keyFieldName,query);
 	}
 
-	private void writeOp(LogFile log,int op) throws IOException {
+	private void writeOp(int op) throws IOException {
 		log.writeLong(System.currentTimeMillis());
 		log.writeByte(op);
 	}
```