diff src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1465:5e3870618377

lucene.logging dir
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 12 Apr 2020 15:59:57 -0600
parents e5d48b85351c
children 6c6ce14db6a8
line wrap: on
line diff
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java	Fri Apr 03 10:04:52 2020 -0600
+++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java	Sun Apr 12 15:59:57 2020 -0600
@@ -1,94 +1,364 @@
 package goodjava.lucene.logging;
 
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Random;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
 import goodjava.lucene.api.GoodIndexWriter;
+import goodjava.lucene.api.LuceneIndexWriter;
+import goodjava.lucene.api.GoodCollector;
+import goodjava.lucene.api.LuceneUtils;
+import goodjava.logging.Logger;
+import goodjava.logging.LoggerFactory;
 
 
-public class LoggingIndexWriter implements GoodIndexWriter {
+public final class LoggingIndexWriter implements GoodIndexWriter {
+	private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);
+	private static final int version = 1;
 	private static final int OP_DELETE_ALL = 1;
 	private static final int OP_DELETE_DOCUMENTS = 2;
 	private static final int OP_ADD_DOCUMENT = 3;
 	private static final int OP_UPDATE_DOCUMENT = 4;
+	private static final Random rnd = new Random();
 
-	public final GoodIndexWriter indexWriter;
-	private final LogFile logFile;
+	public final LuceneIndexWriter indexWriter;
+	private final File logDir;
+	private final List<LogFile> logs = new ArrayList<LogFile>();
+	private final File index;
+	private boolean isMerging = false;
 
-	public LoggingIndexWriter(GoodIndexWriter indexWriter) throws IOException {
+	public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException {
 		this.indexWriter = indexWriter;
-		logFile = new LogFile("lucene.log","rw");
-		logFile.gotoStart();  // for now
+		this.logDir = logDir;
+		logDir.mkdirs();
+		if( !logDir.isDirectory() )
+			throw new RuntimeException();
+		index = new File(logDir,"index");
+		if( index.exists() ) {
+			DataInputStream dis = new DataInputStream(new FileInputStream(index));
+			try {
+				if( dis.readInt() == version ) {
+					final int n = dis.readInt();
+					for( int i=0; i<n; i++ ) {
+						File file = new File( logDir, dis.readUTF() );
+						logs.add( new LogFile(file,"rwd") );
+					}
+					deleteUnusedFiles();
+					log().gotoEnd();
+					return;
+				}
+			} finally {
+				dis.close();
+			}
+		}
+		for( int i=0; i<2; i++ ) {
+			logs.add( newLogFile() );
+		}
+		isMerging = true;
+		new Thread(new Runnable(){public void run(){
+			try {
+				logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
+				synchronized(LoggingIndexWriter.this) {
+					writeIndex();
+				}
+			} catch(IOException e) {
+				throw new RuntimeException(e);
+			} finally {
+				synchronized(LoggingIndexWriter.this) {
+					isMerging = false;
+				}
+			}
+		}}).start();
 	}
 
-	public void close() throws IOException {
-		indexWriter.close();
-		logFile.commit();
+	private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException {
+		IndexReader reader = indexWriter.openReader();
+		final IndexSearcher searcher = new IndexSearcher(reader);
+		Query query = new MatchAllDocsQuery();
+		searcher.search( query, new GoodCollector(){
+			public void collectDoc(int iDoc) throws IOException {
+				Document doc = searcher.doc(iDoc);
+				Map<String,Object> storedFields = LuceneUtils.toMap(doc);
+				log.writeLong(time);
+				log.writeByte(OP_ADD_DOCUMENT);
+				log.writeMap(storedFields);
+			}
+		});
+		reader.close();
+		log.commit();
+	}
+
+	private LogFile newLogFile() throws IOException {
+		File file;
+		do {
+			file = new File(logDir,"_"+rnd.nextInt(100)+".log");
+		} while( file.exists() );
+		return new LogFile(file,"rwd");
 	}
 
-	public void commit() throws IOException {
-		indexWriter.commit();
-		logFile.commit();
+	private void deleteUnusedFiles() {
+		Set<String> used = new HashSet<String>();
+		used.add( index.getName() );
+		for( LogFile lf : logs ) {
+			used.add( lf.file.getName() );
+		}
+		for( File f : logDir.listFiles() ) {
+			if( !used.contains(f.getName()) ) {
+				deleteFile(f);
+			}
+		}
 	}
 
-	public void rollback() throws IOException {
-		indexWriter.rollback();
-		logFile.gotoEnd();
+	private static void deleteFile(File file) {
+		if( file.isDirectory() ) {
+			for( File f : file.listFiles() ) {
+				deleteFile(f);
+			}
+		}
+		if( !file.delete() )
+			throw new RuntimeException(file.getName());
+	}
+
+	private void writeIndex() throws IOException {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputStream dos = new DataOutputStream(baos);
+		dos.writeInt(version);
+		dos.writeInt(logs.size());
+		for( LogFile lf : logs ) {
+			String fileName = lf.file.getName();
+			dos.writeUTF(fileName);
+		}
+		dos.close();
+		RandomAccessFile raf = new RandomAccessFile( index, "rwd" );
+		raf.write( baos.toByteArray() );
+		raf.close();
+		deleteUnusedFiles();
+		logger.info("writeIndex "+logs.toString());
 	}
 
-	public void deleteAll() throws IOException {
-		indexWriter.deleteAll();
-		logFile.writeByte(OP_DELETE_ALL);
+	private void mergeLogs() throws IOException {
+		logger.info("merge");
+		LogFile first = logs.get(0);
+		LogFile second = logs.get(1);
+		second.gotoEnd();
+		long lastTime = second.readLong();
+		File dirFile = new File(logDir,"merge");
+		if( dirFile.exists() )
+			throw new RuntimeException();
+		Directory dir = FSDirectory.open(dirFile);
+		LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
+		playLog(first,mergeWriter);
+		playLog(second,mergeWriter);
+		mergeWriter.commit();
+		LogFile merge = newLogFile();
+		logLucene( lastTime, merge, mergeWriter );
+		mergeWriter.close();
+		synchronized(this) {
+			check();
+			logs.remove(0);
+			logs.set(0,merge);
+			writeIndex();
+			check();
+		}
 	}
+	private final Runnable mergeLogs = new Runnable() { public void run() {
+		try {
+			mergeLogs();
+/*
+		} catch(IOException e) {
+			throw new RuntimeException(e);
+*/
+		} catch(Exception e) {
+			e.printStackTrace();
+			System.exit(-1);
+		} finally {
+			synchronized(LoggingIndexWriter.this) {
+				isMerging = false;
+			}
+		}
+	} };
 
-	public void deleteDocuments(Query query) throws IOException {
-		indexWriter.deleteDocuments(query);
-		logFile.writeByte(OP_DELETE_DOCUMENTS);
-		logFile.writeQuery(query);
+	private void check() throws IOException {
+		File dirFile = new File(logDir,"check");
+		if( dirFile.exists() )
+			throw new RuntimeException();
+		Directory dir = FSDirectory.open(dirFile);
+		LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
+		playLog(checkWriter);
+		int nCheck = numDocs(checkWriter);
+		int nOrig = numDocs(indexWriter);
+		if( nCheck != nOrig ) {
+			logger.error("nCheck = "+nCheck);
+			logger.error("nOrig = "+nOrig);
+			//new Exception().printStackTrace();
+			Thread.dumpStack();
+			System.out.println();
+			System.out.println("indexWriter");
+			dump(indexWriter);
+			System.out.println("checkWriter");
+			dump(checkWriter);
+			System.exit(-1);
+		}
+		checkWriter.close();
+		deleteFile(dirFile);
 	}
 
-	public void addDocument(Map<String,Object> storedFields) throws IOException {
-		indexWriter.addDocument(storedFields);
-		logFile.writeByte(OP_ADD_DOCUMENT);
-		logFile.writeMap(storedFields);
+	private LogFile log() {
+		return logs.get(logs.size()-1);
+	}
+
+	public synchronized void close() throws IOException {
+		indexWriter.close();
+		LogFile log = log();
+		log.commit();
+	}
+
+	public synchronized void commit() throws IOException {
+		indexWriter.commit();
+		LogFile log = log();
+		log.commit();
+		if( isMerging )
+			return;
+		if( log.length() > logs.get(0).length() ) {
+			log.writeLong( System.currentTimeMillis() );
+			logs.add( newLogFile() );
+			writeIndex();
+		}
+		if( logs.size() > 3 ) {
+			isMerging = true;
+//			new Thread(mergeLogs).start();
+			mergeLogs.run();
+		}
 	}
 
-	public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {
-		indexWriter.updateDocument(keyFieldName,storedFields);
-		logFile.writeByte(OP_UPDATE_DOCUMENT);
-		logFile.writeUTF(keyFieldName);
-		logFile.writeMap(storedFields);
+	public synchronized void rollback() throws IOException {
+		indexWriter.rollback();
+		LogFile log = log();
+		log.gotoEnd();
+	}
+
+	public synchronized void deleteAll() throws IOException {
+		indexWriter.deleteAll();
+		LogFile log = log();
+		writeOp(log,OP_DELETE_ALL);
 	}
 
-	public void reindexDocuments(String keyFieldName,Query query) throws IOException {
+	public synchronized void deleteDocuments(Query query) throws IOException {
+		indexWriter.deleteDocuments(query);
+		LogFile log = log();
+		writeOp(log,OP_DELETE_DOCUMENTS);
+		log.writeQuery(query);
+	}
+
+	public synchronized void addDocument(Map<String,Object> storedFields) throws IOException {
+		indexWriter.addDocument(storedFields);
+		LogFile log = log();
+		writeOp(log,OP_ADD_DOCUMENT);
+		log.writeMap(storedFields);
+	}
+
+	public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {
+		indexWriter.updateDocument(keyFieldName,storedFields);
+		LogFile log = log();
+		writeOp(log,OP_UPDATE_DOCUMENT);
+		log.writeUTF(keyFieldName);
+		log.writeMap(storedFields);
+	}
+
+	public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException {
 		indexWriter.reindexDocuments(keyFieldName,query);
 	}
 
-	private void playOp() throws IOException {
-		int op = logFile.readByte();
+	private void writeOp(LogFile log,int op) throws IOException {
+		log.writeLong(System.currentTimeMillis());
+		log.writeByte(op);
+	}
+
+	public synchronized void playLog() throws IOException {
+		playLog(indexWriter);
+	}
+
+	private void playLog(LuceneIndexWriter indexWriter) throws IOException {
+		if( numDocs(indexWriter) != 0 )
+			throw new RuntimeException ("not empty");
+		for( LogFile log : logs ) {
+			playLog(log,indexWriter);
+		}
+		indexWriter.commit();
+	}
+
+	private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
+		IndexReader reader = indexWriter.openReader();
+		int n = reader.numDocs();
+		reader.close();
+		return n;
+	}
+
+	private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
+		log.gotoStart();
+		while( log.hasMore() ) {
+			playOp(log,indexWriter);
+		}
+	}
+
+	private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
+		log.readLong();  // time
+		int op = log.readByte();
 		switch(op) {
 		case OP_DELETE_ALL:
 			indexWriter.deleteAll();
 			return;
 		case OP_DELETE_DOCUMENTS:
-			indexWriter.deleteDocuments( logFile.readQuery() );
+			indexWriter.deleteDocuments( log.readQuery() );
 			return;
 		case OP_ADD_DOCUMENT:
-			indexWriter.addDocument( logFile.readMap() );
-			return;
+			{
+				Map storedFields = log.readMap();
+				indexWriter.addDocument(storedFields);
+				return;
+			}
 		case OP_UPDATE_DOCUMENT:
-			indexWriter.updateDocument( logFile.readUTF(), logFile.readMap() );
-			return;
+			{
+				String keyFieldName = log.readUTF();
+				Map storedFields = log.readMap();
+				indexWriter.updateDocument(keyFieldName,storedFields);
+				return;
+			}
 		default:
 			throw new RuntimeException("invalid op "+op);
 		}
 	}
 
-	public void playLog() throws IOException {
-		logFile.gotoStart();
-		while( logFile.hasMore() ) {
-			playOp();
+	private static void dump(LuceneIndexWriter indexWriter) throws IOException {
+		IndexReader reader = indexWriter.openReader();
+		IndexSearcher searcher = new IndexSearcher(reader);
+		Query query = new MatchAllDocsQuery();
+		TopDocs td = searcher.search(query,100);
+		System.out.println("totalHits = "+td.totalHits);
+		for( int i=0; i<td.scoreDocs.length; i++ ) {
+			Document doc = searcher.doc(td.scoreDocs[i].doc);
+			System.out.println(LuceneUtils.toMap(doc));
 		}
-		indexWriter.commit();
+		System.out.println();
+		reader.close();
 	}
+ 
 }