view src/nabble/model/lucene/IndexCache.java @ 0:7ecd1a4ef557

add content
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 21 Mar 2019 19:15:52 -0600
parents
children
line wrap: on
line source

package nabble.model.lucene;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.NoSuchDirectoryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import fschmidt.db.util.Unique;
import fschmidt.util.java.Computable;
import fschmidt.util.java.Memoizer;
import fschmidt.util.java.IoUtils;
import nabble.model.Init;
import nabble.model.Executors;


/**
 * Cache of per-key Lucene indexes, each stored in the sub-directory
 * {@code new File(dir, key.toString())} of one base directory.
 *
 * <p>Searchers and writers are shared per key and reference counted: closing the
 * object handed out to a caller does not close the underlying Lucene object
 * immediately; the real close is scheduled 5 seconds later and is skipped if the
 * object was opened again in the meantime.
 *
 * <p>All per-key state is guarded by the monitor of the object returned by
 * {@code unique.get(key)} (presumably a canonical instance shared by equal keys —
 * verify against {@code fschmidt.db.util.Unique}). The cache maps themselves are
 * concurrent maps because different keys use different monitors.
 */
public final class IndexCache<K> {
	private static final Logger logger = LoggerFactory.getLogger(IndexCache.class);

	/** Callbacks supplied by the owner of the cache. */
	public interface Builder<K> {
		/** Populates the index for {@code key}; called from the build thread started by this cache. */
		public void build(K key) throws Exception;
		/** Returns whether the index directory named {@code keyString} is still wanted; others are deleted by {@code deleteUnusedIndexes()}. */
		public boolean exists(String keyString);
	}

	private final Unique<K> unique = new Unique<K>();
	private final File dir;
	private final Analyzer analyzer;
	private final int version;
	private final Builder<K> builder;
	private volatile boolean isShutdown = false;

	/**
	 * @param dir base directory holding one sub-directory per key
	 * @param analyzer analyzer passed to every IndexWriter created here
	 * @param version expected index version; an index whose "version" file holds a smaller (or no) number is rebuilt
	 * @param builder callbacks used to rebuild indexes and to detect unused index directories
	 */
	public IndexCache(File dir, Analyzer analyzer, int version, Builder<K> builder) {
		this.dir = dir;
		this.analyzer = analyzer;
		this.version = version;
		this.builder = builder;
		// Clean-up of stale index directories is handed to Executors rather than run inline.
		Executors.executeSometime(new Runnable(){
			public void run() {
				deleteUnusedIndexes();
			}
		});
	}

	/**
	 * Opens (and memoizes) the Lucene directory for a key.  On first access the
	 * directory is created, a leftover write lock is cleared, and the index
	 * version is checked (which may start a rebuild).  Any failure is treated as
	 * fatal and terminates the JVM.
	 */
	private final Memoizer<K,FSDirectory> dirs = new Memoizer<K,FSDirectory>(new Computable<K,FSDirectory>() {
		public FSDirectory get(K key) {
			File dirFile = new File(dir,key.toString());
			dirFile.mkdirs();
			try {
				FSDirectory fsDir = FSDirectory.open(dirFile);
				if( IndexWriter.isLocked(fsDir) ) {
					// No writer of ours can exist before the directory is first opened, so the lock is stale.
					logger.error("Lucene index "+fsDir+" was locked");
					IndexWriter.unlock(fsDir);
				}
				check(key,fsDir);
				return fsDir;
			} catch(IOException e) {
				logger.error(e.toString(),e);  // keep the stack trace; the process is about to exit
				System.exit(-1);
				throw new RuntimeException();  // never
			} catch(RuntimeException e) {
				logger.error(e.toString(),e);  // keep the stack trace; the process is about to exit
				System.exit(-1);
				throw new RuntimeException();  // never
			}
		}
	});


	// Concurrent map: entries for different keys are touched under different monitors.
	private final Map<K,MyIndexSearcher> searcherCache = new ConcurrentHashMap<K,MyIndexSearcher>();

	/**
	 * Shared, reference-counted searcher for one key.  {@code opens}, {@code closes}
	 * and {@code isRemoved} are only accessed while holding the key's monitor.
	 */
	private class MyIndexSearcher extends IndexSearcher implements LuceneSearcher {
		private final K key;
		private int opens = 0;
		private int closes = 0;
		// Set when openSearcher() replaced this searcher in the cache with one on a newer reader.
		private boolean isRemoved = false;

		MyIndexSearcher(K key,IndexReader reader) throws IOException {
			super(reader);
			this.key = key;
		}

		/**
		 * Records one close.  When every open has been matched by a close, the real
		 * close of the searcher and its reader is scheduled 5 seconds later and only
		 * happens if the searcher was not opened again in between.
		 */
		@Override public void close() throws IOException {
			K k = unique.get(key);
			try {
				synchronized(k) {
					if( ++closes == opens ) {
						final int oldOpens = opens;
						Executors.schedule(new Runnable() {
							public void run() {
								K k = unique.get(key);
								try {
									synchronized(k) {
										if( oldOpens == opens ) {  // not reopened since the close was scheduled
											if( !isRemoved )  // if replaced, the cache entry belongs to the newer searcher
												searcherCache.remove(key);
											try {
												MyIndexSearcher.super.close();
												getIndexReader().close();
											} catch(IOException e) {
												logger.error("",e);
											}
										}
									}
								} finally {
									unique.free(k);
								}
							}
						}, 5, TimeUnit.SECONDS );
					}
				}
			} finally {
				unique.free(k);
			}
		}
	}

	/**
	 * Returns a searcher for the key's index.  A cached searcher is reused when its
	 * reader is still current; otherwise it is replaced by a searcher on the
	 * reopened reader.  The caller must close the returned searcher.
	 *
	 * @throws IOException if the index cannot be opened or reopened
	 */
	public LuceneSearcher openSearcher(K key) throws IOException {
		K k = unique.get(key);
		try {
			synchronized(k) {
				MyIndexSearcher searcher = searcherCache.get(key);
				if( searcher == null ) {
					searcher = new MyIndexSearcher( key, IndexReader.open(dirs.get(key)) );
					searcherCache.put(key,searcher);
				} else {
					IndexReader indexReader = searcher.getIndexReader();
					IndexReader newReader = indexReader.reopen();
					if( newReader != indexReader ) {
						// The index changed: retire the old searcher (it closes itself once its users are done).
						searcherCache.remove(key);
						searcher.isRemoved = true;
						searcher = new MyIndexSearcher( key, newReader );
						searcherCache.put(key,searcher);
					}
				}
				searcher.opens++;
				return new LuceneSearcherImpl(searcher);
			}
		} finally {
			unique.free(k);
		}
	}


	// Concurrent map: entries for different keys are touched under different monitors,
	// and shutdown() walks the key set while other threads may still modify it.
	private final Map<K,MyIndexWriter> writerCache = new ConcurrentHashMap<K,MyIndexWriter>();

	/**
	 * Shared, reference-counted writer for one key.  All mutable fields are only
	 * accessed while holding the key's monitor.
	 */
	private class MyIndexWriter extends IndexWriter {
		private final K key;
		private int opens = 0;
		private int closes = 0;
		private boolean willCommit = false;
		// True once the underlying IndexWriter has really been closed.
		private boolean reallyClosed = false;

		MyIndexWriter(K key) throws IOException {
			super(dirs.get(key),analyzer,IndexWriter.MaxFieldLength.LIMITED);
			this.key = key;
		}

		/**
		 * Closes the underlying IndexWriter right away, bypassing the delayed-close
		 * logic of {@link #close()}.  Does nothing if already closed.  Must be called
		 * while holding the key's monitor.
		 */
		private void closeNow() throws IOException {
			if( reallyClosed )
				return;
			super.close();
			reallyClosed = true;  // only after success; a failed close leaves the writer open
		}

		/**
		 * Records one close.  When every open has been matched by a close, the real
		 * close is scheduled 5 seconds later and only happens if the writer was not
		 * opened again in between.  Independently, a commit is scheduled 5 seconds
		 * later unless one is already pending.
		 */
		@Override public void close() throws IOException {
			K k = unique.get(key);
			try {
				synchronized(k) {
					if( ++closes == opens ) {
						final int oldOpens = opens;
						Executors.schedule(new Runnable() {
							public void run() {
								K k = unique.get(key);
								try {
									synchronized(k) {
										if( oldOpens == opens ) {  // not reopened since the close was scheduled
											// Only evict our own entry: build() may already have replaced
											// this writer with a newer one for the same key.
											if( writerCache.get(key) == MyIndexWriter.this )
												writerCache.remove(key);
											try {
												closeNow();
											} catch(NoSuchDirectoryException e) {
												logger.info("",e);  // site could be deleted
											} catch(IOException e) {
												logger.error("",e);
											}
											willCommit = false;
										}
									}
								} finally {
									unique.free(k);
								}
							}
						}, 5, TimeUnit.SECONDS );
					}
					if( !willCommit ) {
						willCommit = true;
						Executors.schedule(new Runnable() {
							public void run() {
								K k = unique.get(key);
								try {
									synchronized(k) {
										if( willCommit ) {
											if( !reallyClosed ) {  // closing already committed; commit() on a closed writer would throw
												try {
													commit();
												} catch(IOException e) {
													logger.error("",e);
												}
											}
											willCommit = false;
										}
									}
								} finally {
									unique.free(k);
								}
							}
						}, 5, TimeUnit.SECONDS );
					}
				}
			} finally {
				unique.free(k);
			}
		}
	}

	/**
	 * Returns the shared writer for the key's index, creating it if necessary.
	 * The caller must close the returned writer; see {@code MyIndexWriter.close()}
	 * for what closing actually does.
	 *
	 * @throws RuntimeException if {@link #shutdown()} has been called
	 * @throws IOException if the writer cannot be created
	 */
	public IndexWriter openIndexWriter(K key) throws IOException {
		if( isShutdown )
			throw new RuntimeException("shutdown");
		key = unique.get(key);
		try {
			synchronized(key) {
				MyIndexWriter indexWriter = writerCache.get(key);
				if( indexWriter == null ) {
					indexWriter = new MyIndexWriter(key);
					writerCache.put(key,indexWriter);
				}
				indexWriter.opens++;
				return indexWriter;
			}
		} finally {
			unique.free(key);
		}
	}


	// Keys whose index is being built as a result of a failed version check.
	private final Set<K> buildSet = Collections.newSetFromMap(new ConcurrentHashMap<K,Boolean>());

	/**
	 * Compares the number stored in the index's "version" file with the expected
	 * version and starts a rebuild when the stored one is older or missing.
	 * Does nothing when {@code Init.hasDaemons} is false.
	 *
	 * @throws RuntimeException if the stored version is newer than expected, or a build for the key is already running
	 */
	private void check(K key,FSDirectory dir) throws IOException {
		if( !Init.hasDaemons )
			return;
		File checkFile = checkFile(dir);
		int currentVersion = 0;
		try {
			currentVersion = Integer.parseInt(IoUtils.read(checkFile).trim());
		} catch (Exception ignored) {
			// Deliberate best-effort: a missing or unreadable version file counts as
			// version 0, which forces a rebuild below.
		}
		if (currentVersion > version) {
			throw new RuntimeException("Lucene index version for "+key+" is "+version
					+", found version "+currentVersion+" in "+dir.getFile());
		}
		if( currentVersion == version )
			return;  // ok

		if( !buildSet.add(key) )
			throw new RuntimeException("already building "+key);
		build(key,dir);
	}

	/** Returns the "version" marker file inside the index directory. */
	private File checkFile(FSDirectory dir) {
		return new File(dir.getFile(), "version");
	}

	/**
	 * Empties the index for the key and starts a daemon thread that repopulates it
	 * through {@code builder.build(key)}.  The version file is removed first and
	 * written again only after a successful build.
	 *
	 * <p>A writer currently cached for the key is closed immediately so that its
	 * write lock is released before the index is recreated; callers still holding
	 * that writer can no longer use it.
	 */
	private void build(K k,FSDirectory dir) throws IOException {
		final File checkFile = checkFile(dir);
		if( checkFile.exists() && !checkFile.delete() )
			logger.error("couldn't delete "+checkFile+" for build");
		final K key = unique.get(k);
		try {
			synchronized(key) {
				MyIndexWriter staleWriter = writerCache.remove(key);
				if( staleWriter != null )
					staleWriter.closeNow();  // the overridden close() would only schedule a delayed close
				new IndexWriter(dir,analyzer,true,IndexWriter.MaxFieldLength.LIMITED).close(); // clear dir
			}
		} finally {
			unique.free(key);
		}
		Thread thread = new Thread(new Runnable(){public void run(){
			try {
				logger.info("starting lucene index "+key);
				builder.build(key);
				IoUtils.write(checkFile,Integer.toString(version));
				buildSet.remove(key);
				logger.info("finished lucene index "+key);
			} catch(Exception e) {
				logger.error("lucene build failed for "+key,e);
				buildSet.remove(key);
				dirs.remove(key);  // forget the directory so the next access runs the version check again
			}
		}},"lucene build "+key);
		thread.setDaemon(true);
		thread.start();
	}

	/**
	 * Returns whether the key's index is usable, i.e. no build triggered by the
	 * version check is in progress.  Opens the directory first, which may itself
	 * trigger that check.
	 */
	public boolean isReady(K key) {
		dirs.get(key);
		return !buildSet.contains(key);
	}

	/**
	 * Discards the key's index and rebuilds it in the background.
	 *
	 * @throws IOException if the existing index cannot be closed or cleared
	 */
	public void rebuild(K key) throws IOException {
		build( key, dirs.get(key) );
	}

	/**
	 * Rejects further {@link #openIndexWriter} calls and closes every cached
	 * writer immediately.  A failure to close one writer is logged and does not
	 * prevent the remaining writers from being closed.
	 */
	public void shutdown() {
		isShutdown = true;
		// Loop until empty: a thread that passed the isShutdown check just before
		// the flag was set may still add a writer.
		while( !writerCache.isEmpty() ) {
			List<K> keys = new ArrayList<K>(writerCache.keySet());
			for( K key : keys ) {
				K k = unique.get(key);
				try {
					synchronized(k) {
						MyIndexWriter indexWriter = writerCache.remove(key);
						if( indexWriter != null ) {
							try {
								indexWriter.closeNow();  // the overridden close() would only schedule a delayed close
							} catch(IOException e) {
								logger.error("",e);
							}
						}
					}
				} finally {
					unique.free(k);
				}
			}
		}
	}
/*
	public void delete(K key) throws IOException {
		key = unique.get(key);
		try {
			synchronized(key) {
				MyIndexSearcher searcher = searcherCache.remove(key);
				if( searcher != null )
					searcher.getIndexReader().close();
				IndexWriter indexWriter = writerCache.remove(key);
				if( indexWriter == null )
					indexWriter = new IndexWriter(dirs.get(key),analyzer,IndexWriter.MaxFieldLength.LIMITED);
				indexWriter.deleteAll();
				dirs.remove(key);
				File dirFile = new File(dir,key.toString());
				File versionFile = new File(dirFile, "version");
				if( !versionFile.delete() )
					logger.error("couldn't delete "+versionFile+" for site delete");
				if( !dirFile.delete() )
					logger.error("couldn't delete "+dirFile);
			}
		} finally {
			unique.free(key);
		}
	}
*/
	/**
	 * Deletes every entry of the base directory whose name the builder no longer
	 * recognizes.  Stops early when {@code Executors.isShuttingDown()} reports true.
	 */
	void deleteUnusedIndexes() {
		File[] files = dir.listFiles();
		if( files == null )
			return;  // not made yet
		for( File indexDir : files ) {
			if( Executors.isShuttingDown() )
				return;
			if( !builder.exists( indexDir.getName() ) ) {
				if( !IoUtils.delete(indexDir) )
					logger.error("couldn't delete "+indexDir);
			}
		}
	}
}