Mercurial Hosting > nabble
diff src/nabble/model/lucene/IndexCache.java @ 0:7ecd1a4ef557
add content
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 21 Mar 2019 19:15:52 -0600 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/nabble/model/lucene/IndexCache.java Thu Mar 21 19:15:52 2019 -0600 @@ -0,0 +1,367 @@ +package nabble.model.lucene; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.NoSuchDirectoryException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import fschmidt.db.util.Unique; +import fschmidt.util.java.Computable; +import fschmidt.util.java.Memoizer; +import fschmidt.util.java.IoUtils; +import nabble.model.Init; +import nabble.model.Executors; + + +public final class IndexCache<K> { + private static final Logger logger = LoggerFactory.getLogger(IndexCache.class); + + public interface Builder<K> { + public void build(K key) throws Exception; + public boolean exists(String keyString); + } + + private final Unique<K> unique = new Unique<K>(); + private final File dir; + private final Analyzer analyzer; + private final int version; + private final Builder<K> builder; + private volatile boolean isShutdown = false; + + public IndexCache(File dir, Analyzer analyzer, int version, Builder<K> builder) { + this.dir = dir; + this.analyzer = analyzer; + this.version = version; + this.builder = builder; + Executors.executeSometime(new Runnable(){ + public void run() { + deleteUnusedIndexes(); + } + }); + } + + private final Memoizer<K,FSDirectory> dirs = new Memoizer<K,FSDirectory>(new Computable<K,FSDirectory>() { + public FSDirectory get(K key) { + File dirFile = new File(dir,key.toString()); + dirFile.mkdirs(); + try { + FSDirectory dir = FSDirectory.open(dirFile); + if( IndexWriter.isLocked(dir) ) { + logger.error("Lucene index "+dir+" was locked"); + IndexWriter.unlock(dir); + } + check(key,dir); + return dir; + } catch(IOException e) { + logger.error(e.toString()); + System.exit(-1); + throw new RuntimeException(); // never + } catch(RuntimeException e) { + logger.error(e.toString()); + System.exit(-1); + throw new RuntimeException(); // never + } + } + }); + + + private final Map<K,MyIndexSearcher> searcherCache = new HashMap<K,MyIndexSearcher>(); + + private class MyIndexSearcher extends IndexSearcher implements LuceneSearcher { + private final K key; + private int opens = 0; + private int closes = 0; + private boolean isRemoved = false; + + MyIndexSearcher(K key,IndexReader reader) throws IOException { + super(reader); + this.key = key; + } + + @Override public void close() throws IOException { + K k = unique.get(key); + try { + synchronized(k) { + if( ++closes == opens ) { + final int oldOpens = opens; + Executors.schedule(new Runnable() { + public void run() { + K k = unique.get(key); + try { + synchronized(k) { + if( oldOpens == opens ) { + if( !isRemoved ) + searcherCache.remove(key); + try { + MyIndexSearcher.super.close(); + getIndexReader().close(); + } catch(IOException e) { + logger.error("",e); + } + } + } + } finally { + unique.free(k); + } + } + }, 5, TimeUnit.SECONDS ); + } + } + } finally { + unique.free(k); + } + } + } + + public LuceneSearcher openSearcher(K key) throws IOException { + K k = unique.get(key); + try { + synchronized(k) { + MyIndexSearcher searcher = searcherCache.get(key); + if( searcher == null ) { + searcher = new MyIndexSearcher( key, IndexReader.open(dirs.get(key)) ); + searcherCache.put(key,searcher); + } else { + IndexReader indexReader = searcher.getIndexReader(); + IndexReader newReader = indexReader.reopen(); + if( newReader != indexReader ) { + searcherCache.remove(key); + searcher.isRemoved = true; + searcher = new MyIndexSearcher( key, newReader ); + searcherCache.put(key,searcher); + } + } + searcher.opens++; + return new LuceneSearcherImpl(searcher); + } + } finally { + unique.free(k); + } + } + + + private final Map<K,MyIndexWriter> writerCache = new HashMap<K,MyIndexWriter>(); + + private class MyIndexWriter extends IndexWriter { + private final K key; + private int opens = 0; + private int closes = 0; + private boolean willCommit = false; + + MyIndexWriter(K key) throws IOException { + super(dirs.get(key),analyzer,IndexWriter.MaxFieldLength.LIMITED); + this.key = key; + } + + @Override public void close() throws IOException { + K k = unique.get(key); + try { + synchronized(k) { + if( ++closes == opens ) { + final int oldOpens = opens; + Executors.schedule(new Runnable() { + public void run() { + K k = unique.get(key); + try { + synchronized(k) { + if( oldOpens == opens ) { + writerCache.remove(key); + try { + MyIndexWriter.super.close(); + } catch(NoSuchDirectoryException e) { + logger.info("",e); // site could be deleted + } catch(IOException e) { + logger.error("",e); + } + willCommit = false; + } + } + } finally { + unique.free(k); + } + } + }, 5, TimeUnit.SECONDS ); + } + if( !willCommit ) { + willCommit = true; + Executors.schedule(new Runnable() { + public void run() { + K k = unique.get(key); + try { + synchronized(k) { + if( willCommit ) { + try { + commit(); + } catch(IOException e) { + logger.error("",e); + } + willCommit = false; + } + } + } finally { + unique.free(k); + } + } + }, 5, TimeUnit.SECONDS ); + } + } + } finally { + unique.free(k); + } + } + } + + public IndexWriter openIndexWriter(K key) throws IOException { + if( isShutdown ) + throw new RuntimeException("shutdown"); + key = unique.get(key); + try { + synchronized(key) { + MyIndexWriter indexWriter = writerCache.get(key); + if( indexWriter == null ) { + indexWriter = new MyIndexWriter(key); + writerCache.put(key,indexWriter); + } + indexWriter.opens++; + return indexWriter; + } + } finally { + unique.free(key); + } + } + + + private final Set<K> buildSet = Collections.newSetFromMap(new ConcurrentHashMap<K,Boolean>()); + + private void check(K key,FSDirectory dir) throws IOException { + if( !Init.hasDaemons ) + return; + File checkFile = checkFile(dir); + int currentVersion = 0; + try { + currentVersion = Integer.parseInt(IoUtils.read(checkFile).trim()); + } catch (Exception e) {} + if (currentVersion > version) { + throw new RuntimeException("Lucene index version for "+key+" is "+version + +", found version "+currentVersion+" in "+dir.getFile()); + } + if( currentVersion == version ) + return; // ok + + if( !buildSet.add(key) ) + throw new RuntimeException("already building "+key); + build(key,dir); + } + + private File checkFile(FSDirectory dir) { + return new File(dir.getFile(), "version"); + } + + private void build(K k,FSDirectory dir) throws IOException { + final File checkFile = checkFile(dir); + if( checkFile.exists() && !checkFile.delete() ) + logger.error("couldn't delete "+checkFile+" for build"); + final K key = unique.get(k); + try { + synchronized(key) { + IndexWriter indexWriter = writerCache.remove(key); + if( indexWriter != null ) + indexWriter.close(); + new IndexWriter(dir,analyzer,true,IndexWriter.MaxFieldLength.LIMITED).close(); // clear dir + } + } finally { + unique.free(key); + } + Thread thread = new Thread(new Runnable(){public void run(){ + try { + logger.info("starting lucene index "+key); + builder.build(key); + IoUtils.write(checkFile,Integer.toString(version)); + buildSet.remove(key); + logger.info("finished lucene index "+key); + } catch(Exception e) { + logger.error("lucene build failed for "+key,e); + buildSet.remove(key); + dirs.remove(key); + } + }},"lucene build "+key); + thread.setDaemon(true); + thread.start(); + } + + public boolean isReady(K key) { + dirs.get(key); + return !buildSet.contains(key); + } + + public void rebuild(K key) throws IOException { + build( key, dirs.get(key) ); + } + + public void shutdown() { + isShutdown = true; + try { + while( !writerCache.isEmpty() ) { + List<K> keys = new ArrayList<K>(writerCache.keySet()); + for( K key : keys ) { + IndexWriter indexWriter = writerCache.remove(key); + if( indexWriter != null ) + indexWriter.close(); + } + } + } catch(IOException e) { + logger.error("",e); + } + } +/* + public void delete(K key) throws IOException { + key = unique.get(key); + try { + synchronized(key) { + MyIndexSearcher searcher = searcherCache.remove(key); + if( searcher != null ) + searcher.getIndexReader().close(); + IndexWriter indexWriter = writerCache.remove(key); + if( indexWriter == null ) + indexWriter = new IndexWriter(dirs.get(key),analyzer,IndexWriter.MaxFieldLength.LIMITED); + indexWriter.deleteAll(); + dirs.remove(key); + File dirFile = new File(dir,key.toString()); + File versionFile = new File(dirFile, "version"); + if( !versionFile.delete() ) + logger.error("couldn't delete "+versionFile+" for site delete"); + if( !dirFile.delete() ) + logger.error("couldn't delete "+dirFile); + } + } finally { + unique.free(key); + } + } +*/ + void deleteUnusedIndexes() { + File[] files = dir.listFiles(); + if( files == null ) + return; // not made yet + for( File indexDir : files ) { + if( Executors.isShuttingDown() ) + return; + if( !builder.exists( indexDir.getName() ) ) { + if( !IoUtils.delete(indexDir) ) + logger.error("couldn't delete "+indexDir); + } + } + } +}