changeset 1544:35601f15ecc3

add lucene log tag and restore_from_log
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 20 Sep 2020 20:36:55 -0600 (2020-09-21)
parents 1db694d98003
children f46b81048a80
files src/goodjava/lucene/api/GoodIndexWriter.java src/goodjava/lucene/api/LuceneIndexWriter.java src/goodjava/lucene/logging/LoggingIndexWriter.java src/luan/modules/lucene/Lucene.luan src/luan/modules/lucene/LuceneIndex.java
diffstat 5 files changed, 109 insertions(+), 27 deletions(-) [+]
line wrap: on
line diff
--- a/src/goodjava/lucene/api/GoodIndexWriter.java	Sun Sep 20 16:45:31 2020 -0600
+++ b/src/goodjava/lucene/api/GoodIndexWriter.java	Sun Sep 20 20:36:55 2020 -0600
@@ -15,5 +15,6 @@
 	public void addDocument(Map<String,Object> storedFields) throws IOException;
 	public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException;
 	public void reindexDocuments(String keyFieldName,Query query) throws IOException;
+	public void tag(String tag) throws IOException;
 	public IndexWriter getLuceneIndexWriter();
 }
--- a/src/goodjava/lucene/api/LuceneIndexWriter.java	Sun Sep 20 16:45:31 2020 -0600
+++ b/src/goodjava/lucene/api/LuceneIndexWriter.java	Sun Sep 20 20:36:55 2020 -0600
@@ -203,4 +203,7 @@
 		if( !status.clean )
 			logger.error("index not clean");
 	}
+
+	public void tag(String tag) throws IOException {}
+
 }
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java	Sun Sep 20 16:45:31 2020 -0600
+++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java	Sun Sep 20 20:36:55 2020 -0600
@@ -44,9 +44,11 @@
 	private static final int OP_DELETE_DOCUMENTS = 2;
 	private static final int OP_ADD_DOCUMENT = 3;
 	private static final int OP_UPDATE_DOCUMENT = 4;
+	private static final int OP_TAG = 5;
 	private static final Random rnd = new Random();
 
 	public final LuceneIndexWriter indexWriter;
+	public boolean wasCreated;
 	private final File logDir;
 	protected final List<LogFile> logs = new ArrayList<LogFile>();
 	private LogOutputStream log;
@@ -71,6 +73,7 @@
 					}
 					deleteUnusedFiles();
 					setLog();
+					wasCreated = false;
 					return;
 				}
 			} finally {
@@ -78,6 +81,7 @@
 			}
 		}
 		newLogs();
+		wasCreated = true;
 	}
 
 	public IndexWriter getLuceneIndexWriter() {
@@ -201,8 +205,8 @@
 			throw new RuntimeException();
 		Directory dir = FSDirectory.open(dirFile);
 		LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
-		playLog( first.input(), mergeWriter );
-		playLog( second.input(), mergeWriter );
+		playLog( first.input(), mergeWriter, null );
+		playLog( second.input(), mergeWriter, null );
 		mergeWriter.commit();
 		LogFile merge = newLogFile();
 		logLucene( lastTime, merge, mergeWriter );
@@ -273,18 +277,17 @@
 			IoUtils.deleteRecursively(dirFile);
 			Directory dir = FSDirectory.open(dirFile);
 			LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
-			playLogs(logReaders,checkWriter);
+			playLogs(logReaders,checkWriter,null);
 			//logger.info("check lucene");
 			IndexReader checkReader = checkWriter.openReader();
+			int nCheck = checkReader.numDocs();
+			int nOrig = indexReader.numDocs();
+			if( nCheck != nOrig ) {
+				logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck);
+				ok = false;
+			}
 			if( sortField == null ) {
-				int nCheck = checkReader.numDocs();
-				int nOrig = indexReader.numDocs();
-				if( nCheck != nOrig ) {
-					logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck);
-					ok = false;
-				}
-				//logger.info("numDocs="+nOrig);
-				if( hash(indexReader) != hash(checkReader) ) {
+				if( ok && hash(indexReader) != hash(checkReader) ) {
 					logger.error("hash mismatch");
 					ok = false;
 				}
@@ -413,6 +416,11 @@
 		log.writeMap(storedFields);
 	}
 
+	public synchronized void tag(String tag) throws IOException {
+		writeOp(OP_TAG);
+		log.writeUTF(tag);
+	}
+
 	public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException {
 		indexWriter.reindexDocuments(keyFieldName,query);
 	}
@@ -422,8 +430,9 @@
 		log.writeByte(op);
 	}
 
-	public synchronized void playLogs() throws IOException {
-		playLogs( logReaders(logs), indexWriter );
+	// return whether stopped at tag
+	public synchronized boolean playLogs(String upToTag) throws IOException {
+		return playLogs( logReaders(logs), indexWriter, upToTag );
 	}
 
 	private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException {
@@ -434,13 +443,20 @@
 		return logReaders;
 	}
 
-	private static void playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter) throws IOException {
+	private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag)
+		throws IOException
+	{
 		if( numDocs(indexWriter) != 0 )
 			throw new RuntimeException ("not empty");
+		boolean rtn = false;
 		for( LogInputStream reader : logReaders ) {
-			playLog(reader,indexWriter);
+			if( playLog(reader,indexWriter,upToTag) ) {
+				rtn = true;
+				break;
+			}
 		}
 		indexWriter.commit();
+		return rtn;
 	}
 
 	private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
@@ -450,35 +466,51 @@
 		return n;
 	}
 
-	private static void playLog(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException {
+	private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag)
+		throws IOException
+	{
+		boolean rtn = false;
 		while( in.available() > 0 ) {
-			playOp(in,indexWriter);
+			if( playOp(in,indexWriter,upToTag) ) {
+				rtn = true;
+				break;
+			}
 		}
 		in.close();
+		return rtn;
 	}
 
-	private static void playOp(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException {
+	private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException {
 		in.readLong();  // time
 		int op = in.readByte();
 		switch(op) {
 		case OP_DELETE_ALL:
 			indexWriter.deleteAll();
-			return;
+			return false;
 		case OP_DELETE_DOCUMENTS:
-			indexWriter.deleteDocuments( in.readQuery() );
-			return;
+			{
+				Query query = in.readQuery();
+				//System.out.println("OP_DELETE_DOCUMENTS "+query);
+				indexWriter.deleteDocuments(query);
+				return false;
+			}
 		case OP_ADD_DOCUMENT:
 			{
 				Map storedFields = in.readMap();
 				indexWriter.addDocument(storedFields);
-				return;
+				return false;
 			}
 		case OP_UPDATE_DOCUMENT:
 			{
 				String keyFieldName = in.readUTF();
 				Map storedFields = in.readMap();
 				indexWriter.updateDocument(keyFieldName,storedFields);
-				return;
+				return false;
+			}
+		case OP_TAG:
+			{
+				String tag = in.readUTF();
+				return tag.equals(upToTag);
 			}
 		default:
 			throw new RuntimeException("invalid op "+op);
--- a/src/luan/modules/lucene/Lucene.luan	Sun Sep 20 16:45:31 2020 -0600
+++ b/src/luan/modules/lucene/Lucene.luan	Sun Sep 20 20:36:55 2020 -0600
@@ -72,14 +72,15 @@
 	index.ensure_open = java_index.ensure_open
 	index.highlighter = java_index.highlighter
 	index.count_tokens = java_index.count_tokens
-	--index.close = java_index.close
+	index.tag = java_index.tag
 
 	index.rebuild_log = java_index.rebuild_log
+	index.restore_from_log = java_index.restore_from_log
 
 	index.has_postgres_backup = java_index.hasPostgresBackup()
 	index.rebuild_postgres_backup = java_index.rebuild_postgres_backup
 	index.restore_from_postgres = java_index.restore_from_postgres
-	index.force_restore_from_postgres = java_index.force_restore_from_postgres
+	--index.force_restore_from_postgres = java_index.force_restore_from_postgres
 	index.check = java_index.check
 
 	function index.not_in_transaction() end
--- a/src/luan/modules/lucene/LuceneIndex.java	Sun Sep 20 16:45:31 2020 -0600
+++ b/src/luan/modules/lucene/LuceneIndex.java	Sun Sep 20 20:36:55 2020 -0600
@@ -417,6 +417,10 @@
 		}
 	}
 
+	public void tag(String tag) throws IOException {
+		writer.tag(tag);
+	}
+
 
 
 	public String to_string() {
@@ -737,10 +741,11 @@
 		writeLock.lock();
 		boolean ok = false;
 		try {
-			writer.deleteAll();
+			IndexWriter iw = writer.getLuceneIndexWriter();
+			iw.deleteAll();
 			postgresBackup.restoreLucene(this);
 			ok = true;
-			writer.commit();
+			iw.commit();
 			wrote();
 			ensure_open();  // refresh searcher
 			initId();
@@ -762,6 +767,46 @@
 		writer.addDocument(toLucene(doc));
 	}
 
+	public void restore_from_log()
+		throws IOException, LuanException, SQLException, ParseException
+	{
+		LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
+		if( wasCreated && !loggingWriter.wasCreated ) {
+			logger.error("restoring from log");
+			force_restore_from_log();
+		}
+	}
+
+	public void force_restore_from_log()
+		throws IOException
+	{
+		logger.warn("start force_restore_from_log");
+		if( writeLock.isHeldByCurrentThread() )
+			throw new RuntimeException();
+		writeLock.lock();
+		boolean ok = false;
+		try {
+			LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
+			IndexWriter iw = writer.getLuceneIndexWriter();
+			iw.deleteAll();
+			loggingWriter.playLogs(null);
+			ok = true;
+			iw.commit();
+			wrote();
+			ensure_open();  // refresh searcher
+			initId();
+			wasCreated = false;
+		} finally {
+			if( !ok ) {
+				writer.rollback();
+				reopen();
+			}
+			wrote();
+			writeLock.unlock();
+		}
+		logger.warn("end force_restore_from_log");
+	}
+
 	public void check(Luan luan) throws IOException, SQLException, LuanException, ParseException {
 		boolean hasPostgres = postgresBackup != null;
 		String msg = "start check";