Mercurial Hosting > luan
view lucene/src/luan/modules/lucene/LuceneIndex.java @ 578:60c549d43988
remove LuanState.exception()
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Tue, 14 Jul 2015 17:40:48 -0600 |
parents | 7c3ad6db8ac3 |
children | 790d5de23042 |
line wrap: on
line source
package luan.modules.lucene;

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.ZipOutputStream;
import java.util.zip.ZipEntry;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.DoubleField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Scorer;
import sane.lucene.queryparser.SaneQueryParser;
import sane.lucene.queryparser.FieldParser;
import sane.lucene.queryparser.MultiFieldParser;
import sane.lucene.queryparser.StringFieldParser;
import sane.lucene.queryparser.NumberFieldParser;
import sane.lucene.queryparser.ParseException;
import luan.modules.Utils;
import luan.Luan;
import luan.LuanState;
import luan.LuanTable;
import luan.LuanFunction;
import luan.LuanJavaFunction;
import luan.LuanException;
import luan.LuanMeta;
import luan.LuanRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * A Lucene index stored in a file-system directory and driven from Luan code.
 *
 * <p>Write operations ({@link #save}, {@link #delete}, {@link #delete_all},
 * {@link #update_in_transaction}) take {@code writeLock} and commit the writer
 * themselves unless the calling thread already held the lock when the operation
 * started, in which case the commit is left to the outer holder.
 *
 * <p>Searches use a shared {@link IndexSearcher} that is reopened when the index
 * has changed; the underlying reader is reference-counted around each search.
 */
public final class LuceneIndex implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class);

    /** Name of the field that stores the persisted id limit in the "next_id" document. */
    private static final String FLD_NEXT_ID = "nextId";
    private static final Analyzer analyzer = new KeywordAnalyzer();
    public static final FieldParser STRING_FIELD_PARSER = new StringFieldParser(analyzer);

    private final ReentrantLock writeLock = new ReentrantLock();
    private final File indexDir;
    final SnapshotDeletionPolicy snapshotDeletionPolicy;
    private final IndexWriter writer;
    private DirectoryReader reader;
    private IndexSearcher searcher;
    /** Searcher pinned for the current thread while inside {@link #search_in_transaction}. */
    private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>();
    private boolean isClosed = false;
    /** Maps indexed field names to their parsers; "type" and "id" are registered by the constructor. */
    private final MultiFieldParser mfp = new MultiFieldParser();

    /**
     * Opens (creating if needed) the index in {@code indexDirStr}, registers this
     * index with {@code luan.onClose} and loads the persisted id counter.
     *
     * @param luan        the Luan state this index is registered with
     * @param indexDirStr path of the index directory
     * @throws LuanException if initialising the id counter fails
     * @throws IOException   if the index cannot be opened
     */
    public LuceneIndex(LuanState luan,String indexDirStr) throws LuanException, IOException {
        mfp.fields.put( "type", STRING_FIELD_PARSER );
        mfp.fields.put( "id", NumberFieldParser.LONG );
        File indexDir = new File(indexDirStr);
        this.indexDir = indexDir;
        Directory dir = FSDirectory.open(indexDir);
        Version version = Version.LUCENE_4_9;
        IndexWriterConfig conf = new IndexWriterConfig(version,analyzer);
        snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy());
        conf.setIndexDeletionPolicy(snapshotDeletionPolicy);
        writer = new IndexWriter(dir,conf);
        writer.commit();  // commit index creation
        reader = DirectoryReader.open(dir);
        luan.onClose(this);
        searcher = new IndexSearcher(reader);
        initId(luan);
    }

    /**
     * Deletes every document in the index and resets the in-memory id counter to 0.
     *
     * @throws IOException if the writer fails
     */
    public void delete_all() throws IOException {
        // Only commit here when we are the outermost holder of the write lock.
        boolean commit = !writeLock.isHeldByCurrentThread();
        writeLock.lock();
        try {
            writer.deleteAll();
            id = idLim = 0;
            if(commit)  writer.commit();
        } finally {
            writeLock.unlock();
        }
    }

    /** Builds a term for a long value using Lucene's prefix-coded encoding with shift 0. */
    private static Term term(String key,long value) {
        BytesRef br = new BytesRef();
        NumericUtils.longToPrefixCoded(value,0,br);
        return new Term(key,br);
    }

    /**
     * Deletes all documents matching {@code queryStr}.
     *
     * @param luan     the Luan state (not used by this method's body)
     * @param queryStr query parsed with the registered field parsers
     * @throws ParseException if {@code queryStr} cannot be parsed
     * @throws IOException    if the writer fails
     */
    public void delete(LuanState luan,String queryStr) throws LuanException, IOException, ParseException {
        Query query = SaneQueryParser.parseQuery(mfp,queryStr);

        boolean commit = !writeLock.isHeldByCurrentThread();
        writeLock.lock();
        try {
            writer.deleteDocuments(query);
            if(commit)  writer.commit();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Stores {@code doc}. A document without an "id" gets a freshly allocated id
     * (written back into {@code doc}) and is added; a document with an "id"
     * replaces the existing document with that id.
     *
     * @param luan the Luan state used for table access and error reporting
     * @param doc  the document; must contain a "type" field
     * @throws LuanException if "type" is missing or a field cannot be converted
     * @throws IOException   if the writer fails
     */
    public void save(LuanState luan,LuanTable doc) throws LuanException, IOException {
        if( doc.get(luan,"type")==null )
            throw new LuanException(luan,"missing 'type' field");
        Long id = (Long)doc.get(luan,"id");

        boolean commit = !writeLock.isHeldByCurrentThread();
        writeLock.lock();
        try {
            if( id == null ) {
                id = nextId(luan);
                doc.put(luan,"id",id);
                writer.addDocument(toLucene(luan,doc));
            } else {
                writer.updateDocument( term("id",id), toLucene(luan,doc) );
            }
            if(commit)  writer.commit();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Runs {@code fn} while holding the write lock, then commits once (unless this
     * call is itself nested inside another holder of the lock). Write operations
     * performed by {@code fn} on this thread therefore skip their own commit.
     *
     * @param luan the Luan state passed to {@code fn}
     * @param fn   the function to run under the lock
     * @throws LuanException if {@code fn} throws
     * @throws IOException   if the commit fails
     */
    public void update_in_transaction(LuanState luan,LuanFunction fn) throws IOException, LuanException {
        boolean commit = !writeLock.isHeldByCurrentThread();
        writeLock.lock();
        try {
            fn.call(luan);
            if(commit)  writer.commit();
        } finally {
            writeLock.unlock();
        }
    }

    /** Last id handed out by {@link #nextId}. */
    private long id = 0;
    /** Highest id that may be handed out before the persisted limit must be advanced. */
    private long idLim = 0;
    /** Number of ids reserved each time the persisted limit is advanced. */
    private final int idBatch = 10;

    /**
     * Loads the id counter from the "next_id" document, migrating it from the
     * older "type index" field name when only that form is present.
     */
    private void initId(LuanState luan) throws LuanException, IOException {
        TopDocs td = searcher.search(new TermQuery(new Term("type","next_id")),1);

        // tmp hack
        if( td.totalHits == 0 ) {
            td = searcher.search(new TermQuery(new Term("type index","next_id")),1);
            if( td.totalHits == 1 ) {
                // Re-save the old-style document under the current "type" field name.
                long oldIdLim = (Long)searcher.doc(td.scoreDocs[0].doc).getField(FLD_NEXT_ID).numericValue();
                LuanTable doc = new LuanTable();
                doc.rawPut( "type", "next_id" );
                doc.rawPut( FLD_NEXT_ID, oldIdLim );
                writer.addDocument(toLucene(luan,doc));
                writer.commit();
            }
        }

        switch(td.totalHits) {
        case 0:
            break;  // do nothing
        case 1:
            idLim = (Long)searcher.doc(td.scoreDocs[0].doc).getField(FLD_NEXT_ID).numericValue();
            id = idLim;
            break;
        default:
            throw new RuntimeException("expected at most one next_id document but found "+td.totalHits);
        }
    }

    /**
     * Returns the next document id. When the current batch is exhausted the limit
     * is advanced by {@code idBatch} and written to the "next_id" document; this
     * method does not commit.
     */
    private synchronized long nextId(LuanState luan) throws LuanException, IOException {
        if( ++id > idLim ) {
            idLim += idBatch;
            LuanTable doc = new LuanTable();
            doc.rawPut( "type", "next_id" );
            doc.rawPut( FLD_NEXT_ID, idLim );
            writer.updateDocument(new Term("type","next_id"),toLucene(luan,doc));
        }
        return id;
    }

    /**
     * Writes the files of a snapshot of the index into a zip archive.
     *
     * @param luan    the Luan state used for error reporting
     * @param zipFile destination path; must end with ".zip"
     * @throws LuanException if {@code zipFile} does not end with ".zip"
     * @throws IOException   if reading the index files or writing the archive fails
     */
    public void backup(LuanState luan,String zipFile) throws LuanException, IOException {
        if( !zipFile.endsWith(".zip") )
            throw new LuanException(luan,"file "+zipFile+" doesn't end with '.zip'");
        IndexCommit ic = snapshotDeletionPolicy.snapshot();
        try {
            // try-with-resources so neither stream leaks if copying fails part-way.
            try( ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile)) ) {
                for( String fileName : ic.getFileNames() ) {
                    out.putNextEntry(new ZipEntry(fileName));
                    try( FileInputStream in = new FileInputStream(new File(indexDir,fileName)) ) {
                        Utils.copyAll(in,out);
                    }
                    out.closeEntry();
                }
            }
        } finally {
            snapshotDeletionPolicy.release(ic);
        }
    }

    /** Returns the string form of the writer's directory. */
    public String to_string() {
        return writer.getDirectory().toString();
    }

    /**
     * Closes the writer and the reader. Does nothing if a previous call already
     * completed successfully.
     *
     * @throws IOException if closing the writer or the reader fails
     */
    public void close() throws IOException {
        if( !isClosed ) {
            try {
                writer.close();
            } finally {
                // Release the reader even when closing the writer fails.
                reader.close();
            }
            isClosed = true;
        }
    }

    /** Safety net: logs an error and closes the index if it was never closed. */
    protected void finalize() throws Throwable {
        if( !isClosed ) {
            logger.error("not closed");
            close();
        }
        super.finalize();
    }


    /**
     * Luan function handed to search callbacks; when called it loads the document
     * whose id is currently stored in {@code docID} and returns it as a table.
     */
    private static class DocFn extends LuanFunction {
        final IndexSearcher searcher;
        int docID;

        DocFn(IndexSearcher searcher) {
            this.searcher = searcher;
        }

        @Override public Object call(LuanState luan,Object[] args) throws LuanException {
            try {
                return toTable(luan,searcher.doc(docID));
            } catch(IOException e) {
                throw new LuanException(luan,e);
            }
        }
    }

    /** Collector base that tracks the current segment's doc base and a running hit counter. */
    private static abstract class MyCollector extends Collector {
        int docBase;
        int i = 0;

        @Override public void setScorer(Scorer scorer) {}
        @Override public void setNextReader(AtomicReaderContext context) {
            this.docBase = context.docBase;
        }
        @Override public boolean acceptsDocsOutOfOrder() {
            return true;
        }
    }

    /**
     * Returns the shared searcher, reopening the reader first if the index has
     * changed. The reader's reference count is incremented; callers must pair this
     * with {@link #close(IndexSearcher)}.
     */
    private synchronized IndexSearcher openSearcher() throws IOException {
        DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
        if( newReader != null ) {
            reader.decRef();
            reader = newReader;
            searcher = new IndexSearcher(reader);
        }
        reader.incRef();
        return searcher;
    }

    // call in finally block
    private static void close(IndexSearcher searcher) throws IOException {
        searcher.getIndexReader().decRef();
    }

    /**
     * Runs a query in one of three modes:
     * <ul>
     * <li>{@code fn} given and {@code n} null: calls {@code fn(index, docFn)} for
     *     every hit (no sorting allowed) and returns the number of hits visited;</li>
     * <li>{@code fn} null or {@code n} equal to 0: returns the total hit count only;</li>
     * <li>otherwise: fetches the top {@code n} hits (sorted by {@code sortStr} when
     *     given), calls {@code fn(index, docFn)} for each with a 1-based index, and
     *     returns the total hit count.</li>
     * </ul>
     * Uses the thread's pinned searcher when called inside
     * {@link #search_in_transaction}; otherwise opens and releases one itself.
     *
     * @param luan     the Luan state passed to {@code fn}
     * @param queryStr the query; must not be null
     * @param fn       callback receiving the hit index and a document-loading function, or null
     * @param n        maximum number of hits to visit, or null for all
     * @param sortStr  sort specification, or null; must be null when {@code n} is null and {@code fn} is given
     * @return the hit count as described above
     * @throws LuanException  if {@code fn} throws or the arguments are inconsistent
     * @throws ParseException if the query or sort cannot be parsed
     * @throws IOException    if searching fails
     */
    public int advanced_search( final LuanState luan, String queryStr, LuanFunction fn, Integer n, String sortStr ) throws LuanException, IOException, ParseException {
        Utils.checkNotNull(luan,queryStr);
        Query query = SaneQueryParser.parseQuery(mfp,queryStr);
        IndexSearcher searcher = threadLocalSearcher.get();
        boolean inTransaction = searcher != null;
        if( !inTransaction )
            searcher = openSearcher();
        try {
            if( fn!=null && n==null ) {
                if( sortStr != null )
                    throw new LuanException(luan,"sort must be nil when n is nil");
                final DocFn docFn = new DocFn(searcher);
                MyCollector col = new MyCollector() {
                    @Override public void collect(int doc) {
                        try {
                            docFn.docID = docBase + doc;
                            fn.call(luan,new Object[]{++i,docFn});
                        } catch(LuanException e) {
                            // collect() cannot throw checked exceptions; tunnel it out.
                            throw new LuanRuntimeException(e);
                        }
                    }
                };
                try {
                    searcher.search(query,col);
                } catch(LuanRuntimeException e) {
                    throw (LuanException)e.getCause();
                }
                return col.i;
            }
            if( fn==null || n==0 ) {
                TotalHitCountCollector thcc = new TotalHitCountCollector();
                searcher.search(query,thcc);
                return thcc.getTotalHits();
            }
            Sort sort = sortStr==null ? null : SaneQueryParser.parseSort(mfp,sortStr);
            TopDocs td = sort==null ? searcher.search(query,n) : searcher.search(query,n,sort);
            final ScoreDoc[] scoreDocs = td.scoreDocs;
            DocFn docFn = new DocFn(searcher);
            for( int i=0; i<scoreDocs.length; i++ ) {
                docFn.docID = scoreDocs[i].doc;
                fn.call(luan,new Object[]{i+1,docFn});
            }
            return td.totalHits;
        } finally {
            if( !inTransaction )
                close(searcher);
        }
    }

    /**
     * Runs {@code fn} with a single searcher pinned to the current thread, so every
     * {@link #advanced_search} call made by {@code fn} on this thread uses the same
     * searcher.
     *
     * @param luan the Luan state passed to {@code fn}
     * @param fn   the function to run
     * @return whatever {@code fn} returns
     * @throws LuanException if called while already inside a search transaction, or if {@code fn} throws
     * @throws IOException   if opening or releasing the searcher fails
     */
    public Object search_in_transaction(LuanState luan,LuanFunction fn) throws LuanException, IOException {
        if( threadLocalSearcher.get() != null )
            throw new LuanException(luan,"can't nest search_in_transaction calls");
        IndexSearcher searcher = openSearcher();
        threadLocalSearcher.set(searcher);
        try {
            return fn.call(luan);
        } finally {
            // remove() rather than set(null) so no stale entry stays attached to the thread.
            threadLocalSearcher.remove();
            close(searcher);
        }
    }


    /**
     * Metatable that exposes the indexed-field registry ({@code mfp.fields}):
     * reading a key returns its parser, assigning a {@link FieldParser} registers a
     * field, and assigning nil removes it.
     */
    public final LuanMeta indexedFieldsMeta = new LuanMeta() {

        @Override public boolean canNewindex() {
            return true;
        }

        @Override public Object __index(LuanState luan,LuanTable tbl,Object key) {
            return mfp.fields.get(key);
        }

        @Override public void __new_index(LuanState luan,LuanTable tbl,Object key,Object value) throws LuanException {
            if( !(key instanceof String) )
                throw new LuanException(luan,"key must be string");
            String field = (String)key;
            if( value==null ) {  // delete
                mfp.fields.remove(field);
                return;
            }
            if( !(value instanceof FieldParser) )
                throw new LuanException(luan,"value must be FieldParser like the values of Lucene.type");
            FieldParser parser = (FieldParser)value;
            mfp.fields.put( field, parser );
        }

        @Override public final Iterator keys(LuanTable tbl) {
            return mfp.fields.keySet().iterator();
        }

        @Override protected String type(LuanTable tbl) {
            return "lucene-indexed-fields";
        }

    };


    /**
     * Converts a Luan table into a Lucene document. Every value is stored;
     * String, Integer, Long and Double values are additionally indexed when their
     * field name is registered in {@code mfp.fields}. {@code byte[]} values are
     * stored only.
     *
     * @throws LuanException if a key is not a string or a value has an unsupported type
     */
    private Document toLucene(LuanState luan,LuanTable table) throws LuanException {
        Set<String> indexed = mfp.fields.keySet();
        Document doc = new Document();
        for( Map.Entry<Object,Object> entry : table.iterable(luan) ) {
            Object key = entry.getKey();
            if( !(key instanceof String) )
                throw new LuanException(luan,"key must be string");
            String name = (String)key;
            Object value = entry.getValue();
            if( value instanceof String ) {
                String s = (String)value;
                if( indexed.contains(name) ) {
                    doc.add(new StringField(name, s, Field.Store.YES));
                } else {
                    doc.add(new StoredField(name, s));
                }
            } else if( value instanceof Integer ) {
                int i = (Integer)value;
                if( indexed.contains(name) ) {
                    doc.add(new IntField(name, i, Field.Store.YES));
                } else {
                    doc.add(new StoredField(name, i));
                }
            } else if( value instanceof Long ) {
                long i = (Long)value;
                if( indexed.contains(name) ) {
                    doc.add(new LongField(name, i, Field.Store.YES));
                } else {
                    doc.add(new StoredField(name, i));
                }
            } else if( value instanceof Double ) {
                double i = (Double)value;
                if( indexed.contains(name) ) {
                    doc.add(new DoubleField(name, i, Field.Store.YES));
                } else {
                    doc.add(new StoredField(name, i));
                }
            } else if( value instanceof byte[] ) {
                byte[] b = (byte[])value;
                doc.add(new StoredField(name, b));
            } else
                throw new LuanException(luan,"invalid value type '"+value.getClass()+"' for '"+name+"'");
        }
        return doc;
    }

    /** Returns exactly the bytes {@code br} refers to, copying only when it is a slice of a larger array. */
    private static byte[] bytesOf(BytesRef br) {
        if( br.offset == 0 && br.length == br.bytes.length )
            return br.bytes;
        byte[] slice = new byte[br.length];
        System.arraycopy(br.bytes,br.offset,slice,0,br.length);
        return slice;
    }

    /**
     * Converts a Lucene document into a Luan table, taking each field's binary,
     * numeric or string value, in that order of preference.
     *
     * @return the table, or null when {@code doc} is null
     * @throws LuanException if a field has none of the three value kinds
     */
    private static LuanTable toTable(LuanState luan,Document doc) throws LuanException {
        if( doc==null )
            return null;
        LuanTable table = new LuanTable();
        for( IndexableField ifld : doc ) {
            String name = ifld.name();
            BytesRef br = ifld.binaryValue();
            if( br != null ) {
                // Honour offset/length: the backing array may be larger than the value.
                table.rawPut(name,bytesOf(br));
                continue;
            }
            Number n = ifld.numericValue();
            if( n != null ) {
                table.rawPut(name,n);
                continue;
            }
            String s = ifld.stringValue();
            if( s != null ) {
                table.rawPut(name,s);
                continue;
            }
            throw new LuanException(luan,"invalid field type for "+ifld);
        }
        return table;
    }

}