comparison src/luan/modules/lucene/LuceneIndex.java @ 1392:002152af497a

hosted postgres
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 06 Sep 2019 00:19:47 -0600
parents 94f48cc76de8
children cc0dbca576dc
comparison
equal deleted inserted replaced
1391:94f48cc76de8 1392:002152af497a
77 import luan.LuanTable; 77 import luan.LuanTable;
78 import luan.LuanFunction; 78 import luan.LuanFunction;
79 import luan.LuanException; 79 import luan.LuanException;
80 import luan.LuanRuntimeException; 80 import luan.LuanRuntimeException;
81 import luan.modules.parsers.LuanToString; 81 import luan.modules.parsers.LuanToString;
82 import luan.modules.logging.LuanLogger;
82 import luan.lib.logging.Logger; 83 import luan.lib.logging.Logger;
83 import luan.lib.logging.LoggerFactory; 84 import luan.lib.logging.LoggerFactory;
84 85
85 86
86 public final class LuceneIndex { 87 public final class LuceneIndex {
96 luan.onClose(this); 97 luan.onClose(this);
97 } 98 }
98 99
99 public void close() throws IOException { 100 public void close() throws IOException {
100 if( !isClosed ) { 101 if( !isClosed ) {
101 li.close(); 102 try {
103 li.close();
104 } catch(SQLException e) {
105 throw new RuntimeException(e);
106 }
102 isClosed = true; 107 isClosed = true;
103 } 108 }
104 } 109 }
105 110
106 protected void finalize() throws Throwable { 111 protected void finalize() throws Throwable {
112 } 117 }
113 } 118 }
114 119
115 private static Map<String,LuceneIndex> indexes = new HashMap<String,LuceneIndex>(); 120 private static Map<String,LuceneIndex> indexes = new HashMap<String,LuceneIndex>();
116 121
117 public static Object[] getLuceneIndex(Luan luan,File indexDir,FieldParser defaultFieldParser,String[] defaultFields,LuanFunction completer) 122 public static Object[] getLuceneIndex(Luan luan,File indexDir,FieldParser defaultFieldParser,String[] defaultFields,LuanTable postgresSpec)
118 throws LuanException, IOException 123 throws LuanException, IOException, ClassNotFoundException, SQLException
119 { 124 {
120 String key = indexDir.getCanonicalPath(); 125 String key = indexDir.getCanonicalPath();
121 synchronized(indexes) { 126 synchronized(indexes) {
122 LuceneIndex li = indexes.get(key); 127 LuceneIndex li = indexes.get(key);
123 if( li == null ) { 128 if( li == null ) {
124 li = new LuceneIndex(indexDir,defaultFieldParser,defaultFields,key,completer); 129 li = new LuceneIndex(luan,indexDir,defaultFieldParser,defaultFields,key,postgresSpec);
125 li.openCount = 1; 130 li.openCount = 1;
126 indexes.put(key,li); 131 indexes.put(key,li);
127 } else { 132 } else {
128 if( defaultFieldParser != li.defaultFieldParser ) 133 if( defaultFieldParser != li.defaultFieldParser )
129 throw new LuanException("default_type doesn't match previous use"); 134 throw new LuanException("default_type doesn't match previous use");
161 private final FieldParser defaultFieldParser; 166 private final FieldParser defaultFieldParser;
162 private final String[] defaultFields; 167 private final String[] defaultFields;
163 168
164 private final PostgresBackup postgresBackup; 169 private final PostgresBackup postgresBackup;
165 170
166 private LuceneIndex(File indexDir,FieldParser defaultFieldParser,String[] defaultFields,String key,LuanFunction completer) 171 private LuceneIndex(Luan luan,File indexDir,FieldParser defaultFieldParser,String[] defaultFields,String key,LuanTable postgresSpec)
167 throws LuanException, IOException 172 throws LuanException, IOException, ClassNotFoundException, SQLException
168 { 173 {
174 final Logger logger = LuanLogger.getLogger(luan,LuceneIndex.class);
169 this.key = key; 175 this.key = key;
170 this.defaultFieldParser = defaultFieldParser; 176 this.defaultFieldParser = defaultFieldParser;
171 this.defaultFields = defaultFields; 177 this.defaultFields = defaultFields;
172 mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields); 178 mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields);
173 mfp.fields.put( "type", STRING_FIELD_PARSER ); 179 mfp.fields.put( "type", STRING_FIELD_PARSER );
178 StringFieldParser sfp = (StringFieldParser)defaultFieldParser; 184 StringFieldParser sfp = (StringFieldParser)defaultFieldParser;
179 analyzer = sfp.analyzer; 185 analyzer = sfp.analyzer;
180 } 186 }
181 this.analyzer = analyzer; 187 this.analyzer = analyzer;
182 boolean wasCreated = reopen(); 188 boolean wasCreated = reopen();
183 postgresBackup = completer!=null ? PostgresBackup.newInstance() : null; 189 if( postgresSpec == null ) {
184 if( postgresBackup != null ) { 190 postgresBackup = null;
185 if( !wasCreated && postgresBackup.wasCreated ) { 191 } else {
186 logger.error("rebuilding postgres backup"); 192 Map spec = postgresSpec.asMap();
187 rebuild_postgres_backup(completer); 193 LuanFunction completer = Utils.removeRequiredFunction(spec,"completer");
188 } else if( wasCreated && !postgresBackup.wasCreated ) { 194 postgresBackup = new PostgresBackup(spec);
189 logger.error("restoring from postgres"); 195 if( postgresBackup != null ) {
190 restore_from_postgres(); 196 if( !wasCreated && postgresBackup.wasCreated ) {
197 logger.error("rebuilding postgres backup");
198 rebuild_postgres_backup(completer);
199 } else if( wasCreated && !postgresBackup.wasCreated ) {
200 logger.error("restoring from postgres");
201 restore_from_postgres(luan);
202 }
191 } 203 }
192 } 204 }
193 } 205 }
194 206
195 public boolean reopen() throws IOException { 207 public boolean reopen() throws IOException {
208 220
209 private void wrote() { 221 private void wrote() {
210 writeCounter.incrementAndGet(); 222 writeCounter.incrementAndGet();
211 } 223 }
212 224
213 public void delete_all() throws IOException { 225 public void delete_all() throws IOException, SQLException {
214 boolean commit = !writeLock.isHeldByCurrentThread(); 226 boolean commit = !writeLock.isHeldByCurrentThread();
215 writeLock.lock(); 227 writeLock.lock();
216 try { 228 try {
217 writer.deleteAll(); 229 writer.deleteAll();
218 id = idLim = 0; 230 id = idLim = 0;
230 NumericUtils.longToPrefixCoded(value,0,br); 242 NumericUtils.longToPrefixCoded(value,0,br);
231 return new Term(key,br); 243 return new Term(key,br);
232 } 244 }
233 245
234 private void backupDelete(Query query) 246 private void backupDelete(Query query)
235 throws IOException 247 throws IOException, SQLException, LuanException
236 { 248 {
237 if( postgresBackup != null ) { 249 if( postgresBackup != null ) {
238 final List<Long> ids = new ArrayList<Long>(); 250 final List<Long> ids = new ArrayList<Long>();
239 IndexSearcher searcher = openSearcher(); 251 IndexSearcher searcher = openSearcher();
240 try { 252 try {
256 postgresBackup.commit(); 268 postgresBackup.commit();
257 } 269 }
258 } 270 }
259 271
260 public void delete(String queryStr) 272 public void delete(String queryStr)
261 throws IOException, ParseException 273 throws IOException, ParseException, SQLException, LuanException
262 { 274 {
263 Query query = SaneQueryParser.parseQuery(mfp,queryStr); 275 Query query = SaneQueryParser.parseQuery(mfp,queryStr);
264 276
265 boolean commit = !writeLock.isHeldByCurrentThread(); 277 boolean commit = !writeLock.isHeldByCurrentThread();
266 writeLock.lock(); 278 writeLock.lock();
277 public void indexed_only_fields(List<String> fields) { 289 public void indexed_only_fields(List<String> fields) {
278 indexOnly.addAll(fields); 290 indexOnly.addAll(fields);
279 } 291 }
280 292
281 public void save(LuanTable doc,LuanTable boosts) 293 public void save(LuanTable doc,LuanTable boosts)
282 throws LuanException, IOException 294 throws LuanException, IOException, SQLException
283 { 295 {
284 if( boosts!=null && postgresBackup!=null ) 296 if( boosts!=null && postgresBackup!=null )
285 logger.error("boosts are not saved to postgres backup"); 297 throw new LuanException("boosts are not saved to postgres backup");
286 298
287 Object obj = doc.get("id"); 299 Object obj = doc.get("id");
288 Long id; 300 Long id;
289 try { 301 try {
290 id = (Long)obj; 302 id = (Long)obj;
311 wrote(); 323 wrote();
312 writeLock.unlock(); 324 writeLock.unlock();
313 } 325 }
314 } 326 }
315 327
316 public Object run_in_transaction(LuanFunction fn) throws IOException, LuanException { 328 public Object run_in_transaction(LuanFunction fn)
329 throws IOException, LuanException, SQLException
330 {
317 boolean commit = !writeLock.isHeldByCurrentThread(); 331 boolean commit = !writeLock.isHeldByCurrentThread();
318 writeLock.lock(); 332 writeLock.lock();
319 boolean ok = false; 333 boolean ok = false;
320 try { 334 try {
321 if( commit && postgresBackup != null ) 335 if( commit && postgresBackup != null )
431 445
432 public String to_string() { 446 public String to_string() {
433 return writer.getDirectory().toString(); 447 return writer.getDirectory().toString();
434 } 448 }
435 449
436 private synchronized void close() throws IOException { 450 private synchronized void close() throws IOException, SQLException {
437 if( openCount > 0 ) { 451 if( openCount > 0 ) {
438 if( --openCount == 0 ) { 452 if( --openCount == 0 ) {
439 doClose(); 453 doClose();
440 synchronized(indexes) { 454 synchronized(indexes) {
441 indexes.remove(key); 455 indexes.remove(key);
442 } 456 }
443 } 457 }
444 } 458 }
445 } 459 }
446 460
447 public void doClose() throws IOException { 461 public void doClose() throws IOException, SQLException {
462 writer.close();
463 reader.close();
448 if( postgresBackup != null ) 464 if( postgresBackup != null )
449 postgresBackup.close(); 465 postgresBackup.close();
450 writer.close();
451 reader.close();
452 } 466 }
453 467
454 468
455 private static class DocFn extends LuanFunction { 469 private static class DocFn extends LuanFunction {
456 final IndexSearcher searcher; 470 final IndexSearcher searcher;
804 public boolean hasPostgresBackup() { 818 public boolean hasPostgresBackup() {
805 return postgresBackup != null; 819 return postgresBackup != null;
806 } 820 }
807 821
808 public void rebuild_postgres_backup(LuanFunction completer) 822 public void rebuild_postgres_backup(LuanFunction completer)
809 throws IOException, LuanException 823 throws IOException, LuanException, SQLException
810 { 824 {
825 final Logger logger = LuanLogger.getLogger(completer.luan(),LuceneIndex.class);
826 logger.info("start rebuild_postgres_backup");
811 writeLock.lock(); 827 writeLock.lock();
812 IndexSearcher searcher = openSearcher(); 828 IndexSearcher searcher = openSearcher();
813 boolean ok = false; 829 boolean ok = false;
814 try { 830 try {
815 postgresBackup.begin(); 831 postgresBackup.begin();
822 LuanTable tbl = toTable(completer.luan(),doc); 838 LuanTable tbl = toTable(completer.luan(),doc);
823 tbl = (LuanTable)completer.call(tbl); 839 tbl = (LuanTable)completer.call(tbl);
824 postgresBackup.add(tbl); 840 postgresBackup.add(tbl);
825 } catch(LuanException e) { 841 } catch(LuanException e) {
826 throw new LuanRuntimeException(e); 842 throw new LuanRuntimeException(e);
843 } catch(SQLException e) {
844 throw new RuntimeException(e);
827 } 845 }
828 } 846 }
829 }; 847 };
830 try { 848 try {
831 searcher.search(query,col); 849 searcher.search(query,col);
838 close(searcher); 856 close(searcher);
839 if( !ok ) 857 if( !ok )
840 postgresBackup.rollback(); 858 postgresBackup.rollback();
841 writeLock.unlock(); 859 writeLock.unlock();
842 } 860 }
843 } 861 logger.info("end rebuild_postgres_backup");
844 862 }
845 public void restore_from_postgres() 863
846 throws IOException, LuanException 864 public void restore_from_postgres(Luan luan)
847 { 865 throws IOException, LuanException, SQLException
866 {
867 final Logger logger = LuanLogger.getLogger(luan,LuceneIndex.class);
868 logger.warn("start restore_from_postgres");
848 if( postgresBackup==null ) 869 if( postgresBackup==null )
849 throw new NullPointerException(); 870 throw new NullPointerException();
850 if( writeLock.isHeldByCurrentThread() ) 871 if( writeLock.isHeldByCurrentThread() )
851 throw new RuntimeException(); 872 throw new RuntimeException();
852 writeLock.lock(); 873 writeLock.lock();
864 reopen(); 885 reopen();
865 } 886 }
866 wrote(); 887 wrote();
867 writeLock.unlock(); 888 writeLock.unlock();
868 } 889 }
890 logger.warn("end restore_from_postgres");
869 } 891 }
870 892
871 void restore(LuanTable doc) 893 void restore(LuanTable doc)
872 throws LuanException, IOException 894 throws LuanException, IOException
873 { 895 {
874 writer.addDocument(toLucene(doc,null)); 896 writer.addDocument(toLucene(doc,null));
875 } 897 }
876 898
877 public void check(LuanFunction completer) throws IOException, SQLException, LuanException { 899 public void check(LuanFunction completer) throws IOException, SQLException, LuanException {
900 final Logger logger = LuanLogger.getLogger(completer.luan(),LuceneIndex.class);
878 logger.info("start check"); 901 logger.info("start check");
879 CheckIndex.Status status = new CheckIndex(fsDir).checkIndex(); 902 CheckIndex.Status status = new CheckIndex(fsDir).checkIndex();
880 if( !status.clean ) 903 if( !status.clean )
881 logger.error("index not clean"); 904 logger.error("index not clean");
882 if( postgresBackup != null ) 905 if( postgresBackup != null )
883 checkPostgres(completer); 906 checkPostgres(completer);
884 logger.info("end check"); 907 logger.info("end check");
885 } 908 }
886 909
887 private void checkPostgres(LuanFunction completer) throws IOException, SQLException, LuanException { 910 private void checkPostgres(LuanFunction completer) throws IOException, SQLException, LuanException {
911 final Logger logger = LuanLogger.getLogger(completer.luan(),LuceneIndex.class);
888 final PostgresBackup.Checker postgresChecker; 912 final PostgresBackup.Checker postgresChecker;
889 final IndexSearcher searcher; 913 final IndexSearcher searcher;
890 writeLock.lock(); 914 writeLock.lock();
891 try { 915 try {
892 postgresChecker = postgresBackup.newChecker(); 916 postgresChecker = postgresBackup.newChecker();