Mercurial Hosting > luan
view src/luan/modules/lucene/LuceneIndex.java @ 1390:179c4882c6b6
backup work
author: Franklin Schmidt <fschmidt@gmail.com>
date: Wed, 04 Sep 2019 00:06:42 -0600
parents | 2024d23ddd64 |
children | 94f48cc76de8 |
line wrap: on
line source
package luan.modules.lucene; import java.io.Closeable; import java.io.File; import java.io.FileOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.HashMap; import java.util.List; import java.util.ArrayList; import java.util.Set; import java.util.HashSet; import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.zip.ZipOutputStream; import java.util.zip.ZipEntry; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.core.KeywordAnalyzer; import org.apache.lucene.analysis.en.EnglishAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.StoredField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.document.IntField; import org.apache.lucene.document.LongField; import org.apache.lucene.document.DoubleField; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.Term; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Version; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.NumericUtils; import org.apache.lucene.search.Query; import org.apache.lucene.search.PrefixQuery; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.Sort; import 
org.apache.lucene.search.SortField; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Collector; import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.highlight.Formatter; import org.apache.lucene.search.highlight.Highlighter; import org.apache.lucene.search.highlight.InvalidTokenOffsetsException; import org.apache.lucene.search.highlight.Fragmenter; import org.apache.lucene.search.highlight.NullFragmenter; import org.apache.lucene.search.highlight.SimpleSpanFragmenter; import org.apache.lucene.search.highlight.QueryScorer; import org.apache.lucene.search.highlight.TokenGroup; import luan.lib.queryparser.SaneQueryParser; import luan.lib.queryparser.FieldParser; import luan.lib.queryparser.MultiFieldParser; import luan.lib.queryparser.StringFieldParser; import luan.lib.queryparser.NumberFieldParser; import luan.lib.parser.ParseException; import luan.modules.Utils; import luan.Luan; import luan.LuanTable; import luan.LuanFunction; import luan.LuanException; import luan.LuanRuntimeException; import luan.lib.logging.Logger; import luan.lib.logging.LoggerFactory; public final class LuceneIndex { private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class); private static final class Closer implements Closeable { final LuceneIndex li; boolean isClosed = false; private final Exception created = new Exception("created"); Closer(Luan luan,LuceneIndex li) { this.li = li; luan.onClose(this); } public void close() throws IOException { if( !isClosed ) { li.close(); isClosed = true; } } protected void finalize() throws Throwable { if( !isClosed ) { logger.error("not closed",created); close(); } super.finalize(); } } private static Map<String,LuceneIndex> indexes = new HashMap<String,LuceneIndex>(); public static Object[] getLuceneIndex(Luan luan,File indexDir,FieldParser 
defaultFieldParser,String[] defaultFields,LuanFunction completer) throws LuanException, IOException { String key = indexDir.getCanonicalPath(); synchronized(indexes) { LuceneIndex li = indexes.get(key); if( li == null ) { li = new LuceneIndex(indexDir,defaultFieldParser,defaultFields,key,completer); li.openCount = 1; indexes.put(key,li); } else { if( defaultFieldParser != li.defaultFieldParser ) throw new LuanException("default_type doesn't match previous use"); if( !Arrays.equals(defaultFields,li.defaultFields) ) throw new LuanException("default_fields don't match previous use"); li.openCount++; } return new Object[]{li,new Closer(luan,li)}; } } private static final Version version = Version.LUCENE_4_9; private static final String FLD_NEXT_ID = "nextId"; public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer()); public static final StringFieldParser ENGLISH_FIELD_PARSER = new StringFieldParser(new EnglishAnalyzer(version)); private final ReentrantLock writeLock = new ReentrantLock(); private final File indexDir; private SnapshotDeletionPolicy snapshotDeletionPolicy; private IndexWriter writer; private DirectoryReader reader; private IndexSearcher searcher; private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>(); private final MultiFieldParser mfp; private final Analyzer analyzer; private File fileDir; private int writeCount; private AtomicInteger writeCounter = new AtomicInteger(); private Set<String> indexOnly = new HashSet<String>(); private int openCount; private final String key; private final FieldParser defaultFieldParser; private final String[] defaultFields; private final PostgresBackup postgresBackup; private LuceneIndex(File indexDir,FieldParser defaultFieldParser,String[] defaultFields,String key,LuanFunction completer) throws LuanException, IOException { this.key = key; this.defaultFieldParser = defaultFieldParser; this.defaultFields = defaultFields; mfp = 
defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields); mfp.fields.put( "type", STRING_FIELD_PARSER ); mfp.fields.put( "id", NumberFieldParser.LONG ); this.indexDir = indexDir; Analyzer analyzer = STRING_FIELD_PARSER.analyzer; if( defaultFieldParser instanceof StringFieldParser ) { StringFieldParser sfp = (StringFieldParser)defaultFieldParser; analyzer = sfp.analyzer; } this.analyzer = analyzer; boolean wasCreated = reopen(); postgresBackup = completer!=null ? PostgresBackup.newInstance() : null; if( postgresBackup != null ) { if( !wasCreated && postgresBackup.wasCreated ) { logger.error("rebuilding postgres backup"); rebuild_postgres_backup(completer); } else if( wasCreated && !postgresBackup.wasCreated ) { logger.error("restoring from postgres"); restore_from_postgres(); } } } public boolean reopen() throws IOException { IndexWriterConfig conf = new IndexWriterConfig(version,analyzer); snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()); conf.setIndexDeletionPolicy(snapshotDeletionPolicy); FSDirectory dir = FSDirectory.open(indexDir); fileDir = dir.getDirectory(); boolean wasCreated = !fileDir.exists(); writer = new IndexWriter(dir,conf); writer.commit(); // commit index creation reader = DirectoryReader.open(dir); searcher = new IndexSearcher(reader); initId(); return wasCreated; } private void wrote() { writeCounter.incrementAndGet(); } public void delete_all() throws IOException { boolean commit = !writeLock.isHeldByCurrentThread(); writeLock.lock(); try { writer.deleteAll(); id = idLim = 0; if( postgresBackup != null ) postgresBackup.deleteAll(); if(commit) writer.commit(); } finally { wrote(); writeLock.unlock(); } } private static Term term(String key,long value) { BytesRef br = new BytesRef(); NumericUtils.longToPrefixCoded(value,0,br); return new Term(key,br); } private void backupDelete(Query query) throws IOException { if( postgresBackup != null ) { final List<Long> ids = 
new ArrayList<Long>(); IndexSearcher searcher = openSearcher(); MyCollector col = new MyCollector() { @Override public void collect(int iDoc) throws IOException { Document doc = searcher.doc( docBase + iDoc ); Long id = (Long)doc.getField("id").numericValue(); ids.add(id); } }; searcher.search(query,col); postgresBackup.begin(); for( Long id : ids ) { postgresBackup.delete(id); } postgresBackup.commit(); } } public void delete(String queryStr) throws IOException, ParseException { Query query = SaneQueryParser.parseQuery(mfp,queryStr); boolean commit = !writeLock.isHeldByCurrentThread(); writeLock.lock(); try { backupDelete(query); writer.deleteDocuments(query); if(commit) writer.commit(); } finally { wrote(); writeLock.unlock(); } } public void indexed_only_fields(List<String> fields) { indexOnly.addAll(fields); } public void save(LuanTable doc,LuanTable boosts) throws LuanException, IOException { if( boosts!=null && postgresBackup!=null ) logger.error("boosts are not saved to postgres backup"); Object obj = doc.get("id"); Long id; try { id = (Long)obj; } catch(ClassCastException e) { throw new LuanException("id should be Long but is "+obj.getClass().getSimpleName()); } boolean commit = !writeLock.isHeldByCurrentThread(); writeLock.lock(); try { if( id == null ) { id = nextId(); doc.put("id",id); writer.addDocument(toLucene(doc,boosts)); if( postgresBackup != null ) postgresBackup.add(id,doc); } else { writer.updateDocument( term("id",id), toLucene(doc,boosts) ); if( postgresBackup != null ) postgresBackup.update(id,doc); } if(commit) writer.commit(); } finally { wrote(); writeLock.unlock(); } } public Object run_in_transaction(LuanFunction fn) throws IOException, LuanException { boolean commit = !writeLock.isHeldByCurrentThread(); writeLock.lock(); boolean ok = false; try { if( commit && postgresBackup != null ) postgresBackup.begin(); Object rtn = fn.call(); ok = true; if(commit) { if( postgresBackup != null ) postgresBackup.commit(); writer.commit(); } return 
rtn; } finally { if( !ok && commit ) { if( postgresBackup != null ) postgresBackup.rollback(); writer.rollback(); reopen(); } wrote(); writeLock.unlock(); } } // ??? public Object run_in_lock(LuanFunction fn) throws IOException, LuanException { if( writeLock.isHeldByCurrentThread() ) throw new RuntimeException(); writeLock.lock(); try { synchronized(this) { return fn.call(); } } finally { wrote(); writeLock.unlock(); } } private long id; private long idLim; private final int idBatch = 10; private void initId() throws IOException { TopDocs td = searcher.search(new TermQuery(new Term("type","next_id")),1); switch(td.totalHits) { case 0: id = 0; idLim = 0; break; case 1: idLim = (Long)searcher.doc(td.scoreDocs[0].doc).getField(FLD_NEXT_ID).numericValue(); id = idLim; break; default: throw new RuntimeException(); } } private void saveNextId(long nextId) throws LuanException, IOException { Map doc = new HashMap(); doc.put( "type", "next_id" ); doc.put( FLD_NEXT_ID, idLim ); writer.updateDocument(new Term("type","next_id"),toLucene(doc.entrySet(),null)); } public synchronized long nextId() throws LuanException, IOException { if( ++id > idLim ) { idLim += idBatch; saveNextId(idLim); wrote(); } return id; } /* public void backup(String zipFile) throws LuanException, IOException { if( !zipFile.endsWith(".zip") ) throw new LuanException("file "+zipFile+" doesn't end with '.zip'"); IndexCommit ic = snapshotDeletionPolicy.snapshot(); try { ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile)); for( String fileName : ic.getFileNames() ) { out.putNextEntry(new ZipEntry(fileName)); FileInputStream in = new FileInputStream(new File(indexDir,fileName)); Utils.copyAll(in,out); in.close(); out.closeEntry(); } out.close(); } finally { snapshotDeletionPolicy.release(ic); } } */ public SnapshotDeletionPolicy snapshotDeletionPolicy() { return snapshotDeletionPolicy; } public Object snapshot(LuanFunction fn) throws LuanException, IOException { IndexCommit ic = 
snapshotDeletionPolicy.snapshot(); try { String dir = fileDir.toString(); LuanTable fileNames = new LuanTable(fn.luan(),new ArrayList(ic.getFileNames())); return fn.call(dir,fileNames); } finally { snapshotDeletionPolicy.release(ic); } } public String to_string() { return writer.getDirectory().toString(); } private synchronized void close() throws IOException { if( openCount > 0 ) { if( --openCount == 0 ) { doClose(); synchronized(indexes) { indexes.remove(key); } } } } public void doClose() throws IOException { if( postgresBackup != null ) postgresBackup.close(); writer.close(); reader.close(); } private static class DocFn extends LuanFunction { final IndexSearcher searcher; final Query query; int docID; DocFn(Luan luan,IndexSearcher searcher,Query query) { super(luan); this.searcher = searcher; this.query = query; } @Override public Object call(Object[] args) throws LuanException { try { LuanTable doc = toTable(luan(),searcher.doc(docID)); if( args.length > 0 && "explain".equals(args[0]) ) { Explanation explanation = searcher.explain(query,docID); return new Object[]{doc,explanation}; } else { return doc; } } catch(IOException e) { throw new LuanException(e); } } } private static abstract class MyCollector extends Collector { int docBase; int i = 0; @Override public void setScorer(Scorer scorer) {} @Override public void setNextReader(AtomicReaderContext context) { this.docBase = context.docBase; } @Override public boolean acceptsDocsOutOfOrder() { return true; } } private synchronized IndexSearcher openSearcher() throws IOException { int gwc = writeCounter.get(); if( writeCount != gwc ) { writeCount = gwc; DirectoryReader newReader = DirectoryReader.openIfChanged(reader); if( newReader != null ) { reader.decRef(); reader = newReader; searcher = new IndexSearcher(reader); } } reader.incRef(); return searcher; } // call in finally block private static void close(IndexSearcher searcher) throws IOException { searcher.getIndexReader().decRef(); } public void 
ensure_open() throws IOException { close(openSearcher()); } public int advanced_search( String queryStr, LuanFunction fn, Integer n, String sortStr ) throws LuanException, IOException, ParseException { Utils.checkNotNull(queryStr); Query query = SaneQueryParser.parseQuery(mfp,queryStr); IndexSearcher searcher = threadLocalSearcher.get(); boolean inTransaction = searcher != null; if( !inTransaction ) searcher = openSearcher(); try { if( fn!=null && n==null ) { if( sortStr != null ) throw new LuanException("sort must be nil when n is nil"); final DocFn docFn = new DocFn(fn.luan(),searcher,query); MyCollector col = new MyCollector() { @Override public void collect(int doc) { try { docFn.docID = docBase + doc; fn.call(++i,docFn); } catch(LuanException e) { throw new LuanRuntimeException(e); } } }; try { searcher.search(query,col); } catch(LuanRuntimeException e) { throw (LuanException)e.getCause(); } return col.i; } if( fn==null || n==0 ) { TotalHitCountCollector thcc = new TotalHitCountCollector(); searcher.search(query,thcc); return thcc.getTotalHits(); } Sort sort = sortStr==null ? null : SaneQueryParser.parseSort(mfp,sortStr); TopDocs td = sort==null ? 
searcher.search(query,n) : searcher.search(query,n,sort); final ScoreDoc[] scoreDocs = td.scoreDocs; DocFn docFn = new DocFn(fn.luan(),searcher,query); for( int i=0; i<scoreDocs.length; i++ ) { ScoreDoc scoreDoc = scoreDocs[i]; docFn.docID = scoreDoc.doc; fn.call(i+1,docFn,scoreDoc.score); } return td.totalHits; } finally { if( !inTransaction ) close(searcher); } } public Object search_in_transaction(LuanFunction fn) throws LuanException, IOException { if( threadLocalSearcher.get() != null ) throw new LuanException("can't nest search_in_transaction calls"); IndexSearcher searcher = openSearcher(); threadLocalSearcher.set(searcher); try { return fn.call(); } finally { threadLocalSearcher.set(null); close(searcher); } } public FieldParser getIndexedFieldParser(String field) { return mfp.fields.get(field); } public void setIndexedFieldParser(String field,FieldParser fp) { if( fp==null ) { // delete mfp.fields.remove(field); return; } mfp.fields.put( field, fp ); } private IndexableField newField(String name,Object value,Set<String> indexed,Float boost) throws LuanException { boolean hasBoost = boost!=null; IndexableField fld = newField2(name,value,indexed,hasBoost); if( hasBoost ) ((Field)fld).setBoost(boost); return fld; } private IndexableField newField2(String name,Object value,Set<String> indexed,boolean hasBoost) throws LuanException { Field.Store store = indexOnly.contains(name) ? 
Field.Store.NO : Field.Store.YES; if( value instanceof String ) { String s = (String)value; FieldParser fp = mfp.fields.get(name); if( fp != null ) { if( fp instanceof StringFieldParser && fp != STRING_FIELD_PARSER ) { return new TextField(name, s, store); } else if (hasBoost) { // fuck you modern lucene developers return new Field(name, s, store, Field.Index.NOT_ANALYZED); } else { return new StringField(name, s, store); } } else { return new StoredField(name, s); } } else if( value instanceof Integer ) { int i = (Integer)value; if( indexed.contains(name) ) { return new IntField(name, i, store); } else { return new StoredField(name, i); } } else if( value instanceof Long ) { long i = (Long)value; if( indexed.contains(name) ) { return new LongField(name, i, store); } else { return new StoredField(name, i); } } else if( value instanceof Double ) { double i = (Double)value; if( indexed.contains(name) ) { return new DoubleField(name, i, store); } else { return new StoredField(name, i); } } else if( value instanceof byte[] ) { byte[] b = (byte[])value; return new StoredField(name, b); } else throw new LuanException("invalid value type "+value.getClass()+"' for '"+name+"'"); } private Document toLucene(LuanTable table,LuanTable boosts) throws LuanException { return toLucene(table.iterable(),boosts); } private Document toLucene(Iterable<Map.Entry> iterable,LuanTable boosts) throws LuanException { Set<String> indexed = mfp.fields.keySet(); Document doc = new Document(); for( Map.Entry<Object,Object> entry : iterable ) { Object key = entry.getKey(); if( !(key instanceof String) ) throw new LuanException("key must be string"); String name = (String)key; Object value = entry.getValue(); Float boost = null; if( boosts != null ) { Object obj = boosts.get(name); if( obj != null ) { if( !(obj instanceof Number) ) throw new LuanException("boost '"+name+"' must be number"); boost = ((Number)obj).floatValue(); } } if( !(value instanceof LuanTable) ) { doc.add(newField( name, value, 
indexed, boost )); } else { // list LuanTable list = (LuanTable)value; for( Object el : list.asList() ) { doc.add(newField( name, el, indexed, boost )); } } } return doc; } private static Object getValue(IndexableField ifld) throws LuanException { BytesRef br = ifld.binaryValue(); if( br != null ) return br.bytes; Number n = ifld.numericValue(); if( n != null ) return n; String s = ifld.stringValue(); if( s != null ) return s; throw new LuanException("invalid field type for "+ifld); } private static LuanTable toTable(Luan luan,Document doc) throws LuanException { if( doc==null ) return null; LuanTable table = new LuanTable(luan); for( IndexableField ifld : doc ) { String name = ifld.name(); Object value = getValue(ifld); Object old = table.rawGet(name); if( old == null ) { table.rawPut(name,value); } else { LuanTable list; if( old instanceof LuanTable ) { list = (LuanTable)old; } else { list = new LuanTable(luan); list.rawPut(1,old); table.rawPut(name,list); } list.rawPut(list.rawLength()+1,value); } } return table; } private static final Formatter nullFormatter = new Formatter() { public String highlightTerm(String originalText,TokenGroup tokenGroup) { return originalText; } }; public LuanFunction highlighter(String queryStr,final LuanFunction formatter,final Integer fragmentSize,String dotdotdot) throws ParseException { Query query = SaneQueryParser.parseQuery(mfp,queryStr); Formatter fmt = new Formatter() { public String highlightTerm(String originalText,TokenGroup tokenGroup) { if( tokenGroup.getTotalScore() <= 0 ) return originalText; try { return (String)Luan.first(formatter.call(originalText)); } catch(LuanException e) { throw new LuanRuntimeException(e); } } }; QueryScorer queryScorer = new QueryScorer(query); final Highlighter chooser = fragmentSize==null ? 
null : new Highlighter(nullFormatter,queryScorer); if( chooser != null ) chooser.setTextFragmenter( new SimpleSpanFragmenter(queryScorer,fragmentSize) ); final Highlighter hl = new Highlighter(fmt,queryScorer); hl.setTextFragmenter( new NullFragmenter() ); return new LuanFunction(false) { // ??? @Override public String call(Object[] args) throws LuanException { String text = (String)args[0]; try { if( chooser != null ) { String s = chooser.getBestFragment(analyzer,null,text); if( s != null ) { if( dotdotdot != null ) { boolean atStart = text.startsWith(s); boolean atEnd = text.endsWith(s); if( !atStart ) s = dotdotdot + s; if( !atEnd ) s = s + dotdotdot; } text = s; } else if( text.length() > fragmentSize ) { text = text.substring(0,fragmentSize); if( dotdotdot != null ) text += "..."; } } String s = hl.getBestFragment(analyzer,null,text); return s!=null ? s : text; } catch(LuanRuntimeException e) { throw (LuanException)e.getCause(); } catch(IOException e) { throw new RuntimeException(e); } catch(InvalidTokenOffsetsException e) { throw new RuntimeException(e); } } }; } public int count_tokens(String text) throws IOException { int n = 0; TokenStream ts = analyzer.tokenStream(null,text); ts.reset(); while( ts.incrementToken() ) { n++; } ts.close(); return n; } public boolean hasPostgresBackup() { return postgresBackup != null; } public void rebuild_postgres_backup(LuanFunction completer) throws IOException, LuanException { writeLock.lock(); boolean ok = false; try { postgresBackup.begin(); postgresBackup.deleteAll(); Query query = new PrefixQuery(new Term("id")); IndexSearcher searcher = openSearcher(); MyCollector col = new MyCollector() { @Override public void collect(int iDoc) throws IOException { try { Document doc = searcher.doc( docBase + iDoc ); LuanTable tbl = toTable(completer.luan(),doc); tbl = (LuanTable)completer.call(tbl); Long id = (Long)tbl.get("id"); //logger.info("id = "+id); postgresBackup.add(id,tbl); } catch(LuanException e) { throw new 
LuanRuntimeException(e); } } }; try { searcher.search(query,col); } catch(LuanRuntimeException e) { throw (LuanException)e.getCause(); } ok = true; postgresBackup.commit(); } finally { if( !ok ) postgresBackup.rollback(); writeLock.unlock(); } } public void restore_from_postgres() throws IOException, LuanException { if( postgresBackup==null ) throw new NullPointerException(); if( writeLock.isHeldByCurrentThread() ) throw new RuntimeException(); writeLock.lock(); boolean ok = false; try { writer.deleteAll(); long nextId = postgresBackup.maxId() + 1; postgresBackup.restoreLucene(this); id = idLim = nextId; ok = true; writer.commit(); } finally { if( !ok ) { writer.rollback(); reopen(); } wrote(); writeLock.unlock(); } } void restore(LuanTable doc) throws LuanException, IOException { writer.addDocument(toLucene(doc,null)); } }