Mercurial Hosting > luan
changeset 1538:634f6765830e
use goodjava/lucene/logging
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Fri, 07 Aug 2020 21:42:16 -0600 |
parents | f7649ad6e3e7 |
children | c27dc6af87ca |
files | src/goodjava/lucene/backup/BackupIndexWriter.java src/goodjava/lucene/logging/LoggingIndexWriter.java src/goodjava/lucene/logging/SemaphoreLock.java src/luan/modules/lucene/Lucene.luan src/luan/modules/lucene/LuceneIndex.java |
diffstat | 5 files changed, 101 insertions(+), 27 deletions(-) [+] |
line wrap: on
line diff
--- a/src/goodjava/lucene/backup/BackupIndexWriter.java Fri Aug 07 13:38:25 2020 -0600 +++ b/src/goodjava/lucene/backup/BackupIndexWriter.java Fri Aug 07 21:42:16 2020 -0600 @@ -58,9 +58,11 @@ } } - protected void doCheck(SortField sortField) throws IOException { - super.doCheck(sortField); - runSyncWithChecksum(); + protected boolean doCheck(SortField sortField) throws IOException { + boolean ok = super.doCheck(sortField); + if( ok ) + runSyncWithChecksum(); + return ok; } public void runSync() {
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java Fri Aug 07 13:38:25 2020 -0600 +++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java Fri Aug 07 21:42:16 2020 -0600 @@ -13,6 +13,7 @@ import java.util.List; import java.util.ArrayList; import java.util.Random; +import java.util.concurrent.TimeUnit; import org.apache.lucene.document.Document; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; @@ -50,7 +51,7 @@ protected final List<LogFile> logs = new ArrayList<LogFile>(); private LogOutputStream log; private final File index; - private boolean isMerging = false; + private final SemaphoreLock mergeLock = new SemaphoreLock(); public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException { this.indexWriter = indexWriter; @@ -92,18 +93,30 @@ log.close(); log = logs.get(logs.size()-1).output(); } - +/* public synchronized boolean isMerging() { - return isMerging; + return mergeLock.isLocked(); } - - private synchronized void isNotMerging() { - isMerging = false; +*/ + private void getMergeLock() { + try { + if( !mergeLock.tryLock(1,TimeUnit.MINUTES) ) + throw new RuntimeException("failed to acquire lock"); + } catch(InterruptedException e) { + throw new RuntimeException(e); + } } public synchronized void newLogs() throws IOException { - if( isMerging ) - throw new RuntimeException("merging"); + getMergeLock(); + try { + newLogs2(); + } finally { + mergeLock.unlock(); + } + } + + private void newLogs2() throws IOException { logger.info("building new logs"); logs.clear(); for( int i=0; i<2; i++ ) { @@ -182,6 +195,8 @@ private void mergeLogs() throws IOException { //logger.info("merge"); + if( logs.size() <= 3 ) + return; LogFile first = logs.get(0); LogFile second = logs.get(1); long lastTime = second.file.lastModified(); @@ -210,7 +225,7 @@ } catch(IOException e) { throw new RuntimeException(e); } finally { - isNotMerging(); + mergeLock.unlock(); } } }; @@ -236,18 +251,19 @@ private volatile boolean isChecking = false; - public void check(SortField sortField) throws IOException { + public boolean check(SortField sortField) throws IOException { if( isChecking ) throw new RuntimeException("another check is running"); isChecking = true; try { - doCheck(sortField); + return doCheck(sortField); } finally { isChecking = false; } } - protected void doCheck(SortField sortField) throws IOException { + protected boolean doCheck(SortField sortField) throws IOException { + boolean ok = true; IndexReader indexReader; List<LogInputStream> logReaders; synchronized(this) { @@ -255,24 +271,26 @@ logReaders = logReaders(logs); } try { - logger.info("check start"); + //logger.info("check start"); indexWriter.check(); File dirFile = new File(logDir,"check"); IoUtils.deleteRecursively(dirFile); Directory dir = FSDirectory.open(dirFile); LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); playLogs(logReaders,checkWriter); - logger.info("check lucene"); + //logger.info("check lucene"); IndexReader checkReader = checkWriter.openReader(); if( sortField == null ) { int nCheck = checkReader.numDocs(); int nOrig = indexReader.numDocs(); if( nCheck != nOrig ) { logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck); + ok = false; } - logger.info("numDocs="+nOrig); + //logger.info("numDocs="+nOrig); if( hash(indexReader) != hash(checkReader) ) { logger.error("hash mismatch"); + ok = false; } } else { Sort sort = new Sort(sortField); @@ -291,25 +309,30 @@ logger.error(sortFieldName+" "+origFld+" not equal"); logger.error("lucene = "+origFields); logger.error("logs = "+checkFields); + ok = false; } origFields = LuceneUtils.toMap(origIter.next()); checkFields = LuceneUtils.toMap(checkIter.next()); } else if( cmp < 0 ) { logger.error(sortFieldName+" "+origFld+" found in lucene but not logs"); + ok = false; origFields = LuceneUtils.toMap(origIter.next()); } else { // > logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene"); + ok = false; checkFields = LuceneUtils.toMap(checkIter.next()); } } while( origFields!=null ) { Comparable origFld = (Comparable)origFields.get(sortFieldName); logger.error(sortFieldName+" "+origFld+" found in lucene but not logs"); + ok = false; origFields = LuceneUtils.toMap(origIter.next()); } while( checkFields!=null ) { Comparable checkFld = (Comparable)checkFields.get(sortFieldName); logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene"); + ok = false; checkFields = LuceneUtils.toMap(checkIter.next()); } //logger.info("check done"); @@ -317,10 +340,11 @@ checkReader.close(); checkWriter.close(); IoUtils.deleteRecursively(dirFile); - logger.info("check done"); + //logger.info("check done"); } finally { indexReader.close(); } + return ok; } private static abstract class HashCollector extends GoodCollector { @@ -350,7 +374,7 @@ public synchronized void commit() throws IOException { indexWriter.commit(); log.commit(); - if( isMerging ) + if( mergeLock.isLocked() ) return; if( log.logFile.end() > logs.get(0).end() ) { logs.add( newLogFile() ); @@ -358,7 +382,7 @@ setLog(); } if( logs.size() > 3 ) { - isMerging = true; + getMergeLock(); new Thread(mergeLogs).start(); // mergeLogs.run(); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/goodjava/lucene/logging/SemaphoreLock.java Fri Aug 07 21:42:16 2020 -0600 @@ -0,0 +1,21 @@ +package goodjava.lucene.logging; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + + +public final class SemaphoreLock { + private final Semaphore semaphore = new Semaphore(1); + + public void unlock() { + semaphore.release(); + } + + public boolean isLocked() { + return semaphore.availablePermits() == 0; + } + + public boolean tryLock(long time,TimeUnit unit) throws InterruptedException { + return semaphore.tryAcquire(time,unit); + } +}
--- a/src/luan/modules/lucene/Lucene.luan Fri Aug 07 13:38:25 2020 -0600 +++ b/src/luan/modules/lucene/Lucene.luan Fri Aug 07 21:42:16 2020 -0600 @@ -34,13 +34,19 @@ Lucene.quote = GoodQueryParser.quote +local function get_file(f,name) + type(f)=="table" or error(name.." must be table") + f.to_uri_string and matches(f.to_uri_string(),"^file:") or error(name.." must be file") + return f.java.file or error() +end + function Lucene.index(index_dir,options) - type(index_dir)=="table" or error "index_dir must be table" - index_dir.to_uri_string and matches(index_dir.to_uri_string(),"^file:") or error "must be file" - options = options or {} local index = {} index.dir = index_dir - local java_index = LuceneIndex.getLuceneIndex(index_dir.java.file,options) + index_dir = get_file(index_dir) + options = options or {} + options.log_dir = options.log_dir and get_file(options.log_dir) + local java_index = LuceneIndex.getLuceneIndex(index_dir,options) index.java = java_index index.indexed_fields = {} @@ -69,6 +75,8 @@ index.count_tokens = java_index.count_tokens --index.close = java_index.close + index.rebuild_log = java_index.rebuild_log + index.has_postgres_backup = java_index.hasPostgresBackup() index.rebuild_postgres_backup = java_index.rebuild_postgres_backup index.restore_from_postgres = java_index.restore_from_postgres
--- a/src/luan/modules/lucene/LuceneIndex.java Fri Aug 07 13:38:25 2020 -0600 +++ b/src/luan/modules/lucene/LuceneIndex.java Fri Aug 07 21:42:16 2020 -0600 @@ -78,6 +78,7 @@ import goodjava.lucene.api.LuceneIndexWriter; import goodjava.lucene.api.GoodIndexWriterConfig; import goodjava.lucene.api.LuceneUtils; +import goodjava.lucene.logging.LoggingIndexWriter; import goodjava.parser.ParseException; import luan.modules.Utils; import luan.Luan; @@ -121,6 +122,7 @@ public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer()); public static final StringFieldParser LOWERCASE_FIELD_PARSER = new StringFieldParser(new LowercaseAnalyzer(luceneVersion)); public static final StringFieldParser ENGLISH_FIELD_PARSER = new StringFieldParser(new EnglishAnalyzer(luceneVersion)); + private static final SortField ID_SORT = new SortField("id",SortField.Type.LONG); private final Object version; @@ -140,6 +142,7 @@ private final PostgresBackup postgresBackup; private boolean wasCreated; + private final File logDir; private LuceneIndex(Luan luan,File indexDir,LuanTable options) throws LuanException, IOException, ClassNotFoundException, SQLException @@ -151,6 +154,7 @@ String[] defaultFields = defaultFieldsTbl==null ? null : (String[])defaultFieldsTbl.asList().toArray(new String[0]); LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec"); LuanFunction supplementer = Utils.removeFunction(options,"supplementer"); + logDir = (File)options.remove("log_dir"); Utils.checkEmpty(options); mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields); @@ -185,6 +189,8 @@ fsDir = FSDirectory.open(indexDir); boolean wasCreated = !fsDir.getDirectory().exists(); writer = new LuceneIndexWriter(fsDir,config); + if( logDir != null ) + writer = new LoggingIndexWriter((LuceneIndexWriter)writer,logDir); reader = DirectoryReader.open(fsDir); searcher = new IndexSearcher(reader); initId(); @@ -781,15 +787,28 @@ CheckIndex.Status status = new CheckIndex(fsDir).checkIndex(); if( !status.clean ) logger.error("index not clean"); - if( hasPostgres ) + if( writer instanceof LoggingIndexWriter ) { + LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer; + logger.info("log check"); + boolean ok = loggingWriter.check(ID_SORT); + } + if( hasPostgres ) { + logger.info("postgres check"); checkPostgres(luan); + } logger.info("end check"); } + public void rebuild_log() throws IOException { + logger.info("start rebuild_log"); + LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer; + loggingWriter.newLogs(); + logger.info("end rebuild_log"); + } + private void checkPostgres(Luan luan) throws IOException, SQLException, LuanException, ParseException { - //logger.info("start postgres check"); final PostgresBackup.Checker postgresChecker = postgresBackup.newChecker(); final IndexSearcher searcher = openSearcher(); try {