0
|
1 package nabble.model.lucene;
|
|
2
|
|
3 import java.io.File;
|
|
4 import java.io.IOException;
|
|
5 import java.util.Map;
|
|
6 import java.util.HashMap;
|
|
7 import java.util.Set;
|
|
8 import java.util.Collections;
|
|
9 import java.util.List;
|
|
10 import java.util.ArrayList;
|
|
11 import java.util.concurrent.ConcurrentHashMap;
|
|
12 import java.util.concurrent.TimeUnit;
|
|
13 import org.apache.lucene.analysis.Analyzer;
|
|
14 import org.apache.lucene.index.IndexReader;
|
|
15 import org.apache.lucene.index.IndexWriter;
|
|
16 import org.apache.lucene.search.IndexSearcher;
|
|
17 import org.apache.lucene.store.FSDirectory;
|
|
18 import org.apache.lucene.store.NoSuchDirectoryException;
|
|
19 import org.slf4j.Logger;
|
|
20 import org.slf4j.LoggerFactory;
|
|
21 import fschmidt.db.util.Unique;
|
|
22 import fschmidt.util.java.Computable;
|
|
23 import fschmidt.util.java.Memoizer;
|
|
24 import fschmidt.util.java.IoUtils;
|
|
25 import nabble.model.Init;
|
|
26 import nabble.model.Executors;
|
|
27
|
|
28
|
|
29 public final class IndexCache<K> {
|
|
30 private static final Logger logger = LoggerFactory.getLogger(IndexCache.class);
|
|
31
|
|
32 public interface Builder<K> {
|
|
33 public void build(K key) throws Exception;
|
|
34 public boolean exists(String keyString);
|
|
35 }
|
|
36
|
|
37 private final Unique<K> unique = new Unique<K>();
|
|
38 private final File dir;
|
|
39 private final Analyzer analyzer;
|
|
40 private final int version;
|
|
41 private final Builder<K> builder;
|
|
42 private volatile boolean isShutdown = false;
|
|
43
|
|
44 public IndexCache(File dir, Analyzer analyzer, int version, Builder<K> builder) {
|
|
45 this.dir = dir;
|
|
46 this.analyzer = analyzer;
|
|
47 this.version = version;
|
|
48 this.builder = builder;
|
|
49 Executors.executeSometime(new Runnable(){
|
|
50 public void run() {
|
|
51 deleteUnusedIndexes();
|
|
52 }
|
|
53 });
|
|
54 }
|
|
55
|
|
56 private final Memoizer<K,FSDirectory> dirs = new Memoizer<K,FSDirectory>(new Computable<K,FSDirectory>() {
|
|
57 public FSDirectory get(K key) {
|
|
58 File dirFile = new File(dir,key.toString());
|
|
59 dirFile.mkdirs();
|
|
60 try {
|
|
61 FSDirectory dir = FSDirectory.open(dirFile);
|
|
62 if( IndexWriter.isLocked(dir) ) {
|
|
63 logger.error("Lucene index "+dir+" was locked");
|
|
64 IndexWriter.unlock(dir);
|
|
65 }
|
|
66 check(key,dir);
|
|
67 return dir;
|
|
68 } catch(IOException e) {
|
|
69 logger.error(e.toString());
|
|
70 System.exit(-1);
|
|
71 throw new RuntimeException(); // never
|
|
72 } catch(RuntimeException e) {
|
|
73 logger.error(e.toString());
|
|
74 System.exit(-1);
|
|
75 throw new RuntimeException(); // never
|
|
76 }
|
|
77 }
|
|
78 });
|
|
79
|
|
80
|
|
81 private final Map<K,MyIndexSearcher> searcherCache = new HashMap<K,MyIndexSearcher>();
|
|
82
|
|
83 private class MyIndexSearcher extends IndexSearcher implements LuceneSearcher {
|
|
84 private final K key;
|
|
85 private int opens = 0;
|
|
86 private int closes = 0;
|
|
87 private boolean isRemoved = false;
|
|
88
|
|
89 MyIndexSearcher(K key,IndexReader reader) throws IOException {
|
|
90 super(reader);
|
|
91 this.key = key;
|
|
92 }
|
|
93
|
|
94 @Override public void close() throws IOException {
|
|
95 K k = unique.get(key);
|
|
96 try {
|
|
97 synchronized(k) {
|
|
98 if( ++closes == opens ) {
|
|
99 final int oldOpens = opens;
|
|
100 Executors.schedule(new Runnable() {
|
|
101 public void run() {
|
|
102 K k = unique.get(key);
|
|
103 try {
|
|
104 synchronized(k) {
|
|
105 if( oldOpens == opens ) {
|
|
106 if( !isRemoved )
|
|
107 searcherCache.remove(key);
|
|
108 try {
|
|
109 MyIndexSearcher.super.close();
|
|
110 getIndexReader().close();
|
|
111 } catch(IOException e) {
|
|
112 logger.error("",e);
|
|
113 }
|
|
114 }
|
|
115 }
|
|
116 } finally {
|
|
117 unique.free(k);
|
|
118 }
|
|
119 }
|
|
120 }, 5, TimeUnit.SECONDS );
|
|
121 }
|
|
122 }
|
|
123 } finally {
|
|
124 unique.free(k);
|
|
125 }
|
|
126 }
|
|
127 }
|
|
128
|
|
129 public LuceneSearcher openSearcher(K key) throws IOException {
|
|
130 K k = unique.get(key);
|
|
131 try {
|
|
132 synchronized(k) {
|
|
133 MyIndexSearcher searcher = searcherCache.get(key);
|
|
134 if( searcher == null ) {
|
|
135 searcher = new MyIndexSearcher( key, IndexReader.open(dirs.get(key)) );
|
|
136 searcherCache.put(key,searcher);
|
|
137 } else {
|
|
138 IndexReader indexReader = searcher.getIndexReader();
|
|
139 IndexReader newReader = indexReader.reopen();
|
|
140 if( newReader != indexReader ) {
|
|
141 searcherCache.remove(key);
|
|
142 searcher.isRemoved = true;
|
|
143 searcher = new MyIndexSearcher( key, newReader );
|
|
144 searcherCache.put(key,searcher);
|
|
145 }
|
|
146 }
|
|
147 searcher.opens++;
|
|
148 return new LuceneSearcherImpl(searcher);
|
|
149 }
|
|
150 } finally {
|
|
151 unique.free(k);
|
|
152 }
|
|
153 }
|
|
154
|
|
155
|
|
156 private final Map<K,MyIndexWriter> writerCache = new HashMap<K,MyIndexWriter>();
|
|
157
|
|
158 private class MyIndexWriter extends IndexWriter {
|
|
159 private final K key;
|
|
160 private int opens = 0;
|
|
161 private int closes = 0;
|
|
162 private boolean willCommit = false;
|
|
163
|
|
164 MyIndexWriter(K key) throws IOException {
|
|
165 super(dirs.get(key),analyzer,IndexWriter.MaxFieldLength.LIMITED);
|
|
166 this.key = key;
|
|
167 }
|
|
168
|
|
169 @Override public void close() throws IOException {
|
|
170 K k = unique.get(key);
|
|
171 try {
|
|
172 synchronized(k) {
|
|
173 if( ++closes == opens ) {
|
|
174 final int oldOpens = opens;
|
|
175 Executors.schedule(new Runnable() {
|
|
176 public void run() {
|
|
177 K k = unique.get(key);
|
|
178 try {
|
|
179 synchronized(k) {
|
|
180 if( oldOpens == opens ) {
|
|
181 writerCache.remove(key);
|
|
182 try {
|
|
183 MyIndexWriter.super.close();
|
|
184 } catch(NoSuchDirectoryException e) {
|
|
185 logger.info("",e); // site could be deleted
|
|
186 } catch(IOException e) {
|
|
187 logger.error("",e);
|
|
188 }
|
|
189 willCommit = false;
|
|
190 }
|
|
191 }
|
|
192 } finally {
|
|
193 unique.free(k);
|
|
194 }
|
|
195 }
|
|
196 }, 5, TimeUnit.SECONDS );
|
|
197 }
|
|
198 if( !willCommit ) {
|
|
199 willCommit = true;
|
|
200 Executors.schedule(new Runnable() {
|
|
201 public void run() {
|
|
202 K k = unique.get(key);
|
|
203 try {
|
|
204 synchronized(k) {
|
|
205 if( willCommit ) {
|
|
206 try {
|
|
207 commit();
|
|
208 } catch(IOException e) {
|
|
209 logger.error("",e);
|
|
210 }
|
|
211 willCommit = false;
|
|
212 }
|
|
213 }
|
|
214 } finally {
|
|
215 unique.free(k);
|
|
216 }
|
|
217 }
|
|
218 }, 5, TimeUnit.SECONDS );
|
|
219 }
|
|
220 }
|
|
221 } finally {
|
|
222 unique.free(k);
|
|
223 }
|
|
224 }
|
|
225 }
|
|
226
|
|
227 public IndexWriter openIndexWriter(K key) throws IOException {
|
|
228 if( isShutdown )
|
|
229 throw new RuntimeException("shutdown");
|
|
230 key = unique.get(key);
|
|
231 try {
|
|
232 synchronized(key) {
|
|
233 MyIndexWriter indexWriter = writerCache.get(key);
|
|
234 if( indexWriter == null ) {
|
|
235 indexWriter = new MyIndexWriter(key);
|
|
236 writerCache.put(key,indexWriter);
|
|
237 }
|
|
238 indexWriter.opens++;
|
|
239 return indexWriter;
|
|
240 }
|
|
241 } finally {
|
|
242 unique.free(key);
|
|
243 }
|
|
244 }
|
|
245
|
|
246
|
|
247 private final Set<K> buildSet = Collections.newSetFromMap(new ConcurrentHashMap<K,Boolean>());
|
|
248
|
|
249 private void check(K key,FSDirectory dir) throws IOException {
|
|
250 if( !Init.hasDaemons )
|
|
251 return;
|
|
252 File checkFile = checkFile(dir);
|
|
253 int currentVersion = 0;
|
|
254 try {
|
|
255 currentVersion = Integer.parseInt(IoUtils.read(checkFile).trim());
|
|
256 } catch (Exception e) {}
|
|
257 if (currentVersion > version) {
|
|
258 throw new RuntimeException("Lucene index version for "+key+" is "+version
|
|
259 +", found version "+currentVersion+" in "+dir.getFile());
|
|
260 }
|
|
261 if( currentVersion == version )
|
|
262 return; // ok
|
|
263
|
|
264 if( !buildSet.add(key) )
|
|
265 throw new RuntimeException("already building "+key);
|
|
266 build(key,dir);
|
|
267 }
|
|
268
|
|
269 private File checkFile(FSDirectory dir) {
|
|
270 return new File(dir.getFile(), "version");
|
|
271 }
|
|
272
|
|
273 private void build(K k,FSDirectory dir) throws IOException {
|
|
274 final File checkFile = checkFile(dir);
|
|
275 if( checkFile.exists() && !checkFile.delete() )
|
|
276 logger.error("couldn't delete "+checkFile+" for build");
|
|
277 final K key = unique.get(k);
|
|
278 try {
|
|
279 synchronized(key) {
|
|
280 IndexWriter indexWriter = writerCache.remove(key);
|
|
281 if( indexWriter != null )
|
|
282 indexWriter.close();
|
|
283 new IndexWriter(dir,analyzer,true,IndexWriter.MaxFieldLength.LIMITED).close(); // clear dir
|
|
284 }
|
|
285 } finally {
|
|
286 unique.free(key);
|
|
287 }
|
|
288 Thread thread = new Thread(new Runnable(){public void run(){
|
|
289 try {
|
|
290 logger.info("starting lucene index "+key);
|
|
291 builder.build(key);
|
|
292 IoUtils.write(checkFile,Integer.toString(version));
|
|
293 buildSet.remove(key);
|
|
294 logger.info("finished lucene index "+key);
|
|
295 } catch(Exception e) {
|
|
296 logger.error("lucene build failed for "+key,e);
|
|
297 buildSet.remove(key);
|
|
298 dirs.remove(key);
|
|
299 }
|
|
300 }},"lucene build "+key);
|
|
301 thread.setDaemon(true);
|
|
302 thread.start();
|
|
303 }
|
|
304
|
|
305 public boolean isReady(K key) {
|
|
306 dirs.get(key);
|
|
307 return !buildSet.contains(key);
|
|
308 }
|
|
309
|
|
310 public void rebuild(K key) throws IOException {
|
|
311 build( key, dirs.get(key) );
|
|
312 }
|
|
313
|
|
314 public void shutdown() {
|
|
315 isShutdown = true;
|
|
316 try {
|
|
317 while( !writerCache.isEmpty() ) {
|
|
318 List<K> keys = new ArrayList<K>(writerCache.keySet());
|
|
319 for( K key : keys ) {
|
|
320 IndexWriter indexWriter = writerCache.remove(key);
|
|
321 if( indexWriter != null )
|
|
322 indexWriter.close();
|
|
323 }
|
|
324 }
|
|
325 } catch(IOException e) {
|
|
326 logger.error("",e);
|
|
327 }
|
|
328 }
|
|
329 /*
|
|
330 public void delete(K key) throws IOException {
|
|
331 key = unique.get(key);
|
|
332 try {
|
|
333 synchronized(key) {
|
|
334 MyIndexSearcher searcher = searcherCache.remove(key);
|
|
335 if( searcher != null )
|
|
336 searcher.getIndexReader().close();
|
|
337 IndexWriter indexWriter = writerCache.remove(key);
|
|
338 if( indexWriter == null )
|
|
339 indexWriter = new IndexWriter(dirs.get(key),analyzer,IndexWriter.MaxFieldLength.LIMITED);
|
|
340 indexWriter.deleteAll();
|
|
341 dirs.remove(key);
|
|
342 File dirFile = new File(dir,key.toString());
|
|
343 File versionFile = new File(dirFile, "version");
|
|
344 if( !versionFile.delete() )
|
|
345 logger.error("couldn't delete "+versionFile+" for site delete");
|
|
346 if( !dirFile.delete() )
|
|
347 logger.error("couldn't delete "+dirFile);
|
|
348 }
|
|
349 } finally {
|
|
350 unique.free(key);
|
|
351 }
|
|
352 }
|
|
353 */
|
|
354 void deleteUnusedIndexes() {
|
|
355 File[] files = dir.listFiles();
|
|
356 if( files == null )
|
|
357 return; // not made yet
|
|
358 for( File indexDir : files ) {
|
|
359 if( Executors.isShuttingDown() )
|
|
360 return;
|
|
361 if( !builder.exists( indexDir.getName() ) ) {
|
|
362 if( !IoUtils.delete(indexDir) )
|
|
363 logger.error("couldn't delete "+indexDir);
|
|
364 }
|
|
365 }
|
|
366 }
|
|
367 }
|