Mercurial Hosting > luan
changeset 1544:35601f15ecc3
add lucene log tag and restore_from_log
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 20 Sep 2020 20:36:55 -0600 |
parents | 1db694d98003 |
children | f46b81048a80 |
files | src/goodjava/lucene/api/GoodIndexWriter.java src/goodjava/lucene/api/LuceneIndexWriter.java src/goodjava/lucene/logging/LoggingIndexWriter.java src/luan/modules/lucene/Lucene.luan src/luan/modules/lucene/LuceneIndex.java |
diffstat | 5 files changed, 109 insertions(+), 27 deletions(-) [+] |
line wrap: on
line diff
--- a/src/goodjava/lucene/api/GoodIndexWriter.java Sun Sep 20 16:45:31 2020 -0600 +++ b/src/goodjava/lucene/api/GoodIndexWriter.java Sun Sep 20 20:36:55 2020 -0600 @@ -15,5 +15,6 @@ public void addDocument(Map<String,Object> storedFields) throws IOException; public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException; public void reindexDocuments(String keyFieldName,Query query) throws IOException; + public void tag(String tag) throws IOException; public IndexWriter getLuceneIndexWriter(); }
--- a/src/goodjava/lucene/api/LuceneIndexWriter.java Sun Sep 20 16:45:31 2020 -0600 +++ b/src/goodjava/lucene/api/LuceneIndexWriter.java Sun Sep 20 20:36:55 2020 -0600 @@ -203,4 +203,7 @@ if( !status.clean ) logger.error("index not clean"); } + + public void tag(String tag) throws IOException {} + }
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java Sun Sep 20 16:45:31 2020 -0600 +++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java Sun Sep 20 20:36:55 2020 -0600 @@ -44,9 +44,11 @@ private static final int OP_DELETE_DOCUMENTS = 2; private static final int OP_ADD_DOCUMENT = 3; private static final int OP_UPDATE_DOCUMENT = 4; + private static final int OP_TAG = 5; private static final Random rnd = new Random(); public final LuceneIndexWriter indexWriter; + public boolean wasCreated; private final File logDir; protected final List<LogFile> logs = new ArrayList<LogFile>(); private LogOutputStream log; @@ -71,6 +73,7 @@ } deleteUnusedFiles(); setLog(); + wasCreated = false; return; } } finally { @@ -78,6 +81,7 @@ } } newLogs(); + wasCreated = true; } public IndexWriter getLuceneIndexWriter() { @@ -201,8 +205,8 @@ throw new RuntimeException(); Directory dir = FSDirectory.open(dirFile); LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); - playLog( first.input(), mergeWriter ); - playLog( second.input(), mergeWriter ); + playLog( first.input(), mergeWriter, null ); + playLog( second.input(), mergeWriter, null ); mergeWriter.commit(); LogFile merge = newLogFile(); logLucene( lastTime, merge, mergeWriter ); @@ -273,18 +277,17 @@ IoUtils.deleteRecursively(dirFile); Directory dir = FSDirectory.open(dirFile); LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); - playLogs(logReaders,checkWriter); + playLogs(logReaders,checkWriter,null); //logger.info("check lucene"); IndexReader checkReader = checkWriter.openReader(); + int nCheck = checkReader.numDocs(); + int nOrig = indexReader.numDocs(); + if( nCheck != nOrig ) { + logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck); + ok = false; + } if( sortField == null ) { - int nCheck = checkReader.numDocs(); - int nOrig = indexReader.numDocs(); - if( nCheck != nOrig ) { - logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck); - ok = false; - } - //logger.info("numDocs="+nOrig); - if( hash(indexReader) != hash(checkReader) ) { + if( ok && hash(indexReader) != hash(checkReader) ) { logger.error("hash mismatch"); ok = false; } @@ -413,6 +416,11 @@ log.writeMap(storedFields); } + public synchronized void tag(String tag) throws IOException { + writeOp(OP_TAG); + log.writeUTF(tag); + } + public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException { indexWriter.reindexDocuments(keyFieldName,query); } @@ -422,8 +430,9 @@ log.writeByte(op); } - public synchronized void playLogs() throws IOException { - playLogs( logReaders(logs), indexWriter ); + // return whether stopped at tag + public synchronized boolean playLogs(String upToTag) throws IOException { + return playLogs( logReaders(logs), indexWriter, upToTag ); } private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException { @@ -434,13 +443,20 @@ return logReaders; } - private static void playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter) throws IOException { + private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag) + throws IOException + { if( numDocs(indexWriter) != 0 ) throw new RuntimeException ("not empty"); + boolean rtn = false; for( LogInputStream reader : logReaders ) { - playLog(reader,indexWriter); + if( playLog(reader,indexWriter,upToTag) ) { + rtn = true; + break; + } } indexWriter.commit(); + return rtn; } private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { @@ -450,35 +466,51 @@ return n; } - private static void playLog(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException { + private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) + throws IOException + { + boolean rtn = false; while( in.available() > 0 ) { - playOp(in,indexWriter); + if( playOp(in,indexWriter,upToTag) ) { + rtn = true; + break; + } } in.close(); + return rtn; } - private static void playOp(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException { + private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException { in.readLong(); // time int op = in.readByte(); switch(op) { case OP_DELETE_ALL: indexWriter.deleteAll(); - return; + return false; case OP_DELETE_DOCUMENTS: - indexWriter.deleteDocuments( in.readQuery() ); - return; + { + Query query = in.readQuery(); + //System.out.println("OP_DELETE_DOCUMENTS "+query); + indexWriter.deleteDocuments(query); + return false; + } case OP_ADD_DOCUMENT: { Map storedFields = in.readMap(); indexWriter.addDocument(storedFields); - return; + return false; } case OP_UPDATE_DOCUMENT: { String keyFieldName = in.readUTF(); Map storedFields = in.readMap(); indexWriter.updateDocument(keyFieldName,storedFields); - return; + return false; + } + case OP_TAG: + { + String tag = in.readUTF(); + return tag.equals(upToTag); } default: throw new RuntimeException("invalid op "+op);
--- a/src/luan/modules/lucene/Lucene.luan Sun Sep 20 16:45:31 2020 -0600 +++ b/src/luan/modules/lucene/Lucene.luan Sun Sep 20 20:36:55 2020 -0600 @@ -72,14 +72,15 @@ index.ensure_open = java_index.ensure_open index.highlighter = java_index.highlighter index.count_tokens = java_index.count_tokens - --index.close = java_index.close + index.tag = java_index.tag index.rebuild_log = java_index.rebuild_log + index.restore_from_log = java_index.restore_from_log index.has_postgres_backup = java_index.hasPostgresBackup() index.rebuild_postgres_backup = java_index.rebuild_postgres_backup index.restore_from_postgres = java_index.restore_from_postgres - index.force_restore_from_postgres = java_index.force_restore_from_postgres + --index.force_restore_from_postgres = java_index.force_restore_from_postgres index.check = java_index.check function index.not_in_transaction() end
--- a/src/luan/modules/lucene/LuceneIndex.java Sun Sep 20 16:45:31 2020 -0600 +++ b/src/luan/modules/lucene/LuceneIndex.java Sun Sep 20 20:36:55 2020 -0600 @@ -417,6 +417,10 @@ } } + public void tag(String tag) throws IOException { + writer.tag(tag); + } + public String to_string() { @@ -737,10 +741,11 @@ writeLock.lock(); boolean ok = false; try { - writer.deleteAll(); + IndexWriter iw = writer.getLuceneIndexWriter(); + iw.deleteAll(); postgresBackup.restoreLucene(this); ok = true; - writer.commit(); + iw.commit(); wrote(); ensure_open(); // refresh searcher initId(); @@ -762,6 +767,46 @@ writer.addDocument(toLucene(doc)); } + public void restore_from_log() + throws IOException, LuanException, SQLException, ParseException + { + LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer; + if( wasCreated && !loggingWriter.wasCreated ) { + logger.error("restoring from log"); + force_restore_from_log(); + } + } + + public void force_restore_from_log() + throws IOException + { + logger.warn("start force_restore_from_log"); + if( writeLock.isHeldByCurrentThread() ) + throw new RuntimeException(); + writeLock.lock(); + boolean ok = false; + try { + LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer; + IndexWriter iw = writer.getLuceneIndexWriter(); + iw.deleteAll(); + loggingWriter.playLogs(null); + ok = true; + iw.commit(); + wrote(); + ensure_open(); // refresh searcher + initId(); + wasCreated = false; + } finally { + if( !ok ) { + writer.rollback(); + reopen(); + } + wrote(); + writeLock.unlock(); + } + logger.warn("end force_restore_from_log"); + } + public void check(Luan luan) throws IOException, SQLException, LuanException, ParseException { boolean hasPostgres = postgresBackup != null; String msg = "start check";