view src/luan/modules/lucene/LuceneIndex.java @ 1258:e4d7a3114fa8
support "Content-Type: application/json; charset=utf-8"
author | Franklin Schmidt <fschmidt@gmail.com>
---|---
date | Thu, 20 Sep 2018 22:11:11 -0600
parents | 475905984870
children | 9fa8b8389578
package luan.modules.lucene;

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.ZipOutputStream;
import java.util.zip.ZipEntry;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.DoubleField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.highlight.Formatter;
import org.apache.lucene.search.highlight.Highlighter;
import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
import org.apache.lucene.search.highlight.Fragmenter;
import org.apache.lucene.search.highlight.NullFragmenter;
import org.apache.lucene.search.highlight.SimpleSpanFragmenter;
import org.apache.lucene.search.highlight.QueryScorer;
import org.apache.lucene.search.highlight.TokenGroup;
import luan.modules.lucene.queryparser.SaneQueryParser;
import luan.modules.lucene.queryparser.FieldParser;
import luan.modules.lucene.queryparser.MultiFieldParser;
import luan.modules.lucene.queryparser.StringFieldParser;
import luan.modules.lucene.queryparser.NumberFieldParser;
import luan.lib.parser.ParseException;
import luan.modules.Utils;
import luan.Luan;
import luan.LuanState;
import luan.LuanTable;
import luan.LuanFunction;
import luan.LuanException;
import luan.LuanRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public final class LuceneIndex implements Closeable {
	private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class);

	private static final String FLD_NEXT_ID = "nextId";
	public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer());

	private static final Version version = Version.LUCENE_4_9;
	private final ReentrantLock writeLock = new ReentrantLock();
	private final File indexDir;
	private SnapshotDeletionPolicy snapshotDeletionPolicy;
	private IndexWriter writer;
	private DirectoryReader reader;
	private IndexSearcher searcher;
	private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>();
	private boolean isClosed = true;
	private final MultiFieldParser mfp;
	public final LuanTable indexed_only_fields = new LuanTable();
	private final Analyzer analyzer;
	private final Exception created = new Exception("created");

	private static ConcurrentMap<File,AtomicInteger> globalWriteCounters = new ConcurrentHashMap<File,AtomicInteger>();
	private File fileDir;
	private int writeCount;

	public LuceneIndex(LuanState luan,String indexDirStr,FieldParser defaultFieldParser,String[] defaultFields)
		throws LuanException, IOException
	{
		mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields);
		mfp.fields.put( "type", STRING_FIELD_PARSER );
		mfp.fields.put( "id", NumberFieldParser.LONG );
		File indexDir = new File(indexDirStr);
		this.indexDir = indexDir;
		Analyzer analyzer = STRING_FIELD_PARSER.analyzer;
		if( defaultFieldParser instanceof StringFieldParser ) {
			StringFieldParser sfp = (StringFieldParser)defaultFieldParser;
			analyzer = sfp.analyzer;
		}
		this.analyzer = analyzer;
		luan.onClose(this);
		reopen();
	}

	public void reopen() throws LuanException, IOException {
		if( !isClosed ) throw new RuntimeException();
		isClosed = false;
		IndexWriterConfig conf = new IndexWriterConfig(version,analyzer);
		snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy());
		conf.setIndexDeletionPolicy(snapshotDeletionPolicy);
		FSDirectory dir = FSDirectory.open(indexDir);
		fileDir = dir.getDirectory();
		globalWriteCounters.putIfAbsent(fileDir,new AtomicInteger());
		writer = new IndexWriter(dir,conf);
		writer.commit();  // commit index creation
		reader = DirectoryReader.open(dir);
		searcher = new IndexSearcher(reader);
		initId();
	}

	private int globalWriteCount() {
		return globalWriteCounters.get(fileDir).get();
	}

	private void wrote() {
		globalWriteCounters.get(fileDir).incrementAndGet();
	}

	public void delete_all() throws IOException {
		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			writer.deleteAll();
			id = idLim = 0;
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	private static Term term(String key,long value) {
		BytesRef br = new BytesRef();
		NumericUtils.longToPrefixCoded(value,0,br);
		return new Term(key,br);
	}

	public void delete(LuanState luan,String queryStr)
		throws LuanException, IOException, ParseException
	{
		Query query = SaneQueryParser.parseQuery(mfp,queryStr);
		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			writer.deleteDocuments(query);
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	public void save(LuanState luan,LuanTable doc) throws LuanException, IOException {
		Set indexedOnlySet = new HashSet();
		Object typeObj = doc.get(luan,"type");
		if( typeObj==null )
			throw new LuanException("missing 'type' field");
		if( !(typeObj instanceof String) )
			throw new LuanException("type must be string");
		String type = (String)typeObj;
		Object indexedOnlyObj = indexed_only_fields.get(luan,type);
		if( indexedOnlyObj != null ) {
			if( !(indexedOnlyObj instanceof LuanTable) )
				throw new LuanException("indexed_only_fields elements must be tables");
			LuanTable indexedOnly = (LuanTable)indexedOnlyObj;
			for( Map.Entry<Object,Object> entry : indexedOnly.iterable(luan) ) {
				Object key = entry.getKey();
				if( !(key instanceof String) )
					throw new LuanException("indexed_only_fields."+type+" entries must be strings");
				String name = (String)key;
				Object value = entry.getValue();
				if( !(value instanceof LuanFunction) )
					throw new LuanException("indexed_only_fields."+type+" values must be functions");
				LuanFunction fn = (LuanFunction)value;
				value = Luan.first(fn.call(luan,new Object[]{doc}));
				doc.put(luan, name, value );
				indexedOnlySet.add(name);
			}
		}
		Object obj = doc.get(luan,"id");
		Long id;
		try {
			id = (Long)obj;
		} catch(ClassCastException e) {
			throw new LuanException("id should be Long but is "+obj.getClass().getSimpleName());
		}
		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			if( id == null ) {
				id = nextId(luan);
				doc.put(luan,"id",id);
				writer.addDocument(toLucene(luan,doc,indexedOnlySet));
			} else {
				writer.updateDocument( term("id",id), toLucene(luan,doc,indexedOnlySet) );
			}
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	public void update_in_transaction(LuanState luan,LuanFunction fn)
		throws IOException, LuanException
	{
		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			fn.call(luan);
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	public void run_in_lock(LuanState luan,LuanFunction fn) throws IOException, LuanException {
		if( writeLock.isHeldByCurrentThread() ) throw new RuntimeException();
		writeLock.lock();
		try {
			synchronized(this) {
				fn.call(luan);
			}
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	private long id;
	private long idLim;
	private final int idBatch = 10;

	private void initId() throws LuanException, IOException {
		TopDocs td = searcher.search(new TermQuery(new Term("type","next_id")),1);
		switch(td.totalHits) {
		case 0:
			id = 0;
			idLim = 0;
			break;
		case 1:
			idLim = (Long)searcher.doc(td.scoreDocs[0].doc).getField(FLD_NEXT_ID).numericValue();
			id = idLim;
			break;
		default:
			throw new RuntimeException();
		}
	}

	public synchronized long nextId(LuanState luan) throws LuanException, IOException {
		if( ++id > idLim ) {
			idLim += idBatch;
			LuanTable doc = new LuanTable();
			doc.rawPut( "type", "next_id" );
			doc.rawPut( FLD_NEXT_ID, idLim );
			writer.updateDocument(new Term("type","next_id"),toLucene(luan,doc,Collections.EMPTY_SET));
			wrote();
		}
		return id;
	}

/*
	public void backup(String zipFile) throws LuanException, IOException {
		if( !zipFile.endsWith(".zip") )
			throw new LuanException("file "+zipFile+" doesn't end with '.zip'");
		IndexCommit ic = snapshotDeletionPolicy.snapshot();
		try {
			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile));
			for( String fileName : ic.getFileNames() ) {
				out.putNextEntry(new ZipEntry(fileName));
				FileInputStream in = new FileInputStream(new File(indexDir,fileName));
				Utils.copyAll(in,out);
				in.close();
				out.closeEntry();
			}
			out.close();
		} finally {
			snapshotDeletionPolicy.release(ic);
		}
	}
*/

	public SnapshotDeletionPolicy snapshotDeletionPolicy() {
		return snapshotDeletionPolicy;
	}

	public Object snapshot(LuanState luan,LuanFunction fn) throws LuanException, IOException {
		IndexCommit ic = snapshotDeletionPolicy.snapshot();
		try {
			String dir = fileDir.toString();
			LuanTable fileNames = new LuanTable(new ArrayList(ic.getFileNames()));
			return fn.call(luan,new Object[]{dir,fileNames});
		} finally {
			snapshotDeletionPolicy.release(ic);
		}
	}

	public String to_string() {
		return writer.getDirectory().toString();
	}

	public void close() throws IOException {
		if( !isClosed ) {
			writer.close();
			reader.close();
			isClosed = true;
		}
	}
	protected void finalize() throws Throwable {
		if( !isClosed ) {
			logger.error("not closed",created);
			close();
		}
		super.finalize();
	}

	private static class DocFn extends LuanFunction {
		final IndexSearcher searcher;
		int docID;

		DocFn(IndexSearcher searcher) {
			this.searcher = searcher;
		}

		@Override public Object call(LuanState luan,Object[] args) throws LuanException {
			try {
				return toTable(searcher.doc(docID));
			} catch(IOException e) {
				throw new LuanException(e);
			}
		}
	}

	private static abstract class MyCollector extends Collector {
		int docBase;
		int i = 0;

		@Override public void setScorer(Scorer scorer) {}
		@Override public void setNextReader(AtomicReaderContext context) {
			this.docBase = context.docBase;
		}
		@Override public boolean acceptsDocsOutOfOrder() {
			return true;
		}
	}

	private synchronized IndexSearcher openSearcher() throws IOException {
		int gwc = globalWriteCount();
		if( writeCount != gwc ) {
			writeCount = gwc;
			DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
			if( newReader != null ) {
				reader.decRef();
				reader = newReader;
				searcher = new IndexSearcher(reader);
			}
		}
		reader.incRef();
		return searcher;
	}

	// call in finally block
	private static void close(IndexSearcher searcher) throws IOException {
		searcher.getIndexReader().decRef();
	}

	public void ensure_open() throws IOException {
		close(openSearcher());
	}

	public int advanced_search( final LuanState luan, String queryStr, LuanFunction fn, Integer n, String sortStr )
		throws LuanException, IOException, ParseException
	{
		Utils.checkNotNull(queryStr);
		Query query = SaneQueryParser.parseQuery(mfp,queryStr);
		IndexSearcher searcher = threadLocalSearcher.get();
		boolean inTransaction = searcher != null;
		if( !inTransaction )
			searcher = openSearcher();
		try {
			if( fn!=null && n==null ) {
				if( sortStr != null )
					throw new LuanException("sort must be nil when n is nil");
				final DocFn docFn = new DocFn(searcher);
				MyCollector col = new MyCollector() {
					@Override public void collect(int doc) {
						try {
							docFn.docID = docBase + doc;
							fn.call(luan,new Object[]{++i,docFn});
						} catch(LuanException e) {
							throw new LuanRuntimeException(e);
						}
					}
				};
				try {
					searcher.search(query,col);
				} catch(LuanRuntimeException e) {
					throw (LuanException)e.getCause();
				}
				return col.i;
			}
			if( fn==null || n==0 ) {
				TotalHitCountCollector thcc = new TotalHitCountCollector();
				searcher.search(query,thcc);
				return thcc.getTotalHits();
			}
			Sort sort = sortStr==null ? null : SaneQueryParser.parseSort(mfp,sortStr);
			TopDocs td = sort==null ? searcher.search(query,n) : searcher.search(query,n,sort);
			final ScoreDoc[] scoreDocs = td.scoreDocs;
			DocFn docFn = new DocFn(searcher);
			for( int i=0; i<scoreDocs.length; i++ ) {
				docFn.docID = scoreDocs[i].doc;
				fn.call(luan,new Object[]{i+1,docFn});
			}
			return td.totalHits;
		} finally {
			if( !inTransaction )
				close(searcher);
		}
	}

	public Object search_in_transaction(LuanState luan,LuanFunction fn) throws LuanException, IOException {
		if( threadLocalSearcher.get() != null )
			throw new LuanException("can't nest search_in_transaction calls");
		IndexSearcher searcher = openSearcher();
		threadLocalSearcher.set(searcher);
		try {
			return fn.call(luan);
		} finally {
			threadLocalSearcher.set(null);
			close(searcher);
		}
	}

	public FieldParser getIndexedFieldParser(String field) {
		return mfp.fields.get(field);
	}

	public void setIndexedFieldParser(String field,FieldParser fp) {
		if( fp==null ) {  // delete
			mfp.fields.remove(field);
			return;
		}
		mfp.fields.put( field, fp );
	}

	private IndexableField newField(String name,Object value,Field.Store store,Set<String> indexed) throws LuanException {
		if( value instanceof String ) {
			String s = (String)value;
			FieldParser fp = mfp.fields.get(name);
			if( fp != null ) {
				if( fp instanceof StringFieldParser && fp != STRING_FIELD_PARSER ) {
					return new TextField(name, s, store);
				} else {
					return new StringField(name, s, store);
				}
			} else {
				return new StoredField(name, s);
			}
		} else if( value instanceof Integer ) {
			int i = (Integer)value;
			if( indexed.contains(name) ) {
				return new IntField(name, i, store);
			} else {
				return new StoredField(name, i);
			}
		} else if( value instanceof Long ) {
			long i = (Long)value;
			if( indexed.contains(name) ) {
				return new LongField(name, i, store);
			} else {
				return new StoredField(name, i);
			}
		} else if( value instanceof Double ) {
			double i = (Double)value;
			if( indexed.contains(name) ) {
				return new DoubleField(name, i, store);
			} else {
				return new StoredField(name, i);
			}
		} else if( value instanceof byte[] ) {
			byte[] b = (byte[])value;
			return new StoredField(name, b);
		} else
			throw new LuanException("invalid value type '"+value.getClass()+"' for '"+name+"'");
	}

	private Document toLucene(LuanState luan,LuanTable table,Set indexOnly) throws LuanException {
		Set<String> indexed = mfp.fields.keySet();
		Document doc = new Document();
		for( Map.Entry<Object,Object> entry : table.iterable(luan) ) {
			Object key = entry.getKey();
			if( !(key instanceof String) )
				throw new LuanException("key must be string");
			String name = (String)key;
			Object value = entry.getValue();
			Field.Store store = indexOnly.contains(name) ? Field.Store.NO : Field.Store.YES;
			if( !(value instanceof LuanTable) ) {
				doc.add(newField(name, value, store, indexed));
			} else {  // list
				LuanTable list = (LuanTable)value;
				for( Object el : list.asList() ) {
					doc.add(newField(name, el, store, indexed));
				}
			}
		}
		return doc;
	}

	private static Object getValue(IndexableField ifld) throws LuanException {
		BytesRef br = ifld.binaryValue();
		if( br != null )
			return br.bytes;
		Number n = ifld.numericValue();
		if( n != null )
			return n;
		String s = ifld.stringValue();
		if( s != null )
			return s;
		throw new LuanException("invalid field type for "+ifld);
	}

	private static LuanTable toTable(Document doc) throws LuanException {
		if( doc==null )
			return null;
		LuanTable table = new LuanTable();
		for( IndexableField ifld : doc ) {
			String name = ifld.name();
			Object value = getValue(ifld);
			Object old = table.rawGet(name);
			if( old == null ) {
				table.rawPut(name,value);
			} else {
				LuanTable list;
				if( old instanceof LuanTable ) {
					list = (LuanTable)old;
				} else {
					list = new LuanTable();
					list.rawPut(1,old);
					table.rawPut(name,list);
				}
				list.rawPut(list.rawLength()+1,value);
			}
		}
		return table;
	}

	private static final Formatter nullFormatter = new Formatter() {
		public String highlightTerm(String originalText,TokenGroup tokenGroup) {
			return originalText;
		}
	};

	public LuanFunction highlighter(LuanState luan,String queryStr,LuanFunction formatter,final Integer fragmentSize,String dotdotdot)
		throws ParseException
	{
		Query query = SaneQueryParser.parseQuery(mfp,queryStr);
		Formatter fmt = new Formatter() {
			public String highlightTerm(String originalText,TokenGroup tokenGroup) {
				if( tokenGroup.getTotalScore() <= 0 )
					return originalText;
				try {
					return (String)Luan.first(formatter.call(luan,new Object[]{originalText}));
				} catch(LuanException e) {
					throw new LuanRuntimeException(e);
				}
			}
		};
		QueryScorer queryScorer = new QueryScorer(query);
		final Highlighter chooser = fragmentSize==null ? null : new Highlighter(nullFormatter,queryScorer);
		if( chooser != null )
			chooser.setTextFragmenter( new SimpleSpanFragmenter(queryScorer,fragmentSize) );
		final Highlighter hl = new Highlighter(fmt,queryScorer);
		hl.setTextFragmenter( new NullFragmenter() );
		return new LuanFunction() {
			@Override public String call(LuanState luan,Object[] args) throws LuanException {
				String text = (String)args[0];
				try {
					if( chooser != null ) {
						String s = chooser.getBestFragment(analyzer,null,text);
						if( s != null ) {
							if( dotdotdot != null ) {
								boolean atStart = text.startsWith(s);
								boolean atEnd = text.endsWith(s);
								if( !atStart )
									s = dotdotdot + s;
								if( !atEnd )
									s = s + dotdotdot;
							}
							text = s;
						} else if( text.length() > fragmentSize ) {
							text = text.substring(0,fragmentSize);
							if( dotdotdot != null )
								text += "...";
						}
					}
					String s = hl.getBestFragment(analyzer,null,text);
					return s!=null ? s : text;
				} catch(LuanRuntimeException e) {
					throw (LuanException)e.getCause();
				} catch(IOException e) {
					throw new RuntimeException(e);
				} catch(InvalidTokenOffsetsException e) {
					throw new RuntimeException(e);
				}
			}
		};
	}

}
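A minimal usage sketch, not part of LuceneIndex.java above: it assumes a LuanState instance obtained elsewhere, a made-up index directory path, and that SaneQueryParser accepts a Lucene-like field:value query syntax. Only constructors and methods visible in the class above are used; save() requires a String "type" field, and calling advanced_search with a nil callback returns only the hit count.

// Hypothetical example, not part of the repository.
import java.io.IOException;
import luan.LuanException;
import luan.LuanState;
import luan.LuanTable;
import luan.lib.parser.ParseException;
import luan.modules.lucene.LuceneIndex;

public class LuceneIndexExample {

	// Assumes the caller already has a LuanState; "/tmp/example-index" is a made-up path.
	static int saveAndCount(LuanState luan) throws LuanException, IOException, ParseException {
		// null default field parser: only the built-in "type" and "id" fields are query-parsed
		LuceneIndex index = new LuceneIndex(luan, "/tmp/example-index", null, null);
		try {
			LuanTable doc = new LuanTable();
			doc.rawPut("type", "note");        // required by save()
			doc.rawPut("text", "hello world"); // stored only; no FieldParser registered for "text"
			index.save(luan, doc);             // assigns a Long "id" since none was set
			// fn==null: advanced_search returns just the total hit count for the query
			return index.advanced_search(luan, "type:note", null, null, null);
		} finally {
			index.close();
		}
	}
}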