changeset 1486:2469aa31f31b

LogOutputStream
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 01 May 2020 16:09:35 -0600
parents 6a24c8b33d6b
children 044a360c2300
files src/goodjava/lucene/logging/LogFile.java src/goodjava/lucene/logging/LogOutputStream.java src/goodjava/lucene/logging/LoggingIndexWriter.java
diffstat 3 files changed, 234 insertions(+), 208 deletions(-) [+]
line wrap: on
line diff
--- a/src/goodjava/lucene/logging/LogFile.java	Fri May 01 11:23:29 2020 -0600
+++ b/src/goodjava/lucene/logging/LogFile.java	Fri May 01 16:09:35 2020 -0600
@@ -4,49 +4,24 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
 import java.io.RandomAccessFile;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.PrefixQuery;
-import org.apache.lucene.search.WildcardQuery;
-import org.apache.lucene.search.TermRangeQuery;
-import org.apache.lucene.search.PhraseQuery;
-import org.apache.lucene.search.NumericRangeQuery;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.util.BytesRef;
 import goodjava.logging.Logger;
 import goodjava.logging.LoggerFactory;
 import goodjava.io.LimitedInputStream;
 import goodjava.io.BufferedInputStream;
 
 
-public class LogFile extends DataOutputStream {
+public class LogFile {
 	private static final Logger logger = LoggerFactory.getLogger(LogFile.class);
 	public final File file;
-	private final RandomAccessFile raf;
-	private long end;
+	long end;
 
-	public static LogFile newLogFile(File file) throws IOException {
+	public LogFile(File file) throws IOException {
+		this.file = file;
 		RandomAccessFile raf = new RandomAccessFile(file,"rwd");
-		OutputStream out = new FileOutputStream(raf.getFD());
-		out = new BufferedOutputStream(out);
-		return new LogFile(file,raf,out);
-	}
-
-	protected LogFile(File file,RandomAccessFile raf,OutputStream out) throws IOException {
-		super(out);
-		this.file = file;
-		this.raf = raf;
-
 		if( raf.length() == 0 ) {
 			end = 8;
 			raf.writeLong(end);
@@ -55,6 +30,7 @@
 			end = raf.readLong();
 			raf.seek(end);
 		}
+		raf.close();
 	}
 
 	public String toString() {
@@ -65,6 +41,17 @@
 		return end;
 	}
 
+	public LogOutputStream output() throws IOException {
+		RandomAccessFile raf = new RandomAccessFile(file,"rwd");
+		OutputStream out = new FileOutputStream(raf.getFD());
+		out = new BufferedOutputStream(out);
+		return newLogOutputStream(raf,out);
+	}
+
+	protected LogOutputStream newLogOutputStream(RandomAccessFile raf,OutputStream out) throws IOException {
+		return new LogOutputStream(this,raf,out);
+	}
+
 	public LogInputStream input() throws IOException {
 		InputStream in = new FileInputStream(file);
 		in = new LimitedInputStream(in,end);
@@ -78,19 +65,6 @@
 		return new LogInputStream(in);
 	}
 
-	public void commit() throws IOException {
-		flush();
-		end = raf.getFilePointer();
-		raf.seek(0L);
-		raf.writeLong(end);
-		raf.seek(end);
-	}
-
-	public void rollback() throws IOException {
-		flush();
-		raf.seek(end);
-	}
-
 	static final int TYPE_NULL = 0;
 	static final int TYPE_STRING = 1;
 	static final int TYPE_INT = 2;
@@ -107,150 +81,4 @@
 	static final int TYPE_QUERY_PHRASE = 13;
 	static final int TYPE_QUERY_NUMERIC_RANGE = 14;
 	static final int TYPE_QUERY_BOOLEAN = 15;
-
-	public void writeObject(Object obj) throws IOException {
-		if( obj==null ) {
-			writeByte(TYPE_NULL);
-			return;
-		}
-		if( obj instanceof String ) {
-			writeByte(TYPE_STRING);
-			writeUTF((String)obj);
-			return;
-		}
-		if( obj instanceof Integer ) {
-			writeByte(TYPE_INT);
-			writeInt((Integer)obj);
-			return;
-		}
-		if( obj instanceof Long ) {
-			writeByte(TYPE_LONG);
-			writeLong((Long)obj);
-			return;
-		}
-		if( obj instanceof Float ) {
-			writeByte(TYPE_FLOAT);
-			writeFloat((Float)obj);
-			return;
-		}
-		if( obj instanceof Double ) {
-			writeByte(TYPE_DOUBLE);
-			writeDouble((Double)obj);
-			return;
-		}
-		if( obj instanceof byte[] ) {
-			writeByte(TYPE_BYTES);
-			writeByteArray((byte[])obj);
-			return;
-		}
-		if( obj instanceof List ) {
-			writeByte(TYPE_LIST);
-			writeList((List)obj);
-			return;
-		}
-		if( obj instanceof MatchAllDocsQuery ) {
-			writeByte(TYPE_QUERY_MATCH_ALL_DOCS);
-			return;
-		}
-		if( obj instanceof TermQuery ) {
-			writeByte(TYPE_QUERY_TERM);
-			TermQuery query = (TermQuery)obj;
-			writeTerm( query.getTerm() );
-			return;
-		}
-		if( obj instanceof PrefixQuery ) {
-			writeByte(TYPE_QUERY_PREFIX);
-			PrefixQuery query = (PrefixQuery)obj;
-			writeTerm( query.getPrefix() );
-			return;
-		}
-		if( obj instanceof WildcardQuery ) {
-			writeByte(TYPE_QUERY_TERM_RANGE);
-			WildcardQuery query = (WildcardQuery)obj;
-			writeTerm( query.getTerm() );
-			return;
-		}
-		if( obj instanceof TermRangeQuery ) {
-			writeByte(TYPE_QUERY_TERM_RANGE);
-			TermRangeQuery query = (TermRangeQuery)obj;
-			writeUTF( query.getField() );
-			writeBytesRef( query.getLowerTerm() );
-			writeBytesRef( query.getUpperTerm() );
-			writeBoolean( query.includesLower() );
-			writeBoolean( query.includesUpper() );
-			return;
-		}
-		if( obj instanceof PhraseQuery ) {
-			writeByte(TYPE_QUERY_PHRASE);
-			PhraseQuery query = (PhraseQuery)obj;
-			Term[] terms = query.getTerms();
-			int[] positions = query.getPositions();
-			if( terms.length != positions.length )
-				throw new RuntimeException();
-			writeInt( terms.length );
-			for( int i=0; i<terms.length; i++ ) {
-				writeTerm( terms[i] );
-				writeInt( positions[i] );
-			}
-			return;
-		}
-		if( obj instanceof NumericRangeQuery ) {
-			writeByte(TYPE_QUERY_NUMERIC_RANGE);
-			NumericRangeQuery query = (NumericRangeQuery)obj;
-			writeUTF( query.getField() );
-			writeObject( query.getMin() );
-			writeObject( query.getMax() );
-			writeBoolean( query.includesMin() );
-			writeBoolean( query.includesMax() );
-			return;
-		}
-		if( obj instanceof BooleanQuery ) {
-			writeByte(TYPE_QUERY_BOOLEAN);
-			BooleanQuery query = (BooleanQuery)obj;
-			BooleanClause[] a = query.getClauses();
-			writeInt(a.length);
-			for( BooleanClause bc : a ) {
-				writeQuery( bc.getQuery() );
-				writeUTF( bc.getOccur().name() );
-			}
-			return;
-		}
-		throw new IllegalArgumentException("invalid type for "+obj);
-	}
-
-	public void writeByteArray(byte[] bytes) throws IOException {
-		writeInt(bytes.length);
-		write(bytes);
-	}
-
-	public void writeList(List list) throws IOException {
-		writeInt(list.size());
-		for( Object obj : list ) {
-			writeObject(obj);
-		}
-	}
-
-	public void writeMap(Map map) throws IOException {
-		writeInt(map.size());
-		for( Object obj : map.entrySet() ) {
-			Map.Entry entry = (Map.Entry)obj;
-			writeObject( entry.getKey() );
-			writeObject( entry.getValue() );
-		}
-	}
-
-	public void writeQuery(Query query) throws IOException {
-		writeObject(query);
-	}
-
-	public void writeBytesRef(BytesRef br) throws IOException {
-		writeInt(br.length);
-		write(br.bytes,0,br.length);
-	}
-
-	public void writeTerm(Term term) throws IOException {
-		writeUTF(term.field());
-		writeBytesRef( term.bytes() );
-	}
-
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/lucene/logging/LogOutputStream.java	Fri May 01 16:09:35 2020 -0600
@@ -0,0 +1,196 @@
+package goodjava.lucene.logging;
+
+import java.io.OutputStream;
+import java.io.DataOutputStream;
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.PrefixQuery;
+import org.apache.lucene.search.WildcardQuery;
+import org.apache.lucene.search.TermRangeQuery;
+import org.apache.lucene.search.PhraseQuery;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.util.BytesRef;
+import goodjava.logging.Logger;
+import goodjava.logging.LoggerFactory;
+
+
+public class LogOutputStream extends DataOutputStream {
+	private static final Logger logger = LoggerFactory.getLogger(LogOutputStream.class);
+	public final LogFile logFile;
+	private final RandomAccessFile raf;
+
+	protected LogOutputStream(LogFile logFile,RandomAccessFile raf,OutputStream out) throws IOException {
+		super(out);
+		this.logFile = logFile;
+		this.raf = raf;
+		raf.seek(logFile.end);
+	}
+
+	public void commit() throws IOException {
+		flush();
+		long end = raf.getFilePointer();
+		raf.seek(0L);
+		raf.writeLong(end);
+		logFile.end = end;
+		raf.seek(end);
+	}
+
+	public void rollback() throws IOException {
+		flush();
+		raf.seek(logFile.end);
+	}
+
+	public void writeObject(Object obj) throws IOException {
+		if( obj==null ) {
+			writeByte(LogFile.TYPE_NULL);
+			return;
+		}
+		if( obj instanceof String ) {
+			writeByte(LogFile.TYPE_STRING);
+			writeUTF((String)obj);
+			return;
+		}
+		if( obj instanceof Integer ) {
+			writeByte(LogFile.TYPE_INT);
+			writeInt((Integer)obj);
+			return;
+		}
+		if( obj instanceof Long ) {
+			writeByte(LogFile.TYPE_LONG);
+			writeLong((Long)obj);
+			return;
+		}
+		if( obj instanceof Float ) {
+			writeByte(LogFile.TYPE_FLOAT);
+			writeFloat((Float)obj);
+			return;
+		}
+		if( obj instanceof Double ) {
+			writeByte(LogFile.TYPE_DOUBLE);
+			writeDouble((Double)obj);
+			return;
+		}
+		if( obj instanceof byte[] ) {
+			writeByte(LogFile.TYPE_BYTES);
+			writeByteArray((byte[])obj);
+			return;
+		}
+		if( obj instanceof List ) {
+			writeByte(LogFile.TYPE_LIST);
+			writeList((List)obj);
+			return;
+		}
+		if( obj instanceof MatchAllDocsQuery ) {
+			writeByte(LogFile.TYPE_QUERY_MATCH_ALL_DOCS);
+			return;
+		}
+		if( obj instanceof TermQuery ) {
+			writeByte(LogFile.TYPE_QUERY_TERM);
+			TermQuery query = (TermQuery)obj;
+			writeTerm( query.getTerm() );
+			return;
+		}
+		if( obj instanceof PrefixQuery ) {
+			writeByte(LogFile.TYPE_QUERY_PREFIX);
+			PrefixQuery query = (PrefixQuery)obj;
+			writeTerm( query.getPrefix() );
+			return;
+		}
+		if( obj instanceof WildcardQuery ) {
+			writeByte(LogFile.TYPE_QUERY_TERM_RANGE);
+			WildcardQuery query = (WildcardQuery)obj;
+			writeTerm( query.getTerm() );
+			return;
+		}
+		if( obj instanceof TermRangeQuery ) {
+			writeByte(LogFile.TYPE_QUERY_TERM_RANGE);
+			TermRangeQuery query = (TermRangeQuery)obj;
+			writeUTF( query.getField() );
+			writeBytesRef( query.getLowerTerm() );
+			writeBytesRef( query.getUpperTerm() );
+			writeBoolean( query.includesLower() );
+			writeBoolean( query.includesUpper() );
+			return;
+		}
+		if( obj instanceof PhraseQuery ) {
+			writeByte(LogFile.TYPE_QUERY_PHRASE);
+			PhraseQuery query = (PhraseQuery)obj;
+			Term[] terms = query.getTerms();
+			int[] positions = query.getPositions();
+			if( terms.length != positions.length )
+				throw new RuntimeException();
+			writeInt( terms.length );
+			for( int i=0; i<terms.length; i++ ) {
+				writeTerm( terms[i] );
+				writeInt( positions[i] );
+			}
+			return;
+		}
+		if( obj instanceof NumericRangeQuery ) {
+			writeByte(LogFile.TYPE_QUERY_NUMERIC_RANGE);
+			NumericRangeQuery query = (NumericRangeQuery)obj;
+			writeUTF( query.getField() );
+			writeObject( query.getMin() );
+			writeObject( query.getMax() );
+			writeBoolean( query.includesMin() );
+			writeBoolean( query.includesMax() );
+			return;
+		}
+		if( obj instanceof BooleanQuery ) {
+			writeByte(LogFile.TYPE_QUERY_BOOLEAN);
+			BooleanQuery query = (BooleanQuery)obj;
+			BooleanClause[] a = query.getClauses();
+			writeInt(a.length);
+			for( BooleanClause bc : a ) {
+				writeQuery( bc.getQuery() );
+				writeUTF( bc.getOccur().name() );
+			}
+			return;
+		}
+		throw new IllegalArgumentException("invalid type for "+obj);
+	}
+
+	public void writeByteArray(byte[] bytes) throws IOException {
+		writeInt(bytes.length);
+		write(bytes);
+	}
+
+	public void writeList(List list) throws IOException {
+		writeInt(list.size());
+		for( Object obj : list ) {
+			writeObject(obj);
+		}
+	}
+
+	public void writeMap(Map map) throws IOException {
+		writeInt(map.size());
+		for( Object obj : map.entrySet() ) {
+			Map.Entry entry = (Map.Entry)obj;
+			writeObject( entry.getKey() );
+			writeObject( entry.getValue() );
+		}
+	}
+
+	public void writeQuery(Query query) throws IOException {
+		writeObject(query);
+	}
+
+	public void writeBytesRef(BytesRef br) throws IOException {
+		writeInt(br.length);
+		write(br.bytes,0,br.length);
+	}
+
+	public void writeTerm(Term term) throws IOException {
+		writeUTF(term.field());
+		writeBytesRef( term.bytes() );
+	}
+
+}
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java	Fri May 01 11:23:29 2020 -0600
+++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java	Fri May 01 16:09:35 2020 -0600
@@ -47,6 +47,7 @@
 	public final LuceneIndexWriter indexWriter;
 	private final File logDir;
 	private final List<LogFile> logs = new ArrayList<LogFile>();
+	private LogOutputStream log;
 	private final File index;
 	private boolean isMerging = false;
 
@@ -64,9 +65,10 @@
 					final int n = dis.readInt();
 					for( int i=0; i<n; i++ ) {
 						File file = new File( logDir, dis.readUTF() );
-						logs.add( LogFile.newLogFile(file) );
+						logs.add( new LogFile(file) );
 					}
 					deleteUnusedFiles();
+					setLog();
 					return;
 				}
 			} finally {
@@ -76,6 +78,12 @@
 		newLogs();
 	}
 
+	private void setLog() throws IOException {
+		if( log != null )
+			log.close();
+		log = logs.get(logs.size()-1).output();
+	}
+
 	public synchronized boolean isMerging() {
 		return isMerging;
 	}
@@ -94,10 +102,12 @@
 		}
 		logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
 		writeIndex();
+		setLog();
 		logger.info("done building new logs");
 	}
 
-	private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException {
+	private static void logLucene(long time,LogFile logLucene,LuceneIndexWriter indexWriter) throws IOException {
+		LogOutputStream log = logLucene.output();
 		IndexReader reader = indexWriter.openReader();
 		final IndexSearcher searcher = new IndexSearcher(reader);
 		Query query = new MatchAllDocsQuery();
@@ -112,6 +122,7 @@
 		});
 		reader.close();
 		log.commit();
+		log.close();
 	}
 
 	private LogFile newLogFile() throws IOException {
@@ -119,7 +130,7 @@
 		do {
 			file = new File(logDir,"_"+rnd.nextInt(100)+".log");
 		} while( file.exists() );
-		return LogFile.newLogFile(file);
+		return new LogFile(file);
 	}
 
 	private void deleteUnusedFiles() throws IOException {
@@ -306,25 +317,21 @@
 		return col.total;
 	}
 
-	private LogFile log() {
-		return logs.get(logs.size()-1);
-	}
-
 	public synchronized void close() throws IOException {
 		indexWriter.close();
-		LogFile log = log();
 		log.commit();
+		log.close();
 	}
 
 	public synchronized void commit() throws IOException {
 		indexWriter.commit();
-		LogFile log = log();
 		log.commit();
 		if( isMerging )
 			return;
-		if( log.end() > logs.get(0).end() ) {
+		if( log.logFile.end() > logs.get(0).end() ) {
 			logs.add( newLogFile() );
 			writeIndex();
+			setLog();
 		}
 		if( logs.size() > 3 ) {
 			isMerging = true;
@@ -335,34 +342,29 @@
 
 	public synchronized void rollback() throws IOException {
 		indexWriter.rollback();
-		LogFile log = log();
 		log.rollback();
 	}
 
 	public synchronized void deleteAll() throws IOException {
 		indexWriter.deleteAll();
-		LogFile log = log();
-		writeOp(log,OP_DELETE_ALL);
+		writeOp(OP_DELETE_ALL);
 	}
 
 	public synchronized void deleteDocuments(Query query) throws IOException {
 		indexWriter.deleteDocuments(query);
-		LogFile log = log();
-		writeOp(log,OP_DELETE_DOCUMENTS);
+		writeOp(OP_DELETE_DOCUMENTS);
 		log.writeQuery(query);
 	}
 
 	public synchronized void addDocument(Map<String,Object> storedFields) throws IOException {
 		indexWriter.addDocument(storedFields);
-		LogFile log = log();
-		writeOp(log,OP_ADD_DOCUMENT);
+		writeOp(OP_ADD_DOCUMENT);
 		log.writeMap(storedFields);
 	}
 
 	public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {
 		indexWriter.updateDocument(keyFieldName,storedFields);
-		LogFile log = log();
-		writeOp(log,OP_UPDATE_DOCUMENT);
+		writeOp(OP_UPDATE_DOCUMENT);
 		log.writeUTF(keyFieldName);
 		log.writeMap(storedFields);
 	}
@@ -371,7 +373,7 @@
 		indexWriter.reindexDocuments(keyFieldName,query);
 	}
 
-	private void writeOp(LogFile log,int op) throws IOException {
+	private void writeOp(int op) throws IOException {
 		log.writeLong(System.currentTimeMillis());
 		log.writeByte(op);
 	}