Mercurial Hosting > luan
view src/luan/modules/lucene/LuceneIndex.java @ 1582:f28cc30d56cb
start goodjava/mail
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sat, 06 Mar 2021 21:13:34 -0700 |
parents | 8fbcc4747091 |
children | 582384548a69 |
line wrap: on
line source
package luan.modules.lucene;

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.ZipOutputStream;
import java.util.zip.ZipEntry;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.analysis.en.EnglishAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.DoubleField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.highlight.Formatter;
import org.apache.lucene.search.highlight.Highlighter;
import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
import org.apache.lucene.search.highlight.Fragmenter;
import org.apache.lucene.search.highlight.NullFragmenter;
import org.apache.lucene.search.highlight.SimpleSpanFragmenter;
import org.apache.lucene.search.highlight.QueryScorer;
import org.apache.lucene.search.highlight.TokenGroup;
import goodjava.lucene.analysis.LowercaseAnalyzer;
import goodjava.lucene.queryparser.GoodQueryParser;
import goodjava.lucene.queryparser.FieldParser;
import goodjava.lucene.queryparser.MultiFieldParser;
import goodjava.lucene.queryparser.StringFieldParser;
import goodjava.lucene.queryparser.NumberFieldParser;
import goodjava.lucene.api.GoodIndexWriter;
import goodjava.lucene.api.LuceneIndexWriter;
import goodjava.lucene.api.GoodIndexWriterConfig;
import goodjava.lucene.api.LuceneUtils;
import goodjava.lucene.logging.LoggingIndexWriter;
import goodjava.lucene.logging.OpDoer;
import goodjava.parser.ParseException;
import luan.modules.Utils;
import luan.Luan;
import luan.LuanTable;
import luan.LuanFunction;
import luan.LuanException;
import luan.LuanRuntimeException;
import luan.modules.parsers.LuanToString;
import goodjava.logging.Logger;
import goodjava.logging.LoggerFactory;


/**
 * A Lucene index exposed to Luan scripts.  Instances are obtained through
 * {@link #getLuceneIndex}, which keeps one weakly-referenced instance per
 * canonical index directory.
 */
public final class LuceneIndex {
	private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class);

	// Open indexes keyed by canonical directory path; also used as the monitor guarding the lookup.
	private static Map<String,Reference<LuceneIndex>> indexes = new HashMap<String,Reference<LuceneIndex>>();

	/**
	 * Returns the index for {@code indexDir}, reusing the cached instance when one is
	 * still reachable and the "version" option is either absent or equal to the cached
	 * instance's version.  On a version mismatch the cached instance's writer is closed
	 * and a new instance is created and cached in its place.
	 */
	public static LuceneIndex getLuceneIndex(Luan luan,File indexDir,LuanTable options)
		throws LuanException, IOException, ClassNotFoundException, SQLException
	{
		final String key = indexDir.getCanonicalPath();
		synchronized(indexes) {
			Reference<LuceneIndex> ref = indexes.get(key);
			LuceneIndex cached = ref==null ? null : ref.get();
			if( cached != null ) {
				Object requested = options.get(luan,"version");
				if( requested==null || requested.equals(cached.version) )
					return cached;
				cached.closeWriter();
			}
			LuceneIndex fresh = new LuceneIndex(luan,indexDir,options);
			indexes.put( key, new WeakReference<LuceneIndex>(fresh) );
			return fresh;
		}
	}

	private static final Version luceneVersion = Version.LUCENE_4_9;

	// Ready-made field parsers built on the keyword, lowercase and English analyzers.
	public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer());
	public static final StringFieldParser LOWERCASE_FIELD_PARSER = new StringFieldParser(new LowercaseAnalyzer(luceneVersion));
	public static final StringFieldParser ENGLISH_FIELD_PARSER = new StringFieldParser(new EnglishAnalyzer(luceneVersion));

	// Sorts on the numeric "id" field, ascending and descending.
	private static final SortField ID_SORT = new SortField("id",SortField.Type.LONG);
	private static final SortField ID_DESC_SORT = new SortField("id",SortField.Type.LONG,true);

	private final Object version;

	private final ReentrantLock writeLock = new ReentrantLock();
	private final File indexDir;
	private GoodIndexWriter writer;
	private DirectoryReader reader;
	private IndexSearcher searcher;
	// Searcher pinned for the current thread by search_in_transaction().
	private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>();
	private final MultiFieldParser mfp;
	private final Analyzer analyzer;  // ???
// ---- remaining state, construction, writing, searching, backup and consistency checks ----

	private FSDirectory fsDir;
	// Value of writeCounter last seen by openSearcher(); only read and written inside that synchronized method.
	private int writeCount;
	// Incremented by wrote() so that openSearcher() knows to look for a newer reader.
	private final AtomicInteger writeCounter = new AtomicInteger();
	private final GoodIndexWriterConfig config;
	private final PostgresBackup postgresBackup;
	private boolean wasCreated;
	private final File logDir;
	private final long logTime;

	/**
	 * Builds an index over {@code indexDir}.  Recognized options (all removed from a copy
	 * of {@code options}, which must then be empty): "version", "default_type",
	 * "default_fields", "postgres_spec", "supplementer", "log_dir", "log_time".
	 *
	 * @throws LuanException if "log_dir" is given without "log_time", or if unknown options remain
	 */
	private LuceneIndex(Luan luan,File indexDir,LuanTable options)
		throws LuanException, IOException, ClassNotFoundException, SQLException
	{
		options = new LuanTable(options);  // work on a copy so the caller's table is not modified
		this.version = options.remove("version");
		FieldParser defaultFieldParser = (FieldParser)options.remove("default_type");
		LuanTable defaultFieldsTbl = Utils.removeTable(options,"default_fields");
		String[] defaultFields = defaultFieldsTbl==null ? null : (String[])defaultFieldsTbl.asList().toArray(new String[0]);
		LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec");
		LuanFunction supplementer = Utils.removeFunction(options,"supplementer");
		logDir = (File)options.remove("log_dir");
		// log_time is only consumed when log_dir is set (see reopen()); avoid a bare
		// NullPointerException from unboxing when it is missing.
		Long logTimeOpt = (Long)options.remove("log_time");
		if( logDir != null && logTimeOpt == null )
			throw new LuanException("log_time is required when log_dir is set");
		logTime = logTimeOpt==null ? 0L : logTimeOpt.longValue();
		Utils.checkEmpty(options);

		mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields);
		mfp.fields.put( "type", STRING_FIELD_PARSER );
		mfp.fields.put( "id", NumberFieldParser.LONG );
		this.indexDir = indexDir;
		// Highlighting and token counting use the default field's analyzer when it is a string parser.
		Analyzer analyzer = STRING_FIELD_PARSER.analyzer;
		if( defaultFieldParser instanceof StringFieldParser ) {
			StringFieldParser sfp = (StringFieldParser)defaultFieldParser;
			analyzer = sfp.analyzer;
		}
		this.analyzer = analyzer;
		this.config = new SupplementingConfig(luceneVersion,mfp,luan,supplementer);
		wasCreated = reopen();
		if( postgresSpec == null ) {
			postgresBackup = null;
		} else {
			postgresBackup = new PostgresBackup(postgresSpec);
			if( !wasCreated && postgresBackup.wasCreated ) {
				logger.error("rebuilding postgres backup");
				rebuild_postgres_backup(luan);
/*
			} else if( wasCreated && !postgresBackup.wasCreated ) {
				logger.error("restoring from postgres");
				restore_from_postgres();
*/
			}
		}
	}

	/**
	 * (Re)opens the directory, writer, reader and searcher and reloads the highest id.
	 *
	 * @return true if the index directory did not exist before this call
	 */
	public boolean reopen() throws IOException {
		fsDir = FSDirectory.open(indexDir);
		boolean created = !fsDir.getDirectory().exists();  // must be sampled before the writer is opened
		writer = new LuceneIndexWriter(fsDir,config);
		if( logDir != null )
			writer = new LoggingIndexWriter((LuceneIndexWriter)writer,logDir,logTime);
		reader = DirectoryReader.open(fsDir);
		searcher = new IndexSearcher(reader);
		initId();
		return created;
	}

	/** Records that a write happened so the next openSearcher() checks for a newer reader. */
	private void wrote() {
		writeCounter.incrementAndGet();
	}

	/** Deletes every document (and every backup row, if a postgres backup is configured) and resets the id counter. */
	public void delete_all() throws IOException, SQLException {
		boolean commit = !writeLock.isHeldByCurrentThread();  // commit only when not nested in a transaction
		writeLock.lock();
		try {
			writer.deleteAll();
			id = 0;
			if( postgresBackup != null )
				postgresBackup.deleteAll();
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	/** Builds a term for a long-valued field using Lucene's prefix-coded numeric form. */
	private static Term term(String key,long value) {
		BytesRef br = new BytesRef();
		NumericUtils.longToPrefixCoded(value,0,br);
		return new Term(key,br);
	}

	private static final Set<String> ID_SET = Collections.singleton("id");

	/** Deletes from the postgres backup every document matched by {@code query}; does nothing without a backup. */
	private void backupDelete(Query query)
		throws IOException, SQLException, LuanException
	{
		if( postgresBackup != null ) {
			final List<Long> ids = new ArrayList<Long>();
			final IndexSearcher searcher = openSearcher();
			try {
				MyCollector col = new MyCollector() {
					@Override public void collect(int iDoc) throws IOException {
						Document doc = searcher.doc( docBase + iDoc, ID_SET );  // load only the id field
						Long id = (Long)doc.getField("id").numericValue();
						ids.add(id);
					}
				};
				searcher.search(query,col);
			} finally {
				close(searcher);
			}
			postgresBackup.begin();
			for( Long id : ids ) {
				postgresBackup.delete(id);
			}
			postgresBackup.commit();
		}
	}

	/** Deletes all documents matching the query string, from the backup first and then from the index. */
	public void delete(String queryStr)
		throws IOException, ParseException, SQLException, LuanException
	{
		Query query = GoodQueryParser.parseQuery(mfp,queryStr);

		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			backupDelete(query);
			writer.deleteDocuments(query);
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	/** Asks the writer to reindex, keyed on "id", the documents matching the query string. */
	public void reindex(String queryStr)
		throws IOException, ParseException
	{
		Query query = GoodQueryParser.parseQuery(mfp,queryStr);

		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			writer.reindexDocuments("id",query);
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	/**
	 * Saves a document.  A document without an "id" gets the next id assigned (stored back
	 * into {@code doc}) and is added; a document with an id replaces the existing one.
	 *
	 * @throws LuanException if "id" is present but is not a Long
	 */
	public void save(Luan luan,LuanTable doc)
		throws LuanException, IOException, SQLException
	{
		Object obj = doc.get(luan,"id");
		if( obj != null && !(obj instanceof Long) )
			throw new LuanException("id should be Long but is "+obj.getClass().getSimpleName());
		Long id = (Long)obj;

		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			if( id == null ) {
				id = ++this.id;
				doc.put(luan,"id",id);
				if( postgresBackup != null )
					postgresBackup.add(luan,doc);
				writer.addDocument(toLucene(doc));
			} else {
				if( postgresBackup != null )
					postgresBackup.update(luan,doc);
				writer.updateDocument( "id", toLucene(doc) );
			}
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	/** True when the calling thread already holds the write lock. */
	public boolean is_in_transaction() {
		return writeLock.isHeldByCurrentThread();
	}

	/**
	 * Runs {@code fn} while holding the write lock.  The outermost call commits the writer
	 * (and the postgres backup) on success, and rolls both back and reopens the index when
	 * {@code fn} or the commit of the backup fails before {@code ok} is set.
	 */
	public Object run_in_transaction(Luan luan,LuanFunction fn)
		throws IOException, LuanException, SQLException
	{
		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		boolean ok = false;
		try {
			if( commit && postgresBackup != null )
				postgresBackup.begin();
			Object rtn = fn.call(luan);
			ok = true;
			if(commit) {
				if( postgresBackup != null )
					postgresBackup.commit();
				writer.commit();
			}
			return rtn;
		} finally {
			try {
				if( !ok && commit ) {
					if( postgresBackup != null )
						postgresBackup.rollback();
					writer.rollback();
					reopen();
				}
			} finally {
				// release the lock even if the rollback or reopen itself throws
				wrote();
				writeLock.unlock();
			}
		}
	}

	// ???
	/** Runs {@code fn} holding both the write lock and this object's monitor; may not be nested in a transaction. */
	public Object run_in_lock(Luan luan,LuanFunction fn)
		throws IOException, LuanException
	{
		if( writeLock.isHeldByCurrentThread() )
			throw new RuntimeException();
		writeLock.lock();
		try {
			synchronized(this) {
				return fn.call(luan);
			}
		} finally {
			wrote();
			writeLock.unlock();
		}
	}


	// Highest id handed out so far; new documents get ++id.
	private long id;

	/** Loads {@code id} from the document with the largest "id" in the index, or 0 for an empty index. */
	private void initId() throws IOException {
		TopDocs td = searcher.search(new MatchAllDocsQuery(),1,new Sort(ID_DESC_SORT));
		switch(td.scoreDocs.length) {
		case 0:
			id = 0;
			break;
		case 1:
			id = (Long)searcher.doc(td.scoreDocs[0].doc).getField("id").numericValue();
			break;
		default:
			throw new RuntimeException();
		}
	}

/*
	public void backup(String zipFile) throws LuanException, IOException {
		if( !zipFile.endsWith(".zip") )
			throw new LuanException("file "+zipFile+" doesn't end with '.zip'");
		IndexCommit ic = snapshotDeletionPolicy.snapshot();
		try {
			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile));
			for( String fileName : ic.getFileNames() ) {
				out.putNextEntry(new ZipEntry(fileName));
				FileInputStream in = new FileInputStream(new File(indexDir,fileName));
				Utils.copyAll(in,out);
				in.close();
				out.closeEntry();
			}
			out.close();
		} finally {
			snapshotDeletionPolicy.release(ic);
		}
	}
*/
	/** Returns the writer's deletion policy, which is expected to be a SnapshotDeletionPolicy. */
	public SnapshotDeletionPolicy snapshotDeletionPolicy() {
		return (SnapshotDeletionPolicy)writer.getLuceneIndexWriter().getConfig().getIndexDeletionPolicy();
	}

	/**
	 * Takes a snapshot of the current commit, calls {@code fn(dir, fileNames)} with the
	 * index directory path and the snapshot's file names, and always releases the snapshot.
	 */
	public Object snapshot(Luan luan,LuanFunction fn) throws LuanException, IOException {
		SnapshotDeletionPolicy snapshotDeletionPolicy = snapshotDeletionPolicy();
		IndexCommit ic = snapshotDeletionPolicy.snapshot();
		try {
			String dir = fsDir.getDirectory().toString();
			LuanTable fileNames = new LuanTable(new ArrayList(ic.getFileNames()));
			return fn.call(luan,dir,fileNames);
		} finally {
			snapshotDeletionPolicy.release(ic);
		}
	}

	/** Passes {@code tag} to the writer under the write lock, committing unless nested in a transaction. */
	public void tag(String tag) throws IOException {
		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			writer.tag(tag);
			if(commit) writer.commit();
		} finally {
			writeLock.unlock();
		}
	}


	public String to_string() {
		return writer.getLuceneIndexWriter().getDirectory().toString();
	}

	@Override protected void finalize() throws Throwable {
		close();
		super.finalize();
	}

	/** Closes the writer (and postgres backup) and then the reader; the reader is closed even if the writer fails. */
	public void close() throws IOException, SQLException {
		try {
			closeWriter();
		} finally {
			reader.close();
		}
	}

	/** Closes the writer and, if configured, the postgres backup, under the write lock. */
	private void closeWriter() throws IOException, SQLException {
		writeLock.lock();
		try {
			try {
				writer.close();
			} finally {
				// still release the database resources when closing the writer fails
				if( postgresBackup != null )
					postgresBackup.close();
			}
		} finally {
			writeLock.unlock();
		}
	}


	/**
	 * Luan function handed to search callbacks.  Calling it loads the document whose id is
	 * currently in {@code docID}; when the first argument is "explain" it returns the
	 * document together with Lucene's explanation for it.
	 */
	private static class DocFn extends LuanFunction {
		final IndexSearcher searcher;
		final Query query;
		int docID;

		DocFn(IndexSearcher searcher,Query query) {
			this.searcher = searcher;
			this.query = query;
		}

		@Override public Object call(Luan luan,Object[] args) throws LuanException {
			try {
				LuanTable doc = toTable(searcher.doc(docID));
				if( args.length > 0 && "explain".equals(args[0]) ) {
					Explanation explanation = searcher.explain(query,docID);
					return new Object[]{doc,explanation};
				} else {
					return doc;
				}
			} catch(IOException e) {
				throw new LuanException(e);
			}
		}
	}

	/** Collector base that tracks the current segment's docBase and offers a counter {@code i} to subclasses. */
	private static abstract class MyCollector extends Collector {
		int docBase;
		int i = 0;

		@Override public void setScorer(Scorer scorer) {}
		@Override public void setNextReader(AtomicReaderContext context) {
			this.docBase = context.docBase;
		}
		@Override public boolean acceptsDocsOutOfOrder() {
			return true;
		}
	}

	/**
	 * Returns the shared searcher with its reader's reference count incremented, first
	 * swapping in a newer reader if writes were recorded since the last call.
	 * Every call must be paired with {@link #close(IndexSearcher)}.
	 */
	private synchronized IndexSearcher openSearcher() throws IOException {
		int gwc = writeCounter.get();
		if( writeCount != gwc ) {
			writeCount = gwc;
			DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
//			DirectoryReader newReader = DirectoryReader.openIfChanged(reader,writer.getLuceneIndexWriter(),true);
			if( newReader != null ) {
				reader.decRef();
				reader = newReader;
				searcher = new IndexSearcher(reader);
			}
		}
		reader.incRef();
		return searcher;
	}

	// call in finally block
	private static void close(IndexSearcher searcher) throws IOException {
		searcher.getIndexReader().decRef();
	}

	/** Opens and immediately releases a searcher, which refreshes the reader after writes. */
	public void ensure_open() throws IOException {
		close(openSearcher());
	}

	/**
	 * Runs a query.
	 * <ul>
	 * <li>{@code fn} given and {@code n} null: calls {@code fn(i, docFn)} for every hit (no sort allowed) and returns the number of hits.</li>
	 * <li>{@code fn} null or {@code n} equal to 0: only counts hits.</li>
	 * <li>otherwise: calls {@code fn(i, docFn, score)} for the top {@code n} hits, optionally sorted, and returns the total hit count.</li>
	 * </ul>
	 * Uses the thread's searcher from search_in_transaction() when there is one.
	 */
	public int advanced_search( final Luan luan, String queryStr, LuanFunction fn, Integer n, String sortStr )
		throws LuanException, IOException, ParseException
	{
		Utils.checkNotNull(queryStr);
		Query query = GoodQueryParser.parseQuery(mfp,queryStr);
		IndexSearcher searcher = threadLocalSearcher.get();
		boolean inTransaction = searcher != null;
		if( !inTransaction )
			searcher = openSearcher();
		try {
			if( fn!=null && n==null ) {
				if( sortStr != null )
					throw new LuanException("sort must be nil when n is nil");
				final DocFn docFn = new DocFn(searcher,query);
				MyCollector col = new MyCollector() {
					@Override public void collect(int doc) {
						try {
							docFn.docID = docBase + doc;
							fn.call(luan,++i,docFn);
						} catch(LuanException e) {
							// Collector.collect cannot throw LuanException; tunnel it out
							throw new LuanRuntimeException(e);
						}
					}
				};
				try {
					searcher.search(query,col);
				} catch(LuanRuntimeException e) {
					throw (LuanException)e.getCause();
				}
				return col.i;
			}
			if( fn==null || n==0 ) {
				TotalHitCountCollector thcc = new TotalHitCountCollector();
				searcher.search(query,thcc);
				return thcc.getTotalHits();
			}
			Sort sort = sortStr==null ? null : GoodQueryParser.parseSort(mfp,sortStr);
			TopDocs td = sort==null ? searcher.search(query,n) : searcher.search(query,n,sort);
			final ScoreDoc[] scoreDocs = td.scoreDocs;
			DocFn docFn = new DocFn(searcher,query);
			for( int i=0; i<scoreDocs.length; i++ ) {
				ScoreDoc scoreDoc = scoreDocs[i];
				docFn.docID = scoreDoc.doc;
				fn.call(luan,i+1,docFn,scoreDoc.score);
			}
			return td.totalHits;
		} finally {
			if( !inTransaction )
				close(searcher);
		}
	}

	/** Runs {@code fn} with one searcher pinned to the current thread, so all searches inside it share that searcher. */
	public Object search_in_transaction(Luan luan,LuanFunction fn) throws LuanException, IOException {
		if( threadLocalSearcher.get() != null )
			throw new LuanException("can't nest search_in_transaction calls");
		IndexSearcher searcher = openSearcher();
		threadLocalSearcher.set(searcher);
		try {
			return fn.call(luan);
		} finally {
			threadLocalSearcher.set(null);
			close(searcher);
		}
	}


	public FieldParser getIndexedFieldParser(String field) {
		return mfp.fields.get(field);
	}

	/** Registers the parser for a field; a null parser removes the field's entry. */
	public void setIndexedFieldParser(String field,FieldParser fp) {
		if( fp==null ) {  // delete
			mfp.fields.remove(field);
			return;
		}
		mfp.fields.put( field, fp );
	}

	static Map<String,Object> toLucene(LuanTable table) throws LuanException {
		return SupplementingConfig.toLucene(table);
	}

	private static LuanTable toTable(Document doc) throws LuanException {
		return doc==null ? null : SupplementingConfig.toTable(LuceneUtils.toMap(doc));
	}


	// Formatter that leaves the text untouched; used when only choosing a fragment.
	private static final Formatter nullFormatter = new Formatter() {
		public String highlightTerm(String originalText,TokenGroup tokenGroup) {
			return originalText;
		}
	};

	/**
	 * Returns a Luan function that highlights the terms of {@code queryStr} in a text.
	 *
	 * @param formatter    called with each matching piece of text; its first result replaces that text
	 * @param fragmentSize when non-null, the text is first reduced to the best fragment of about this size,
	 *                     or truncated to this many characters when no fragment matches
	 * @param dotdotdot    when non-null, marks the places where text was cut off
	 */
	public LuanFunction highlighter(final Luan luan,String queryStr,final LuanFunction formatter,final Integer fragmentSize,String dotdotdot)
		throws ParseException
	{
		Query query = GoodQueryParser.parseQuery(mfp,queryStr);
		Formatter fmt = new Formatter() {
			public String highlightTerm(String originalText,TokenGroup tokenGroup) {
				if( tokenGroup.getTotalScore() <= 0 )
					return originalText;
				try {
					return (String)Luan.first(formatter.call(luan,originalText));
				} catch(LuanException e) {
					throw new LuanRuntimeException(e);
				}
			}
		};
		QueryScorer queryScorer = new QueryScorer(query);
		final Highlighter chooser = fragmentSize==null ? null : new Highlighter(nullFormatter,queryScorer);
		if( chooser != null )
			chooser.setTextFragmenter( new SimpleSpanFragmenter(queryScorer,fragmentSize) );
		final Highlighter hl = new Highlighter(fmt,queryScorer);
		hl.setTextFragmenter( new NullFragmenter() );
		return new LuanFunction() {
			@Override public String call(Luan luan,Object[] args) throws LuanException {
				String text = (String)args[0];
				try {
					if( chooser != null ) {
						String s = chooser.getBestFragment(analyzer,null,text);
						if( s != null ) {
							if( dotdotdot != null ) {
								boolean atStart = text.startsWith(s);
								boolean atEnd = text.endsWith(s);
								if( !atStart )
									s = dotdotdot + s;
								if( !atEnd )
									s = s + dotdotdot;
							}
							text = s;
						} else if( text.length() > fragmentSize ) {
							text = text.substring(0,fragmentSize);
							if( dotdotdot != null )
								text += dotdotdot;  // use the caller's marker, as the fragment branch above does
						}
					}
					String s = hl.getBestFragment(analyzer,null,text);
					return s!=null ? s : text;
				} catch(LuanRuntimeException e) {
					throw (LuanException)e.getCause();
				} catch(IOException e) {
					throw new RuntimeException(e);
				} catch(InvalidTokenOffsetsException e) {
					throw new RuntimeException(e);
				}
			}
		};
	}

	/** Counts the tokens this index's analyzer produces for {@code text}. */
	public int count_tokens(String text)
		throws IOException
	{
		int n = 0;
		TokenStream ts = analyzer.tokenStream(null,text);
		try {
			ts.reset();
			while( ts.incrementToken() ) {
				n++;
			}
			ts.end();  // part of the TokenStream consumer workflow
		} finally {
			ts.close();  // release the stream even if tokenizing fails
		}
		return n;
	}



	public boolean hasPostgresBackup() {
		return postgresBackup != null;
	}

	/**
	 * Empties the postgres backup and refills it from every document in the index that has
	 * an "id", in one backup transaction that is rolled back on failure.
	 * NOTE(review): dereferences postgresBackup without a null check; callers presumably
	 * check hasPostgresBackup() first.
	 */
	public void rebuild_postgres_backup(Luan luan)
		throws IOException, LuanException, SQLException
	{
		logger.info("start rebuild_postgres_backup");
		writeLock.lock();
		try {
			// opened inside the try so a failure here cannot leave the write lock held
			final IndexSearcher searcher = openSearcher();
			boolean ok = false;
			try {
				postgresBackup.begin();
				postgresBackup.deleteAll();
				Query query = new PrefixQuery(new Term("id"));
				MyCollector col = new MyCollector() {
					@Override public void collect(int iDoc) throws IOException {
						try {
							Document doc = searcher.doc( docBase + iDoc );
							LuanTable tbl = toTable(doc);
							postgresBackup.add(luan,tbl);
						} catch(LuanException e) {
							throw new LuanRuntimeException(e);
						} catch(SQLException e) {
							throw new RuntimeException(e);
						}
					}
				};
				try {
					searcher.search(query,col);
				} catch(LuanRuntimeException e) {
					throw (LuanException)e.getCause();
				}
				ok = true;
				postgresBackup.commit();
			} finally {
				close(searcher);
				if( !ok )
					postgresBackup.rollback();
			}
		} finally {
			writeLock.unlock();
		}
		logger.info("end rebuild_postgres_backup");
	}

	/** Restores from postgres only when this index was newly created and the backup already existed. */
	public void restore_from_postgres()
		throws IOException, LuanException, SQLException, ParseException
	{
		if( postgresBackup!=null && wasCreated && !postgresBackup.wasCreated ) {
			logger.error("restoring from postgres");
			force_restore_from_postgres();
		}
	}

	/**
	 * Replaces the whole index content with the content of the postgres backup.
	 * Must not be called while the current thread holds the write lock.
	 */
	public void force_restore_from_postgres()
		throws IOException, LuanException, SQLException, ParseException
	{
		logger.warn("start restore_from_postgres");
		if( postgresBackup==null )
			throw new NullPointerException();
		if( writeLock.isHeldByCurrentThread() )
			throw new RuntimeException();
		writeLock.lock();
		boolean ok = false;
		try {
			writer.tag("restore_from_postgres");
			writer.deleteAll();
			postgresBackup.restoreLucene(this);
			ok = true;
			writer.commit();
			wrote();
			ensure_open();  // refresh searcher
			initId();
			wasCreated = false;
		} finally {
			try {
				if( !ok ) {
					writer.rollback();
					reopen();
				}
			} finally {
				// release the lock even if the rollback or reopen itself throws
				wrote();
				writeLock.unlock();
			}
		}
		logger.warn("end restore_from_postgres");
	}

	/** Adds one restored document to the writer; called back during a postgres restore. */
	void restore(LuanTable doc)
		throws LuanException, IOException
	{
		writer.addDocument(toLucene(doc));
	}

	/** Calls logLucene() on the logging writer under the write lock; requires that the writer is a LoggingIndexWriter. */
	public void relog()
		throws IOException, LuanException
	{
		logger.info("start relog");
		writeLock.lock();
		try {
			LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
			loggingWriter.logLucene();
		} finally {
			writeLock.unlock();
		}
		logger.info("end relog");
	}

	/** Restores from the log only when this index was newly created and the log already existed. */
	public void restore_from_log(Luan luan,LuanFunction handler)
		throws IOException, LuanException, SQLException, ParseException
	{
		LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
		if( wasCreated && !loggingWriter.wasCreated ) {
			logger.error("restoring from log");
			force_restore_from_log(luan,handler);
		}
	}

	/**
	 * Replays the writer's logs into the index, optionally through {@code handler}.
	 * Must not be called while the current thread holds the write lock.
	 */
	public void force_restore_from_log(Luan luan,LuanFunction handler)
		throws IOException
	{
		logger.warn("start force_restore_from_log");
		if( writeLock.isHeldByCurrentThread() )
			throw new RuntimeException();
		OpDoer opDoer = handler==null ? null : new LuanOpDoer(writer,luan,handler);
		writeLock.lock();
		boolean ok = false;
		try {
			LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
			loggingWriter.playLogs(opDoer);
			ok = true;
			wrote();
			ensure_open();  // refresh searcher
			initId();
			wasCreated = false;
		} finally {
			try {
				if( !ok ) {
					writer.rollback();
					reopen();
				}
			} finally {
				// release the lock even if the rollback or reopen itself throws
				wrote();
				writeLock.unlock();
			}
		}
		logger.warn("end force_restore_from_log");
	}

	/** Runs Lucene's CheckIndex, then the log check (if logging) and the postgres check (if backed up), logging what it finds. */
	public void check() throws IOException, SQLException, LuanException, ParseException {
		boolean hasPostgres = postgresBackup != null;
		String msg = "start check";
		if( hasPostgres )
			msg += " with postgres";
		logger.info(msg);
		CheckIndex.Status status = new CheckIndex(fsDir).checkIndex();
		if( !status.clean )
			logger.error("index not clean");
		if( writer instanceof LoggingIndexWriter ) {
			LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
			logger.info("log check");
			loggingWriter.check(ID_SORT);
		}
		if( hasPostgres ) {
			logger.info("postgres check");
			checkPostgres();
		}
		logger.info("end check");
	}

	/**
	 * Compares the sorted id lists of the index and of postgres.  Ids present on only one
	 * side, or whose documents differ, are re-checked one at a time under the write lock
	 * by {@link #checkPostgres(PostgresBackup.Checker,LuanToString,long)}, which logs the problem.
	 */
	private void checkPostgres()
		throws IOException, SQLException, LuanException, ParseException
	{
		final PostgresBackup.Checker postgresChecker = postgresBackup.newChecker();
		final IndexSearcher searcher = openSearcher();
		try {
			final List<Long> idsLucene = new ArrayList<Long>();
			Query query = new PrefixQuery(new Term("id"));
			MyCollector col = new MyCollector() {
				@Override public void collect(int iDoc) throws IOException {
					Document doc = searcher.doc( docBase + iDoc );
					Long id = (Long)doc.getField("id").numericValue();
					idsLucene.add(id);
				}
			};
			searcher.search(query,col);
			Collections.sort(idsLucene);
			final List<Long> idsPostgres = postgresChecker.getIds();
			final int nLucene = idsLucene.size();
			final int nPostgres = idsPostgres.size();
			int iLucene = 0;
			int iPostgres = 0;
			LuanToString lts = new LuanToString(null,null);
			lts.settingsInit.strict = true;
			lts.settingsInit.numberTypes = true;
			// merge-walk over the two id lists
			while( iLucene < nLucene && iPostgres < nPostgres ) {
				long idLucene = idsLucene.get(iLucene);
				long idPostgres = idsPostgres.get(iPostgres);
				if( idLucene < idPostgres ) {
					iLucene++;
					checkPostgres(postgresChecker,lts,idLucene);
				} else if( idLucene > idPostgres ) {
					iPostgres++;
					checkPostgres(postgresChecker,lts,idPostgres);
				} else {  // ==
					LuanTable docPostgres = postgresChecker.getDoc(idPostgres);
					TopDocs td = searcher.search(new TermQuery(term("id",idLucene)),1);
					if( td.totalHits != 1 )  throw new RuntimeException();
					Document doc = searcher.doc( td.scoreDocs[0].doc );
					LuanTable docLucene = toTable(doc);
					if( !equal(docPostgres,docLucene) ) {
						checkPostgres(postgresChecker,lts,idPostgres);
					}
					iLucene++;
					iPostgres++;
				}
			}
			while( iLucene < nLucene ) {
				long idLucene = idsLucene.get(iLucene++);
				checkPostgres(postgresChecker,lts,idLucene);
			}
			while( iPostgres < nPostgres ) {
				long idPostgres = idsPostgres.get(iPostgres++);
				checkPostgres(postgresChecker,lts,idPostgres);
			}
		} finally {
			close(searcher);
			postgresChecker.close();
		}
	}

	/** Re-reads one id from both stores while holding the write lock and logs an error if they disagree. */
	private void checkPostgres(PostgresBackup.Checker postgresChecker,LuanToString lts,long id)
		throws IOException, SQLException, LuanException, ParseException
	{
		//logger.info("check id "+id);
		writeLock.lock();
		try {
			final IndexSearcher searcher = openSearcher();
			try {
				LuanTable docPostgres = postgresChecker.getDoc(id);
				TopDocs td = searcher.search(new TermQuery(term("id",id)),1);
				LuanTable docLucene;
				if( td.totalHits == 0 )  {
					docLucene = null;
				} else if( td.totalHits == 1 ) {
					Document doc = searcher.doc( td.scoreDocs[0].doc );
					docLucene = toTable(doc);
				} else
					throw new RuntimeException();
				if( docPostgres == null ) {
					if( docLucene != null )
						logger.error("id "+id+" found in lucene but not postgres");
					return;
				}
				if( docLucene == null ) {
					logger.error("id "+id+" found in postgres but not lucene");
					return;
				}
				if( !equal(docPostgres,docLucene) ) {
					logger.error("id "+id+" not equal");
					logger.error("lucene = "+lts.toString(docLucene));
					logger.error("postgres = "+lts.toString(docPostgres));
				}
			} finally {
				close(searcher);
			}
		} finally {
			writeLock.unlock();
		}
	}

/**
	 * Two tables are equal when both are non-null and their normalized
	 * Java maps (see toJava) are equal.
	 */
	private static boolean equal(LuanTable t1,LuanTable t2) throws LuanException {
		if( t1==null || t2==null )
			return false;
		Map left = toJava(t1);
		Map right = toJava(t2);
		return left.equals(right);
	}

	/**
	 * Normalizes a table's map for comparison.  Each table-valued entry is replaced by
	 * its list form: an empty list removes the entry, a single-element list becomes that
	 * element, and anything longer becomes the list itself.  A table value that is not a
	 * list is logged as an error and then treated the same way.
	 */
	private static Map toJava(LuanTable t) throws LuanException {
		Map map = t.asMap();
		Iterator iter = map.entrySet().iterator();
		while( iter.hasNext() ) {
			Map.Entry entry = (Map.Entry)iter.next();
			Object value = entry.getValue();
			if( !(value instanceof LuanTable) )
				continue;
			LuanTable tbl = (LuanTable)value;
			if( !tbl.isList() )
				logger.error("not list");
			List items = tbl.asList();
			switch( items.size() ) {
			case 0:
				iter.remove();
				break;
			case 1:
				entry.setValue(items.get(0));
				break;
			default:
				entry.setValue(items);
			}
		}
		return map;
	}
}