view src/luan/modules/lucene/LuceneIndex.java @ 1347:643cf1c37723

move webserver to lib and bug fixes
author Franklin Schmidt <fschmidt@gmail.com>
date Mon, 25 Feb 2019 13:02:33 -0700
parents efd1c6380f2c
children 709f7498a363
line wrap: on
line source

package luan.modules.lucene;

import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.ZipOutputStream;
import java.util.zip.ZipEntry;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.analysis.en.EnglishAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.DoubleField;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.highlight.Formatter;
import org.apache.lucene.search.highlight.Highlighter;
import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
import org.apache.lucene.search.highlight.Fragmenter;
import org.apache.lucene.search.highlight.NullFragmenter;
import org.apache.lucene.search.highlight.SimpleSpanFragmenter;
import org.apache.lucene.search.highlight.QueryScorer;
import org.apache.lucene.search.highlight.TokenGroup;
import luan.lib.queryparser.SaneQueryParser;
import luan.lib.queryparser.FieldParser;
import luan.lib.queryparser.MultiFieldParser;
import luan.lib.queryparser.StringFieldParser;
import luan.lib.queryparser.NumberFieldParser;
import luan.lib.parser.ParseException;
import luan.modules.Utils;
import luan.Luan;
import luan.LuanTable;
import luan.LuanFunction;
import luan.LuanException;
import luan.LuanRuntimeException;
import luan.lib.logging.Logger;
import luan.lib.logging.LoggerFactory;


/**
 * A Lucene index exposed to Luan code.
 *
 * <p>Instances are shared per canonical index directory: {@link #getLuceneIndex} either returns
 * the instance already open for that directory (incrementing its open count) or creates one.
 * Each caller also receives a {@code Closer}, registered via {@code Luan.onClose}, that releases
 * its reference; the writer and reader are closed when the last reference is released.
 *
 * <p>Writes are serialized by {@code writeLock}. A write method commits only when the calling
 * thread did not already hold the lock, so writes nested inside {@link #update_in_transaction}
 * are committed once, by the outermost call.
 */
public final class LuceneIndex {
	private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class);

	/**
	 * Handle given to each user of a shared {@link LuceneIndex}; closing it releases one
	 * reference. Closing it more than once has no further effect.
	 */
	private static final class Closer implements Closeable {
		final LuceneIndex li;
		boolean isClosed = false;
		// Records where this handle was created so a leaked handle can be traced in the log.
		private final Exception created = new Exception("created");

		Closer(Luan luan,LuceneIndex li) {
			this.li = li;
			luan.onClose(this);
		}

		@Override public void close() throws IOException {
			if( !isClosed ) {
				li.close();
				isClosed = true;
			}
		}

		// Safety net: a handle garbage-collected without being closed is logged and closed here.
		@Override protected void finalize() throws Throwable {
			if( !isClosed ) {
				logger.error("not closed",created);
				close();
			}
			super.finalize();
		}
	}

	// Open indexes keyed by canonical directory path; guarded by synchronizing on the map itself.
	private static final Map<String,LuceneIndex> indexes = new HashMap<String,LuceneIndex>();

	/**
	 * Returns the shared index for {@code indexDirStr}, opening it if it isn't open yet.
	 *
	 * @param luan the Luan instance the returned {@code Closer} is registered with via {@code onClose}
	 * @param indexDirStr path of the index directory
	 * @param defaultFieldParser default parser handed to the {@code MultiFieldParser}, or null;
	 *        when it is a {@code StringFieldParser} its analyzer is also used for writing,
	 *        highlighting and token counting
	 * @param defaultFields default field names handed to the {@code MultiFieldParser}
	 * @return a two-element array: the {@code LuceneIndex} and its {@code Closer} handle
	 * @throws LuanException if the index is already open with a different default parser or fields
	 * @throws IOException if the path can't be canonicalized or the index can't be opened
	 */
	public static Object[] getLuceneIndex(Luan luan,String indexDirStr,FieldParser defaultFieldParser,String[] defaultFields)
		throws LuanException, IOException
	{
		String key = new File(indexDirStr).getCanonicalPath();
		synchronized(indexes) {
			LuceneIndex li = indexes.get(key);
			if( li == null ) {
				li = new LuceneIndex(indexDirStr,defaultFieldParser,defaultFields,key);
				li.openCount = 1;
				indexes.put(key,li);
			} else {
				// parsers are compared by identity, field lists by content
				if( defaultFieldParser != li.defaultFieldParser )
					throw new LuanException("default_type doesn't match previous use");
				if( !Arrays.equals(defaultFields,li.defaultFields) )
					throw new LuanException("default_fields don't match previous use");
				li.openCount++;
			}
			return new Object[]{li,new Closer(luan,li)};
		}
	}

	private static final Version version = Version.LUCENE_4_9;
	private static final String FLD_NEXT_ID = "nextId";
	public static final StringFieldParser STRING_FIELD_PARSER = new StringFieldParser(new KeywordAnalyzer());
	public static final StringFieldParser ENGLISH_FIELD_PARSER = new StringFieldParser(new EnglishAnalyzer(version));

	private final ReentrantLock writeLock = new ReentrantLock();
	private final File indexDir;
	private SnapshotDeletionPolicy snapshotDeletionPolicy;
	private IndexWriter writer;
	private DirectoryReader reader;
	private IndexSearcher searcher;
	// searcher pinned by search_in_transaction for the current thread, or null
	private final ThreadLocal<IndexSearcher> threadLocalSearcher = new ThreadLocal<IndexSearcher>();
	private final MultiFieldParser mfp;
	private final Analyzer analyzer;

	private File fileDir;
	// value of writeCounter when openSearcher last checked for changes
	private int writeCount;
	// bumped by wrote() after every write so openSearcher knows to look for changes
	private final AtomicInteger writeCounter = new AtomicInteger();

	// fields that are indexed but not stored (see newField2)
	private final Set<String> indexOnly = new HashSet<String>();

	// number of outstanding Closer handles; guarded by the indexes lock
	private int openCount;
	private final String key;
	private final FieldParser defaultFieldParser;
	private final String[] defaultFields;

	/**
	 * Opens the index in {@code indexDirStr}. Registers "type" as a string field and "id" as a
	 * long field. Uses the default parser's analyzer when it is a {@code StringFieldParser},
	 * otherwise the keyword analyzer.
	 */
	private LuceneIndex(String indexDirStr,FieldParser defaultFieldParser,String[] defaultFields,String key)
		throws LuanException, IOException
	{
		this.key = key;
		this.defaultFieldParser = defaultFieldParser;
		this.defaultFields = defaultFields;
		mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields);
		mfp.fields.put( "type", STRING_FIELD_PARSER );
		mfp.fields.put( "id", NumberFieldParser.LONG );
		File indexDir = new File(indexDirStr);
		this.indexDir = indexDir;
		Analyzer analyzer = STRING_FIELD_PARSER.analyzer;
		if( defaultFieldParser instanceof StringFieldParser ) {
			StringFieldParser sfp = (StringFieldParser)defaultFieldParser;
			analyzer = sfp.analyzer;
		}
		this.analyzer = analyzer;
		reopen();
	}

	/**
	 * Opens a new writer, reader and searcher on the index directory and reloads the id
	 * allocator. The previously opened writer and reader, if any, are NOT closed here.
	 * NOTE(review): presumably callers call {@link #doClose} first; confirm.
	 */
	public void reopen() throws LuanException, IOException {
		IndexWriterConfig conf = new IndexWriterConfig(version,analyzer);
		// wrap the default policy so snapshot() can pin commits while their files are copied
		snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy());
		conf.setIndexDeletionPolicy(snapshotDeletionPolicy);
		FSDirectory dir = FSDirectory.open(indexDir);
		fileDir = dir.getDirectory();
		writer = new IndexWriter(dir,conf);
		writer.commit();  // commit index creation so the reader below can open it
		reader = DirectoryReader.open(dir);
		searcher = new IndexSearcher(reader);
		initId();
	}

	// Marks that a write happened so the next openSearcher() checks for a newer reader.
	private void wrote() {
		writeCounter.incrementAndGet();
	}

	/** Deletes every document and resets the in-memory id allocator to 0. */
	public void delete_all() throws IOException {
		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			writer.deleteAll();
			id = idLim = 0;
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	// Term for the full-precision (shift 0) encoding of value, matching a LongField's indexed term.
	private static Term term(String key,long value) {
		BytesRef br = new BytesRef();
		NumericUtils.longToPrefixCoded(value,0,br);
		return new Term(key,br);
	}

	/**
	 * Deletes all documents matching {@code queryStr}.
	 *
	 * @throws ParseException if the query can't be parsed
	 */
	public void delete(String queryStr)
		throws IOException, ParseException
	{
		Query query = SaneQueryParser.parseQuery(mfp,queryStr);

		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			writer.deleteDocuments(query);
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	/** Marks {@code fields} as indexed but not stored in documents written from now on. */
	public void indexed_only_fields(List<String> fields) {
		indexOnly.addAll(fields);
	}

	/**
	 * Adds or replaces a document. A document without an "id" is assigned {@link #nextId()}
	 * (written back into {@code doc}) and added; otherwise the document with that id is replaced.
	 *
	 * @param boosts optional table of field name to index-time boost, or null
	 * @throws LuanException if "id" is present but not a Long, or a field value is invalid
	 */
	public void save(LuanTable doc,LuanTable boosts)
		throws LuanException, IOException
	{
		Object obj = doc.get("id");
		Long id;
		try {
			id = (Long)obj;
		} catch(ClassCastException e) {
			throw new LuanException("id should be Long but is "+obj.getClass().getSimpleName());
		}

		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			if( id == null ) {
				id = nextId();
				doc.put("id",id);
				writer.addDocument(toLucene(doc,boosts));
			} else {
				writer.updateDocument( term("id",id), toLucene(doc,boosts) );
			}
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	/**
	 * Runs {@code fn} while holding the write lock and, if this is the outermost write, commits
	 * once afterwards. If {@code fn} throws, nothing is committed here, but changes already made
	 * through the writer are not rolled back and may be included in a later commit.
	 */
	public void update_in_transaction(LuanFunction fn) throws IOException, LuanException {
		boolean commit = !writeLock.isHeldByCurrentThread();
		writeLock.lock();
		try {
			fn.call();
			if(commit) writer.commit();
		} finally {
			wrote();
			writeLock.unlock();
		}
	}

	/**
	 * Runs {@code fn} while holding both the write lock and this object's monitor, without
	 * committing.
	 *
	 * @throws IllegalStateException if the current thread already holds the write lock
	 */
	public void run_in_lock(LuanFunction fn) throws IOException, LuanException {
		if( writeLock.isHeldByCurrentThread() )
			throw new IllegalStateException("run_in_lock called while holding the write lock");
		writeLock.lock();
		try {
			synchronized(this) {
				fn.call();
			}
		} finally {
			wrote();
			writeLock.unlock();
		}
	}


	// Id allocator: ids up to idLim are reserved by the persisted "next_id" document.
	private long id;
	private long idLim;
	private final int idBatch = 10;

	// Loads the id allocator from the "next_id" document. Starting at the persisted limit
	// skips any ids that were reserved but unused before the index was last closed.
	private void initId() throws LuanException, IOException {
		TopDocs td = searcher.search(new TermQuery(new Term("type","next_id")),1);
		switch(td.totalHits) {
		case 0:
			id = 0;
			idLim = 0;
			break;
		case 1:
			idLim = (Long)searcher.doc(td.scoreDocs[0].doc).getField(FLD_NEXT_ID).numericValue();
			id = idLim;
			break;
		default:
			throw new IllegalStateException("found "+td.totalHits+" next_id documents");
		}
	}

	/**
	 * Returns a new document id. Ids are reserved in batches of {@code idBatch}; when a batch
	 * is used up the new limit is written to the "next_id" document (not committed here).
	 */
	public synchronized long nextId() throws LuanException, IOException {
		if( ++id > idLim ) {
			idLim += idBatch;
			Map doc = new HashMap();
			doc.put( "type", "next_id" );
			doc.put( FLD_NEXT_ID, idLim );
			writer.updateDocument(new Term("type","next_id"),toLucene(doc.entrySet(),null));
			wrote();
		}
		return id;
	}

/*
	public void backup(String zipFile) throws LuanException, IOException {
		if( !zipFile.endsWith(".zip") )
			throw new LuanException("file "+zipFile+" doesn't end with '.zip'");
		IndexCommit ic = snapshotDeletionPolicy.snapshot();
		try {
			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile));
			for( String fileName : ic.getFileNames() ) {
				out.putNextEntry(new ZipEntry(fileName));
				FileInputStream in = new FileInputStream(new File(indexDir,fileName));
				Utils.copyAll(in,out);
				in.close();
				out.closeEntry();
			}
			out.close();
		} finally {
			snapshotDeletionPolicy.release(ic);
		}
	}
*/
	public SnapshotDeletionPolicy snapshotDeletionPolicy() {
		return snapshotDeletionPolicy;
	}

	/**
	 * Pins the latest commit and calls {@code fn(dir, file_names)} with the index directory and
	 * the commit's file names; the commit's files are not deleted until {@code fn} returns.
	 *
	 * @return whatever {@code fn} returns
	 */
	public Object snapshot(LuanFunction fn) throws LuanException, IOException {
		IndexCommit ic = snapshotDeletionPolicy.snapshot();
		try {
			String dir = fileDir.toString();
			LuanTable fileNames = new LuanTable(fn.luan(),new ArrayList(ic.getFileNames()));
			return fn.call(dir,fileNames);
		} finally {
			snapshotDeletionPolicy.release(ic);
		}
	}



	public String to_string() {
		return writer.getDirectory().toString();
	}

	/**
	 * Releases one reference obtained from {@link #getLuceneIndex}. When the last reference is
	 * released, removes this index from the shared registry and closes it.
	 */
	private synchronized void close() throws IOException {
		// Hold the registry lock for the whole decrement-remove-close sequence: getLuceneIndex
		// updates openCount under the same lock, so it can't hand out this instance after the
		// count reaches 0. Removing before closing keeps a failed close from leaving a dead
		// instance registered.
		synchronized(indexes) {
			if( openCount > 0 && --openCount == 0 ) {
				indexes.remove(key);
				doClose();
			}
		}
	}

	/** Closes the writer and the reader; the reader is closed even if closing the writer fails. */
	public void doClose() throws IOException {
		try {
			writer.close();
		} finally {
			reader.close();
		}
	}


	/**
	 * Luan function passed to search callbacks: {@code doc_fn()} returns the current hit's
	 * document as a table, {@code doc_fn("explain")} returns the document and its Explanation.
	 * One instance is reused for every hit of a search, with {@code docID} updated per hit.
	 */
	private static class DocFn extends LuanFunction {
		final IndexSearcher searcher;
		final Query query;
		int docID;

		DocFn(Luan luan,IndexSearcher searcher,Query query) {
			super(luan);
			this.searcher = searcher;
			this.query = query;
		}

		@Override public Object call(Object[] args) throws LuanException {
			try {
				LuanTable doc = toTable(luan(),searcher.doc(docID));
				if( args.length > 0 && "explain".equals(args[0]) ) {
					Explanation explanation = searcher.explain(query,docID);
					return new Object[]{doc,explanation};
				} else {
					return doc;
				}
			} catch(IOException e) {
				throw new LuanException(e);
			}
		}
	}

	// Collector that ignores scores, tracks the current segment's docBase, and counts hits in i.
	private static abstract class MyCollector extends Collector {
		int docBase;
		int i = 0;

		@Override public void setScorer(Scorer scorer) {}
		@Override public void setNextReader(AtomicReaderContext context) {
			this.docBase = context.docBase;
		}
		@Override public boolean acceptsDocsOutOfOrder() {
			return true;
		}
	}

	/**
	 * Returns the current searcher with its reader's reference count incremented; the caller
	 * must release it with {@link #close(IndexSearcher)}. If a write has happened since the last
	 * call, first switches to a newer reader when the index has changed.
	 */
	private synchronized IndexSearcher openSearcher() throws IOException {
		int gwc = writeCounter.get();
		if( writeCount != gwc ) {
			writeCount = gwc;
			DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
			if( newReader != null ) {
				reader.decRef();  // drop this object's own reference to the old reader
				reader = newReader;
				searcher = new IndexSearcher(reader);
			}
		}
		reader.incRef();
		return searcher;
	}

	// call in finally block
	private static void close(IndexSearcher searcher) throws IOException {
		searcher.getIndexReader().decRef();
	}

	/** Brings the searcher up to date with the latest changes, if any. */
	public void ensure_open() throws IOException {
		close(openSearcher());
	}

	/**
	 * Runs a query and reports hits to {@code fn}.
	 *
	 * <ul>
	 * <li>{@code fn} set and {@code n} null: calls {@code fn(i, doc_fn)} for every hit, in the
	 *     order the collector receives them (unscored, possibly out of order); {@code sortStr}
	 *     must be null. Returns the number of hits.
	 * <li>{@code fn} null or {@code n == 0}: only counts hits and returns the total.
	 * <li>otherwise: calls {@code fn(i, doc_fn, score)} for the top {@code n} hits, sorted by
	 *     {@code sortStr} when given, and returns the total hit count.
	 * </ul>
	 * Inside {@link #search_in_transaction} the transaction's searcher is used.
	 *
	 * @throws LuanException if {@code sortStr} is given without {@code n}, or {@code fn} fails
	 * @throws ParseException if the query or sort can't be parsed
	 */
	public int advanced_search( String queryStr, LuanFunction fn, Integer n, String sortStr )
		throws LuanException, IOException, ParseException
	{
		Utils.checkNotNull(queryStr);
		Query query = SaneQueryParser.parseQuery(mfp,queryStr);
		IndexSearcher searcher = threadLocalSearcher.get();
		boolean inTransaction = searcher != null;
		if( !inTransaction )
			searcher = openSearcher();
		try {
			if( fn!=null && n==null ) {
				if( sortStr != null )
					throw new LuanException("sort must be nil when n is nil");
				final DocFn docFn = new DocFn(fn.luan(),searcher,query);
				MyCollector col = new MyCollector() {
					@Override public void collect(int doc) {
						try {
							docFn.docID = docBase + doc;
							fn.call(++i,docFn);
						} catch(LuanException e) {
							// collect() can't throw checked exceptions; unwrapped below
							throw new LuanRuntimeException(e);
						}
					}
				};
				try {
					searcher.search(query,col);
				} catch(LuanRuntimeException e) {
					throw (LuanException)e.getCause();
				}
				return col.i;
			}
			if( fn==null || n==0 ) {
				TotalHitCountCollector thcc = new TotalHitCountCollector();
				searcher.search(query,thcc);
				return thcc.getTotalHits();
			}
			Sort sort = sortStr==null ? null : SaneQueryParser.parseSort(mfp,sortStr);
			TopDocs td = sort==null ? searcher.search(query,n) : searcher.search(query,n,sort);
			final ScoreDoc[] scoreDocs = td.scoreDocs;
			DocFn docFn = new DocFn(fn.luan(),searcher,query);
			for( int i=0; i<scoreDocs.length; i++ ) {
				ScoreDoc scoreDoc = scoreDocs[i];
				docFn.docID = scoreDoc.doc;
				fn.call(i+1,docFn,scoreDoc.score);
			}
			return td.totalHits;
		} finally {
			if( !inTransaction )
				close(searcher);
		}
	}

	/**
	 * Calls {@code fn} with one searcher pinned for the current thread, so every
	 * {@link #advanced_search} made by {@code fn} on this thread uses the same view of the index.
	 *
	 * @return whatever {@code fn} returns
	 * @throws LuanException if called from inside another search_in_transaction on this thread
	 */
	public Object search_in_transaction(LuanFunction fn) throws LuanException, IOException {
		if( threadLocalSearcher.get() != null )
			throw new LuanException("can't nest search_in_transaction calls");
		IndexSearcher searcher = openSearcher();
		threadLocalSearcher.set(searcher);
		try {
			return fn.call();
		} finally {
			threadLocalSearcher.set(null);
			close(searcher);
		}
	}


	/** Returns the parser registered for {@code field}, or null if the field isn't indexed. */
	public FieldParser getIndexedFieldParser(String field) {
		return mfp.fields.get(field);
	}

	/** Registers {@code fp} as the parser for {@code field}; a null {@code fp} unregisters it. */
	public void setIndexedFieldParser(String field,FieldParser fp) {
		if( fp==null ) {  // delete
			mfp.fields.remove(field);
			return;
		}
		mfp.fields.put( field, fp );
	}


	// Creates the Lucene field for one value and applies the index-time boost, if any.
	// NOTE(review): Field.setBoost rejects fields that are unindexed or omit norms, so boosts on
	// numeric or stored-only fields will likely fail; confirm callers only boost text fields.
	private IndexableField newField(String name,Object value,Set<String> indexed,Float boost)
		throws LuanException
	{
		boolean hasBoost = boost!=null;
		IndexableField fld = newField2(name,value,indexed,hasBoost);
		if( hasBoost )
			((Field)fld).setBoost(boost);
		return fld;
	}

	/**
	 * Maps a Luan value to a Lucene field. Strings with a registered parser are indexed (analyzed
	 * text for non-keyword string parsers, otherwise a single untokenized term). Numbers are
	 * indexed as numeric fields when their name is in {@code indexed}. Everything else is only
	 * stored. Fields listed in {@code indexOnly} are indexed without being stored.
	 */
	private IndexableField newField2(String name,Object value,Set<String> indexed,boolean hasBoost)
		throws LuanException
	{
		Field.Store store = indexOnly.contains(name) ? Field.Store.NO : Field.Store.YES;
		if( value instanceof String ) {
			String s = (String)value;
			FieldParser fp = mfp.fields.get(name);
			if( fp != null ) {
				if( fp instanceof StringFieldParser && fp != STRING_FIELD_PARSER ) {
					return new TextField(name, s, store);
				} else if (hasBoost) {
					// StringField omits norms, so it can't carry an index-time boost; the
					// deprecated constructor with NOT_ANALYZED indexes one term but keeps norms.
					return new Field(name, s, store, Field.Index.NOT_ANALYZED);
				} else {
					return new StringField(name, s, store);
				}
			} else {
				return new StoredField(name, s);
			}
		} else if( value instanceof Integer ) {
			int i = (Integer)value;
			if( indexed.contains(name) ) {
				return new IntField(name, i, store);
			} else {
				return new StoredField(name, i);
			}
		} else if( value instanceof Long ) {
			long i = (Long)value;
			if( indexed.contains(name) ) {
				return new LongField(name, i, store);
			} else {
				return new StoredField(name, i);
			}
		} else if( value instanceof Double ) {
			double i = (Double)value;
			if( indexed.contains(name) ) {
				return new DoubleField(name, i, store);
			} else {
				return new StoredField(name, i);
			}
		} else if( value instanceof byte[] ) {
			byte[] b = (byte[])value;
			return new StoredField(name, b);
		} else
			throw new LuanException("invalid value type '"+value.getClass()+"' for '"+name+"'");
	}

	private Document toLucene(LuanTable table,LuanTable boosts) throws LuanException {
		return toLucene(table.iterable(),boosts);
	}

	/**
	 * Builds a Lucene document from name/value entries. A table value is treated as a list and
	 * becomes one field per element, all with the same name.
	 *
	 * @throws LuanException if a key isn't a string, a boost isn't a number, or a value is invalid
	 */
	private Document toLucene(Iterable<Map.Entry> iterable,LuanTable boosts) throws LuanException {
		Set<String> indexed = mfp.fields.keySet();
		Document doc = new Document();
		for( Map.Entry<Object,Object> entry : iterable ) {
			Object key = entry.getKey();
			if( !(key instanceof String) )
				throw new LuanException("key must be string");
			String name = (String)key;
			Object value = entry.getValue();
			Float boost = null;
			if( boosts != null ) {
				Object obj = boosts.get(name);
				if( obj != null ) {
					if( !(obj instanceof Number) )
						throw new LuanException("boost '"+name+"' must be number");
					boost = ((Number)obj).floatValue();
				}
			}
			if( !(value instanceof LuanTable) ) {
				doc.add(newField( name, value, indexed, boost ));
			} else { // list
				LuanTable list = (LuanTable)value;
				for( Object el : list.asList() ) {
					doc.add(newField( name, el, indexed, boost ));
				}
			}
		}
		return doc;
	}

	// Returns a stored field's value: a byte[], Number or String, checked in that order.
	private static Object getValue(IndexableField ifld) throws LuanException {
		BytesRef br = ifld.binaryValue();
		if( br != null ) {
			// A BytesRef can be a window into a larger array; return only the referenced bytes.
			if( br.offset == 0 && br.length == br.bytes.length )
				return br.bytes;
			return Arrays.copyOfRange(br.bytes, br.offset, br.offset + br.length);
		}
		Number n = ifld.numericValue();
		if( n != null )
			return n;
		String s = ifld.stringValue();
		if( s != null )
			return s;
		throw new LuanException("invalid field type for "+ifld);
	}

	// Converts a stored document to a table; repeated field names are collected into a list table.
	private static LuanTable toTable(Luan luan,Document doc) throws LuanException {
		if( doc==null )
			return null;
		LuanTable table = new LuanTable(luan);
		for( IndexableField ifld : doc ) {
			String name = ifld.name();
			Object value = getValue(ifld);
			Object old = table.rawGet(name);
			if( old == null ) {
				table.rawPut(name,value);
			} else {
				LuanTable list;
				if( old instanceof LuanTable ) {
					list = (LuanTable)old;
				} else {
					list = new LuanTable(luan);
					list.rawPut(1,old);
					table.rawPut(name,list);
				}
				list.rawPut(list.rawLength()+1,value);
			}
		}
		return table;
	}


	// Leaves text unmarked; used to choose the best fragment without highlighting it.
	private static final Formatter nullFormatter = new Formatter() {
		@Override public String highlightTerm(String originalText,TokenGroup tokenGroup) {
			return originalText;
		}
	};

	/**
	 * Builds a Luan function that highlights matches of {@code queryStr} in a text.
	 *
	 * <p>When {@code fragmentSize} is set, the text is first reduced to the best-matching fragment
	 * of about that size, or, if none matches, to its first {@code fragmentSize} characters.
	 * {@code dotdotdot} marks any text cut away. Each scoring term in the remaining text is then
	 * replaced with the result of {@code formatter(term)}.
	 *
	 * @param dotdotdot marker for truncated text, or null for none
	 * @return a function taking the text and returning the highlighted string
	 * @throws ParseException if {@code queryStr} can't be parsed
	 */
	public LuanFunction highlighter(String queryStr,final LuanFunction formatter,final Integer fragmentSize,String dotdotdot)
		throws ParseException
	{
		Query query = SaneQueryParser.parseQuery(mfp,queryStr);
		Formatter fmt = new Formatter() {
			@Override public String highlightTerm(String originalText,TokenGroup tokenGroup) {
				if( tokenGroup.getTotalScore() <= 0 )
					return originalText;
				try {
					return (String)Luan.first(formatter.call(originalText));
				} catch(LuanException e) {
					// highlightTerm can't throw checked exceptions; unwrapped in call() below
					throw new LuanRuntimeException(e);
				}
			}
		};
		QueryScorer queryScorer = new QueryScorer(query);
		final Highlighter chooser = fragmentSize==null ? null : new Highlighter(nullFormatter,queryScorer);
		if( chooser != null )
			chooser.setTextFragmenter( new SimpleSpanFragmenter(queryScorer,fragmentSize) );
		final Highlighter hl = new Highlighter(fmt,queryScorer);
		// NullFragmenter: highlight the (possibly already trimmed) text as a single fragment
		hl.setTextFragmenter( new NullFragmenter() );
		return new LuanFunction(false) {  // NOTE(review): meaning of the false argument isn't visible here
			@Override public String call(Object[] args) throws LuanException {
				String text = (String)args[0];
				try {
					if( chooser != null ) {
						String s = chooser.getBestFragment(analyzer,null,text);
						if( s != null ) {
							if( dotdotdot != null ) {
								boolean atStart = text.startsWith(s);
								boolean atEnd = text.endsWith(s);
								if( !atStart )
									s = dotdotdot + s;
								if( !atEnd )
									s = s + dotdotdot;
							}
							text = s;
						} else if( text.length() > fragmentSize ) {
							text = text.substring(0,fragmentSize);
							if( dotdotdot != null )
								text += dotdotdot;
						}
					}
					String s = hl.getBestFragment(analyzer,null,text);
					return s!=null ? s : text;
				} catch(LuanRuntimeException e) {
					throw (LuanException)e.getCause();
				} catch(IOException e) {
					throw new RuntimeException(e);
				} catch(InvalidTokenOffsetsException e) {
					throw new RuntimeException(e);
				}
			}
		};
	}

	/**
	 * Returns the number of tokens this index's analyzer produces for {@code text}.
	 * The stream is always ended and closed: the analyzer reuses streams, and a stream left
	 * open would break the next tokenStream call on this thread.
	 */
	public int count_tokens(String text)
		throws IOException
	{
		int n = 0;
		TokenStream ts = analyzer.tokenStream(null,text);
		try {
			ts.reset();
			while( ts.incrementToken() ) {
				n++;
			}
			ts.end();
		} finally {
			ts.close();
		}
		return n;
	}

}