comparison src/luan/modules/lucene/LuceneIndex.java @ 1387:bc40bc9aab3a

start postgres backup
author Franklin Schmidt <fschmidt@gmail.com>
date Mon, 02 Sep 2019 22:23:12 -0600
parents 87a3738d7cc5
children 2024d23ddd64
comparison
equal deleted inserted replaced
1386:dc36dd8bf839 1387:bc40bc9aab3a
43 import org.apache.lucene.store.FSDirectory; 43 import org.apache.lucene.store.FSDirectory;
44 import org.apache.lucene.util.Version; 44 import org.apache.lucene.util.Version;
45 import org.apache.lucene.util.BytesRef; 45 import org.apache.lucene.util.BytesRef;
46 import org.apache.lucene.util.NumericUtils; 46 import org.apache.lucene.util.NumericUtils;
47 import org.apache.lucene.search.Query; 47 import org.apache.lucene.search.Query;
48 import org.apache.lucene.search.PrefixQuery;
48 import org.apache.lucene.search.TermQuery; 49 import org.apache.lucene.search.TermQuery;
49 import org.apache.lucene.search.TopDocs; 50 import org.apache.lucene.search.TopDocs;
50 import org.apache.lucene.search.Sort; 51 import org.apache.lucene.search.Sort;
51 import org.apache.lucene.search.SortField; 52 import org.apache.lucene.search.SortField;
52 import org.apache.lucene.search.IndexSearcher; 53 import org.apache.lucene.search.IndexSearcher;
108 } 109 }
109 } 110 }
110 111
111 private static Map<String,LuceneIndex> indexes = new HashMap<String,LuceneIndex>(); 112 private static Map<String,LuceneIndex> indexes = new HashMap<String,LuceneIndex>();
112 113
113 public static Object[] getLuceneIndex(Luan luan,File indexDir,FieldParser defaultFieldParser,String[] defaultFields) 114 public static Object[] getLuceneIndex(Luan luan,File indexDir,FieldParser defaultFieldParser,String[] defaultFields,LuanFunction completer)
114 throws LuanException, IOException 115 throws LuanException, IOException
115 { 116 {
116 String key = indexDir.getCanonicalPath(); 117 String key = indexDir.getCanonicalPath();
117 synchronized(indexes) { 118 synchronized(indexes) {
118 LuceneIndex li = indexes.get(key); 119 LuceneIndex li = indexes.get(key);
119 if( li == null ) { 120 if( li == null ) {
120 li = new LuceneIndex(indexDir,defaultFieldParser,defaultFields,key); 121 li = new LuceneIndex(indexDir,defaultFieldParser,defaultFields,key,completer);
121 li.openCount = 1; 122 li.openCount = 1;
122 indexes.put(key,li); 123 indexes.put(key,li);
123 } else { 124 } else {
124 if( defaultFieldParser != li.defaultFieldParser ) 125 if( defaultFieldParser != li.defaultFieldParser )
125 throw new LuanException("default_type doesn't match previous use"); 126 throw new LuanException("default_type doesn't match previous use");
155 private int openCount; 156 private int openCount;
156 private final String key; 157 private final String key;
157 private final FieldParser defaultFieldParser; 158 private final FieldParser defaultFieldParser;
158 private final String[] defaultFields; 159 private final String[] defaultFields;
159 160
160 private LuceneIndex(File indexDir,FieldParser defaultFieldParser,String[] defaultFields,String key) 161 private final PostgresBackup postgresBackup;
162
163 private LuceneIndex(File indexDir,FieldParser defaultFieldParser,String[] defaultFields,String key,LuanFunction completer)
161 throws LuanException, IOException 164 throws LuanException, IOException
162 { 165 {
163 this.key = key; 166 this.key = key;
164 this.defaultFieldParser = defaultFieldParser; 167 this.defaultFieldParser = defaultFieldParser;
165 this.defaultFields = defaultFields; 168 this.defaultFields = defaultFields;
172 StringFieldParser sfp = (StringFieldParser)defaultFieldParser; 175 StringFieldParser sfp = (StringFieldParser)defaultFieldParser;
173 analyzer = sfp.analyzer; 176 analyzer = sfp.analyzer;
174 } 177 }
175 this.analyzer = analyzer; 178 this.analyzer = analyzer;
176 reopen(); 179 reopen();
180 postgresBackup = completer!=null ? PostgresBackup.newInstance() : null;
181 if( postgresBackup != null && postgresBackup.wasCreated )
182 rebuild_postgres_backup(completer);
177 } 183 }
178 184
179 public void reopen() throws LuanException, IOException { 185 public void reopen() throws LuanException, IOException {
180 IndexWriterConfig conf = new IndexWriterConfig(version,analyzer); 186 IndexWriterConfig conf = new IndexWriterConfig(version,analyzer);
181 snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()); 187 snapshotDeletionPolicy = new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy());
197 boolean commit = !writeLock.isHeldByCurrentThread(); 203 boolean commit = !writeLock.isHeldByCurrentThread();
198 writeLock.lock(); 204 writeLock.lock();
199 try { 205 try {
200 writer.deleteAll(); 206 writer.deleteAll();
201 id = idLim = 0; 207 id = idLim = 0;
208 if( postgresBackup != null )
209 postgresBackup.deleteAll();
202 if(commit) writer.commit(); 210 if(commit) writer.commit();
203 } finally { 211 } finally {
204 wrote(); 212 wrote();
205 writeLock.unlock(); 213 writeLock.unlock();
206 } 214 }
210 BytesRef br = new BytesRef(); 218 BytesRef br = new BytesRef();
211 NumericUtils.longToPrefixCoded(value,0,br); 219 NumericUtils.longToPrefixCoded(value,0,br);
212 return new Term(key,br); 220 return new Term(key,br);
213 } 221 }
214 222
/**
 * Mirrors a query-based delete into the postgres backup.
 *
 * Does nothing when postgresBackup is null. Otherwise it searches the index
 * for documents matching the query, collects the value of each document's
 * "id" field, and then calls postgresBackup.delete(id) for every collected
 * id between postgresBackup.begin() and postgresBackup.commit().
 *
 * delete(String) calls this while holding writeLock and before
 * writer.deleteDocuments(query).
 *
 * @param query the query selecting the documents whose backup rows are removed
 * @throws IOException declared by this method; collect() also declares it
 */
223 private void backupDelete(Query query)
224 throws IOException
225 {
226 if( postgresBackup != null ) {
227 final List<Long> ids = new ArrayList<Long>();
// NOTE(review): the searcher obtained here is not released anywhere in this
// method - confirm whether openSearcher() needs a matching close.
228 IndexSearcher searcher = openSearcher();
// docBase is not declared in this block; presumably it is supplied by
// MyCollector - TODO confirm.
229 MyCollector col = new MyCollector() {
230 @Override public void collect(int iDoc) throws IOException {
231 Document doc = searcher.doc( docBase + iDoc );
// NOTE(review): a matching document without an "id" field would make
// getField("id") return null here - confirm every document has one.
232 Long id = (Long)doc.getField("id").numericValue();
233 ids.add(id);
234 }
235 };
// The ids are gathered first; the backup is only touched after the search ends.
236 searcher.search(query,col);
// NOTE(review): begin() is called unconditionally. run_in_transaction also
// calls postgresBackup.begin(), so a delete issued inside a transaction would
// reach begin()/commit() a second time - confirm PostgresBackup tolerates that.
237 postgresBackup.begin();
// NOTE(review): there is no rollback() if delete(id) throws before commit().
238 for( Long id : ids ) {
239 postgresBackup.delete(id);
240 }
241 postgresBackup.commit();
242 }
243 }
244
215 public void delete(String queryStr) 245 public void delete(String queryStr)
216 throws IOException, ParseException 246 throws IOException, ParseException
217 { 247 {
218 Query query = SaneQueryParser.parseQuery(mfp,queryStr); 248 Query query = SaneQueryParser.parseQuery(mfp,queryStr);
219 249
220 boolean commit = !writeLock.isHeldByCurrentThread(); 250 boolean commit = !writeLock.isHeldByCurrentThread();
221 writeLock.lock(); 251 writeLock.lock();
222 try { 252 try {
253 backupDelete(query);
223 writer.deleteDocuments(query); 254 writer.deleteDocuments(query);
224 if(commit) writer.commit(); 255 if(commit) writer.commit();
225 } finally { 256 } finally {
226 wrote(); 257 wrote();
227 writeLock.unlock(); 258 writeLock.unlock();
233 } 264 }
234 265
235 public void save(LuanTable doc,LuanTable boosts) 266 public void save(LuanTable doc,LuanTable boosts)
236 throws LuanException, IOException 267 throws LuanException, IOException
237 { 268 {
269 if( boosts!=null && postgresBackup!=null )
270 logger.error("boosts are not saved to postgres backup");
271
238 Object obj = doc.get("id"); 272 Object obj = doc.get("id");
239 Long id; 273 Long id;
240 try { 274 try {
241 id = (Long)obj; 275 id = (Long)obj;
242 } catch(ClassCastException e) { 276 } catch(ClassCastException e) {
248 try { 282 try {
249 if( id == null ) { 283 if( id == null ) {
250 id = nextId(); 284 id = nextId();
251 doc.put("id",id); 285 doc.put("id",id);
252 writer.addDocument(toLucene(doc,boosts)); 286 writer.addDocument(toLucene(doc,boosts));
287 if( postgresBackup != null )
288 postgresBackup.add(id,doc);
253 } else { 289 } else {
254 writer.updateDocument( term("id",id), toLucene(doc,boosts) ); 290 writer.updateDocument( term("id",id), toLucene(doc,boosts) );
291 if( postgresBackup != null )
292 postgresBackup.update(id,doc);
255 } 293 }
256 if(commit) writer.commit(); 294 if(commit) writer.commit();
257 } finally { 295 } finally {
258 wrote(); 296 wrote();
259 writeLock.unlock(); 297 writeLock.unlock();
261 } 299 }
262 300
// Side-by-side comparison rows: the left column is the previous revision and
// the right column is the new one. The notes below describe the new revision.
//
// Runs fn while holding writeLock. `commit` is true only when the current
// thread did not already hold writeLock on entry, i.e. for the outermost call.
// In that case postgresBackup.begin() is called before fn (when a backup
// exists), and after fn returns postgresBackup.commit() and then
// writer.commit() are called. If fn throws in the outermost call, the finally
// block calls postgresBackup.rollback(), writer.rollback() and reopen().
// wrote() and writeLock.unlock() are the last statements of the finally block.
// Returns whatever fn.call() returned.
263 public Object run_in_transaction(LuanFunction fn) throws IOException, LuanException { 301 public Object run_in_transaction(LuanFunction fn) throws IOException, LuanException {
264 boolean commit = !writeLock.isHeldByCurrentThread(); 302 boolean commit = !writeLock.isHeldByCurrentThread();
265 writeLock.lock(); 303 writeLock.lock();
266 try { 304 boolean ok = false;
305 try {
306 if( commit && postgresBackup != null )
307 postgresBackup.begin();
// NOTE(review): ok is set before the commits below, so an exception thrown by
// postgresBackup.commit() or writer.commit() does not trigger the rollback path.
267 Object rtn = fn.call(); 308 Object rtn = fn.call();
268 if(commit) writer.commit(); 309 ok = true;
310 if(commit) {
// NOTE(review): the backup is committed before the Lucene writer; if
// writer.commit() then fails, the two stores may disagree - confirm intended.
311 if( postgresBackup != null )
312 postgresBackup.commit();
313 writer.commit();
314 }
269 return rtn; 315 return rtn;
270 } finally { 316 } finally {
// NOTE(review): if any call in this rollback branch throws, the wrote() and
// writeLock.unlock() calls below are skipped and the lock stays held.
317 if( !ok && commit ) {
318 if( postgresBackup != null )
319 postgresBackup.rollback();
// reopen() follows writer.rollback(); presumably because rollback leaves the
// writer closed - TODO confirm against the Lucene IndexWriter documentation.
320 writer.rollback();
321 reopen();
322 }
271 wrote(); 323 wrote();
272 writeLock.unlock(); 324 writeLock.unlock();
273 } 325 }
274 } 326 }
275 327
328 // ???
276 public Object run_in_lock(LuanFunction fn) throws IOException, LuanException { 329 public Object run_in_lock(LuanFunction fn) throws IOException, LuanException {
277 if( writeLock.isHeldByCurrentThread() ) 330 if( writeLock.isHeldByCurrentThread() )
278 throw new RuntimeException(); 331 throw new RuntimeException();
279 writeLock.lock(); 332 writeLock.lock();
280 try { 333 try {
371 } 424 }
372 } 425 }
373 } 426 }
374 427
// Side-by-side comparison rows: the left column is the previous revision and
// the right column is the new one. In the new revision postgresBackup.close()
// is called first when a backup exists, then writer.close() and reader.close().
// NOTE(review): an exception from an earlier close() skips the later ones.
375 public void doClose() throws IOException { 428 public void doClose() throws IOException {
429 if( postgresBackup != null )
430 postgresBackup.close();
376 writer.close(); 431 writer.close();
377 reader.close(); 432 reader.close();
378 } 433 }
379 434
380 435
723 } 778 }
724 ts.close(); 779 ts.close();
725 return n; 780 return n;
726 } 781 }
727 782
783
784
/**
 * Reports whether this index has a postgres backup attached.
 *
 * The postgresBackup field is assigned in the constructor, which sets it to
 * null when no completer function was supplied.
 *
 * @return true when postgresBackup is not null
 */
785 public boolean hasPostgresBackup() {
786 return postgresBackup != null;
787 }
788
/**
 * Rebuilds the postgres backup from the contents of the Lucene index.
 *
 * While holding writeLock, and between postgresBackup.begin() and
 * postgresBackup.commit(), this clears the backup with deleteAll() and then
 * re-adds every document matched by a PrefixQuery on the "id" field. Each
 * document is converted with toTable(), passed through the completer, and
 * stored under the "id" value of the table the completer returned. If the
 * search fails, postgresBackup.rollback() is called in the finally block.
 *
 * The constructor calls this when postgresBackup.wasCreated is true.
 *
 * NOTE(review): postgresBackup is dereferenced without a null check, and this
 * method is public - callers should presumably check hasPostgresBackup() first.
 *
 * @param completer Luan function called with each document's table; its
 *        result is cast to LuanTable and written to the backup
 * @throws IOException declared by this method; collect() also declares it
 * @throws LuanException when the collector fails with a LuanException, which
 *         is unwrapped and rethrown after the search
 */
789 public void rebuild_postgres_backup(LuanFunction completer)
790 throws IOException, LuanException
791 {
792 writeLock.lock();
793 boolean ok = false;
794 try {
795 postgresBackup.begin();
796 postgresBackup.deleteAll();
// The Term is built from a field name only; presumably its text is empty so
// the prefix matches every document that has an "id" term - TODO confirm.
797 Query query = new PrefixQuery(new Term("id"));
// NOTE(review): the searcher obtained here is not released anywhere in this
// method - confirm whether openSearcher() needs a matching close.
798 IndexSearcher searcher = openSearcher();
// docBase is not declared in this block; presumably it is supplied by
// MyCollector - TODO confirm.
799 MyCollector col = new MyCollector() {
800 @Override public void collect(int iDoc) throws IOException {
// collect() declares only IOException, so a LuanException is wrapped in the
// unchecked LuanRuntimeException here and unwrapped again after the search.
801 try {
802 Document doc = searcher.doc( docBase + iDoc );
803 LuanTable tbl = toTable(completer.luan(),doc);
804 tbl = (LuanTable)completer.call(tbl);
805 Long id = (Long)tbl.get("id");
806 //logger.info("id = "+id);
807 postgresBackup.add(id,tbl);
808 } catch(LuanException e) {
809 throw new LuanRuntimeException(e);
810 }
811 }
812 };
813 try {
814 searcher.search(query,col);
815 } catch(LuanRuntimeException e) {
// The cast relies on the cause being the LuanException wrapped in collect().
816 throw (LuanException)e.getCause();
817 }
// NOTE(review): ok is set before commit(), so a failing commit() does not
// reach the rollback() in the finally block.
818 ok = true;
819 postgresBackup.commit();
820 } finally {
// NOTE(review): if rollback() throws, writeLock.unlock() below is skipped.
821 if( !ok )
822 postgresBackup.rollback();
823 writeLock.unlock();
824 }
825 }
826
728 } 827 }