comparison src/luan/modules/lucene/LuceneIndex.java @ 1696:2958cf04d844

remove postgres backup
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 26 Jun 2022 14:40:08 -0600
parents 973d3039c421
children d1e7564a9ce5
comparison
equal deleted inserted replaced
1695:25833dd89844 1696:2958cf04d844
5 import java.io.FileOutputStream; 5 import java.io.FileOutputStream;
6 import java.io.FileInputStream; 6 import java.io.FileInputStream;
7 import java.io.IOException; 7 import java.io.IOException;
8 import java.lang.ref.Reference; 8 import java.lang.ref.Reference;
9 import java.lang.ref.WeakReference; 9 import java.lang.ref.WeakReference;
10 import java.sql.SQLException;
11 import java.util.Arrays; 10 import java.util.Arrays;
12 import java.util.Iterator; 11 import java.util.Iterator;
13 import java.util.Map; 12 import java.util.Map;
14 import java.util.HashMap; 13 import java.util.HashMap;
15 import java.util.List; 14 import java.util.List;
98 private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class); 97 private static final Logger logger = LoggerFactory.getLogger(LuceneIndex.class);
99 98
100 private static Map<String,Reference<LuceneIndex>> indexes = new HashMap<String,Reference<LuceneIndex>>(); 99 private static Map<String,Reference<LuceneIndex>> indexes = new HashMap<String,Reference<LuceneIndex>>();
101 100
102 public static LuceneIndex getLuceneIndex(Luan luan,File indexDir,LuanTable options) 101 public static LuceneIndex getLuceneIndex(Luan luan,File indexDir,LuanTable options)
103 throws LuanException, IOException, ClassNotFoundException, SQLException 102 throws LuanException, IOException, ClassNotFoundException
104 { 103 {
105 String key = indexDir.getCanonicalPath(); 104 String key = indexDir.getCanonicalPath();
106 synchronized(indexes) { 105 synchronized(indexes) {
107 Reference<LuceneIndex> ref = indexes.get(key); 106 Reference<LuceneIndex> ref = indexes.get(key);
108 if( ref != null ) { 107 if( ref != null ) {
141 private FSDirectory fsDir; 140 private FSDirectory fsDir;
142 private int writeCount; 141 private int writeCount;
143 private AtomicInteger writeCounter = new AtomicInteger(); 142 private AtomicInteger writeCounter = new AtomicInteger();
144 private final GoodIndexWriterConfig config; 143 private final GoodIndexWriterConfig config;
145 144
146 private final PostgresBackup postgresBackup;
147 private boolean wasCreated; 145 private boolean wasCreated;
148 private final File logDir; 146 private final File logDir;
149 private final long logTime; 147 private final long logTime;
150 private final String name; 148 private final String name;
151 private final String domain; 149 private final String domain;
152 150
153 private LuceneIndex(Luan luan,File indexDir,LuanTable options) 151 private LuceneIndex(Luan luan,File indexDir,LuanTable options)
154 throws LuanException, IOException, ClassNotFoundException, SQLException 152 throws LuanException, IOException, ClassNotFoundException
155 { 153 {
156 options = new LuanTable(options); 154 options = new LuanTable(options);
157 this.version = options.remove("version"); 155 this.version = options.remove("version");
158 FieldParser defaultFieldParser = (FieldParser)options.remove("default_type"); 156 FieldParser defaultFieldParser = (FieldParser)options.remove("default_type");
159 LuanTable defaultFieldsTbl = Utils.removeTable(options,"default_fields"); 157 LuanTable defaultFieldsTbl = Utils.removeTable(options,"default_fields");
160 String[] defaultFields = defaultFieldsTbl==null ? null : (String[])defaultFieldsTbl.asList().toArray(new String[0]); 158 String[] defaultFields = defaultFieldsTbl==null ? null : (String[])defaultFieldsTbl.asList().toArray(new String[0]);
161 LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec");
162 LuanFunction supplementer = Utils.removeFunction(options,"supplementer"); 159 LuanFunction supplementer = Utils.removeFunction(options,"supplementer");
163 logDir = (File)options.remove("log_dir"); 160 logDir = (File)options.remove("log_dir");
164 logTime = (Long)options.remove("log_time"); 161 logTime = (Long)options.remove("log_time");
165 name = (String)options.remove("name"); 162 name = (String)options.remove("name");
166 Utils.checkEmpty(options); 163 Utils.checkEmpty(options);
183 analyzer = sfp.analyzer; 180 analyzer = sfp.analyzer;
184 } 181 }
185 this.analyzer = analyzer; 182 this.analyzer = analyzer;
186 this.config = new SupplementingConfig(luceneVersion,mfp,supplementer); 183 this.config = new SupplementingConfig(luceneVersion,mfp,supplementer);
187 wasCreated = reopen(); 184 wasCreated = reopen();
188 if( postgresSpec == null ) {
189 postgresBackup = null;
190 } else {
191 postgresBackup = new PostgresBackup(postgresSpec);
192 if( !wasCreated && postgresBackup.wasCreated ) {
193 logger.error("rebuilding postgres backup");
194 rebuild_postgres_backup(luan);
195 /*
196 } else if( wasCreated && !postgresBackup.wasCreated ) {
197 logger.error("restoring from postgres");
198 restore_from_postgres();
199 */
200 }
201 }
202 } 185 }
203 186
204 public boolean reopen() throws IOException { 187 public boolean reopen() throws IOException {
205 fsDir = FSDirectory.open(indexDir); 188 fsDir = FSDirectory.open(indexDir);
206 boolean wasCreated = !fsDir.getDirectory().exists(); 189 boolean wasCreated = !fsDir.getDirectory().exists();
220 203
221 private void wrote() { 204 private void wrote() {
222 writeCounter.incrementAndGet(); 205 writeCounter.incrementAndGet();
223 } 206 }
224 207
225 public void delete_all() throws IOException, SQLException { 208 public void delete_all() throws IOException {
226 boolean commit = !writeLock.isHeldByCurrentThread(); 209 boolean commit = !writeLock.isHeldByCurrentThread();
227 writeLock.lock(); 210 writeLock.lock();
228 try { 211 try {
229 writer.deleteAll(); 212 writer.deleteAll();
230 id = 0; 213 id = 0;
231 if( postgresBackup != null )
232 postgresBackup.deleteAll();
233 if(commit) writer.commit(); 214 if(commit) writer.commit();
234 } finally { 215 } finally {
235 wrote(); 216 wrote();
236 writeLock.unlock(); 217 writeLock.unlock();
237 } 218 }
241 BytesRef br = new BytesRef(); 222 BytesRef br = new BytesRef();
242 NumericUtils.longToPrefixCoded(value,0,br); 223 NumericUtils.longToPrefixCoded(value,0,br);
243 return new Term(key,br); 224 return new Term(key,br);
244 } 225 }
245 226
246 private static final Set<String> ID_SET = Collections.singleton("id");
247
248 private void backupDelete(Query query)
249 throws IOException, SQLException, LuanException
250 {
251 if( postgresBackup != null ) {
252 final List<Long> ids = new ArrayList<Long>();
253 IndexSearcher searcher = openSearcher();
254 try {
255 MyCollector col = new MyCollector() {
256 @Override public void collect(int iDoc) throws IOException {
257 Document doc = searcher.doc( docBase + iDoc, ID_SET );
258 Long id = (Long)doc.getField("id").numericValue();
259 ids.add(id);
260 }
261 };
262 searcher.search(query,col);
263 } finally {
264 close(searcher);
265 }
266 postgresBackup.begin();
267 for( Long id : ids ) {
268 postgresBackup.delete(id);
269 }
270 postgresBackup.commit();
271 }
272 }
273
274 public void delete(String queryStr) 227 public void delete(String queryStr)
275 throws IOException, ParseException, SQLException, LuanException 228 throws IOException, ParseException, LuanException
276 { 229 {
277 Query query = GoodQueryParser.parseQuery(mfp,queryStr); 230 Query query = GoodQueryParser.parseQuery(mfp,queryStr);
278 231
279 boolean commit = !writeLock.isHeldByCurrentThread(); 232 boolean commit = !writeLock.isHeldByCurrentThread();
280 writeLock.lock(); 233 writeLock.lock();
281 try { 234 try {
282 backupDelete(query);
283 writer.deleteDocuments(query); 235 writer.deleteDocuments(query);
284 if(commit) writer.commit(); 236 if(commit) writer.commit();
285 } finally { 237 } finally {
286 wrote(); 238 wrote();
287 writeLock.unlock(); 239 writeLock.unlock();
303 writeLock.unlock(); 255 writeLock.unlock();
304 } 256 }
305 } 257 }
306 258
307 public void save( Luan luan, LuanTable doc, LuanTable unstored, Map<String,Float> boosts ) 259 public void save( Luan luan, LuanTable doc, LuanTable unstored, Map<String,Float> boosts )
308 throws LuanException, IOException, SQLException 260 throws LuanException, IOException
309 { 261 {
310 Object obj = doc.get(luan,"id"); 262 Object obj = doc.get(luan,"id");
311 Long id; 263 Long id;
312 try { 264 try {
313 id = (Long)obj; 265 id = (Long)obj;
323 throw new LuanException("unstored required with boosts"); 275 throw new LuanException("unstored required with boosts");
324 if( boosts == null ) 276 if( boosts == null )
325 throw new LuanException("boosts required with unstored"); 277 throw new LuanException("boosts required with unstored");
326 if( id != null ) 278 if( id != null )
327 throw new LuanException("update not supported"); 279 throw new LuanException("update not supported");
328 if( postgresBackup != null )
329 throw new LuanException("not supported with postgres backup");
330 if( !(writer instanceof LuceneIndexWriter) ) 280 if( !(writer instanceof LuceneIndexWriter) )
331 throw new LuanException("not supported with index logging"); 281 throw new LuanException("not supported with index logging");
332 id = ++this.id; 282 id = ++this.id;
333 doc.put(luan,"id",id); 283 doc.put(luan,"id",id);
334 LuceneIndexWriter liw = (LuceneIndexWriter)writer; 284 LuceneIndexWriter liw = (LuceneIndexWriter)writer;
335 liw.addDocument( toLucene(doc), toLucene(unstored), boosts ); 285 liw.addDocument( toLucene(doc), toLucene(unstored), boosts );
336 } else if( id == null ) { 286 } else if( id == null ) {
337 id = ++this.id; 287 id = ++this.id;
338 doc.put(luan,"id",id); 288 doc.put(luan,"id",id);
339 if( postgresBackup != null )
340 postgresBackup.add(luan,doc);
341 writer.addDocument(toLucene(doc)); 289 writer.addDocument(toLucene(doc));
342 } else { 290 } else {
343 if( postgresBackup != null )
344 postgresBackup.update(luan,doc);
345 writer.updateDocument( "id", toLucene(doc) ); 291 writer.updateDocument( "id", toLucene(doc) );
346 } 292 }
347 if(commit) writer.commit(); 293 if(commit) writer.commit();
348 } finally { 294 } finally {
349 wrote(); 295 wrote();
354 public boolean is_in_transaction() { 300 public boolean is_in_transaction() {
355 return writeLock.isHeldByCurrentThread(); 301 return writeLock.isHeldByCurrentThread();
356 } 302 }
357 303
358 public Object run_in_transaction(Luan luan,LuanFunction fn) 304 public Object run_in_transaction(Luan luan,LuanFunction fn)
359 throws IOException, LuanException, SQLException 305 throws IOException, LuanException
360 { 306 {
361 boolean commit = !writeLock.isHeldByCurrentThread(); 307 boolean commit = !writeLock.isHeldByCurrentThread();
362 writeLock.lock(); 308 writeLock.lock();
363 boolean ok = false; 309 boolean ok = false;
364 try { 310 try {
365 if( commit && postgresBackup != null )
366 postgresBackup.begin();
367 Object rtn = fn.call(luan); 311 Object rtn = fn.call(luan);
368 ok = true; 312 ok = true;
369 if(commit) { 313 if(commit) {
370 if( postgresBackup != null )
371 postgresBackup.commit();
372 writer.commit(); 314 writer.commit();
373 } 315 }
374 return rtn; 316 return rtn;
375 } finally { 317 } finally {
376 if( !ok && commit ) { 318 if( !ok && commit ) {
377 if( postgresBackup != null )
378 postgresBackup.rollback();
379 writer.rollback(); 319 writer.rollback();
380 reopen(); 320 reopen();
381 } 321 }
382 wrote(); 322 wrote();
383 writeLock.unlock(); 323 writeLock.unlock();
472 protected void finalize() throws Throwable { 412 protected void finalize() throws Throwable {
473 close(); 413 close();
474 super.finalize(); 414 super.finalize();
475 } 415 }
476 416
477 public void close() throws IOException, SQLException { 417 public void close() throws IOException {
478 closeWriter(); 418 closeWriter();
479 reader.close(); 419 reader.close();
480 } 420 }
481 421
482 private void closeWriter() throws IOException, SQLException { 422 private void closeWriter() throws IOException {
483 writeLock.lock(); 423 writeLock.lock();
484 try { 424 try {
485 writer.close(); 425 writer.close();
486 if( postgresBackup != null )
487 postgresBackup.close();
488 } finally { 426 } finally {
489 writeLock.unlock(); 427 writeLock.unlock();
490 } 428 }
491 } 429 }
492 430
715 ts.close(); 653 ts.close();
716 return n; 654 return n;
717 } 655 }
718 656
719 657
720
721 public boolean hasPostgresBackup() {
722 return postgresBackup != null;
723 }
724
725 public void rebuild_postgres_backup(Luan luan)
726 throws IOException, LuanException, SQLException
727 {
728 logger.info("start rebuild_postgres_backup");
729 writeLock.lock();
730 IndexSearcher searcher = openSearcher();
731 boolean ok = false;
732 try {
733 postgresBackup.begin();
734 postgresBackup.deleteAll();
735 Query query = new PrefixQuery(new Term("id"));
736 MyCollector col = new MyCollector() {
737 @Override public void collect(int iDoc) throws IOException {
738 try {
739 Document doc = searcher.doc( docBase + iDoc );
740 LuanTable tbl = toTable(doc);
741 postgresBackup.add(luan,tbl);
742 } catch(LuanException e) {
743 throw new LuanRuntimeException(e);
744 } catch(SQLException e) {
745 throw new RuntimeException(e);
746 }
747 }
748 };
749 try {
750 searcher.search(query,col);
751 } catch(LuanRuntimeException e) {
752 throw (LuanException)e.getCause();
753 }
754 ok = true;
755 postgresBackup.commit();
756 } finally {
757 close(searcher);
758 if( !ok )
759 postgresBackup.rollback();
760 writeLock.unlock();
761 }
762 logger.info("end rebuild_postgres_backup");
763 }
764
765 public void restore_from_postgres()
766 throws IOException, LuanException, SQLException, ParseException
767 {
768 if( postgresBackup!=null && wasCreated && !postgresBackup.wasCreated ) {
769 logger.error("restoring from postgres");
770 force_restore_from_postgres();
771 }
772 }
773
774 public void force_restore_from_postgres()
775 throws IOException, LuanException, SQLException, ParseException
776 {
777 logger.warn("start restore_from_postgres");
778 if( postgresBackup==null )
779 throw new NullPointerException();
780 if( writeLock.isHeldByCurrentThread() )
781 throw new RuntimeException();
782 writeLock.lock();
783 boolean ok = false;
784 try {
785 writer.tag("restore_from_postgres");
786 writer.deleteAll();
787 postgresBackup.restoreLucene(this);
788 ok = true;
789 writer.commit();
790 wrote();
791 ensure_open(); // refresh searcher
792 initId();
793 wasCreated = false;
794 } finally {
795 if( !ok ) {
796 writer.rollback();
797 reopen();
798 }
799 wrote();
800 writeLock.unlock();
801 }
802 logger.warn("end restore_from_postgres");
803 }
804
805 void restore(LuanTable doc) 658 void restore(LuanTable doc)
806 throws LuanException, IOException 659 throws LuanException, IOException
807 { 660 {
808 writer.addDocument(toLucene(doc)); 661 writer.addDocument(toLucene(doc));
809 } 662 }
821 } 674 }
822 logger.info("end relog"); 675 logger.info("end relog");
823 } 676 }
824 677
825 public void restore_from_log(Luan luan,LuanFunction handler) 678 public void restore_from_log(Luan luan,LuanFunction handler)
826 throws IOException, LuanException, SQLException, ParseException 679 throws IOException, LuanException, ParseException
827 { 680 {
828 LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer; 681 LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
829 if( wasCreated && !loggingWriter.wasCreated ) { 682 if( wasCreated && !loggingWriter.wasCreated ) {
830 logger.error("restoring from log"); 683 logger.error("restoring from log");
831 force_restore_from_log(luan,handler); 684 force_restore_from_log(luan,handler);
858 writeLock.unlock(); 711 writeLock.unlock();
859 } 712 }
860 logger.warn("end force_restore_from_log"); 713 logger.warn("end force_restore_from_log");
861 } 714 }
862 715
863 public void check() throws IOException, SQLException, LuanException, ParseException { 716 public void check() throws IOException, LuanException, ParseException {
864 boolean hasPostgres = postgresBackup != null;
865 String msg = "start check"; 717 String msg = "start check";
866 if( hasPostgres )
867 msg += " with postgres";
868 logger.info(msg); 718 logger.info(msg);
869 CheckIndex.Status status = new CheckIndex(fsDir).checkIndex(); 719 CheckIndex.Status status = new CheckIndex(fsDir).checkIndex();
870 if( !status.clean ) 720 if( !status.clean )
871 logger.error("index not clean"); 721 logger.error("index not clean");
872 if( writer instanceof LoggingIndexWriter ) { 722 if( writer instanceof LoggingIndexWriter ) {
873 LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer; 723 LoggingIndexWriter loggingWriter = (LoggingIndexWriter)writer;
874 logger.info("log check"); 724 logger.info("log check");
875 boolean ok = loggingWriter.check(ID_SORT); 725 boolean ok = loggingWriter.check(ID_SORT);
876 } 726 }
877 if( hasPostgres ) {
878 logger.info("postgres check");
879 checkPostgres();
880 }
881 logger.info("end check"); 727 logger.info("end check");
882 } 728 }
883 729
884 private void checkPostgres()
885 throws IOException, SQLException, LuanException, ParseException
886 {
887 final PostgresBackup.Checker postgresChecker = postgresBackup.newChecker();
888 final IndexSearcher searcher = openSearcher();
889 try {
890 final List<Long> idsLucene = new ArrayList<Long>();
891 Query query = new PrefixQuery(new Term("id"));
892 MyCollector col = new MyCollector() {
893 @Override public void collect(int iDoc) throws IOException {
894 Document doc = searcher.doc( docBase + iDoc );
895 Long id = (Long)doc.getField("id").numericValue();
896 idsLucene.add(id);
897 }
898 };
899 searcher.search(query,col);
900 Collections.sort(idsLucene);
901 final List<Long> idsPostgres = postgresChecker.getIds();
902 final int nLucene = idsLucene.size();
903 final int nPostgres = idsPostgres.size();
904 int iLucene = 0;
905 int iPostgres = 0;
906 LuanToString lts = new LuanToString();
907 lts.settingsInit.strict = true;
908 lts.settingsInit.numberTypes = true;
909 while( iLucene < nLucene && iPostgres < nPostgres ) {
910 long idLucene = idsLucene.get(iLucene);
911 long idPostgres = idsPostgres.get(iPostgres);
912 if( idLucene < idPostgres ) {
913 iLucene++;
914 checkPostgres(postgresChecker,lts,idLucene);
915 } else if( idLucene > idPostgres ) {
916 iPostgres++;
917 checkPostgres(postgresChecker,lts,idPostgres);
918 } else { // ==
919 LuanTable docPostgres = postgresChecker.getDoc(idPostgres);
920 TopDocs td = searcher.search(new TermQuery(term("id",idLucene)),1);
921 if( td.totalHits != 1 ) throw new RuntimeException();
922 Document doc = searcher.doc( td.scoreDocs[0].doc );
923 LuanTable docLucene = toTable(doc);
924 if( !equal(docPostgres,docLucene) ) {
925 checkPostgres(postgresChecker,lts,idPostgres);
926 }
927 iLucene++;
928 iPostgres++;
929 }
930 }
931 while( iLucene < nLucene ) {
932 long idLucene = idsLucene.get(iLucene++);
933 checkPostgres(postgresChecker,lts,idLucene);
934 }
935 while( iPostgres < nPostgres ) {
936 long idPostgres = idsPostgres.get(iPostgres++);
937 checkPostgres(postgresChecker,lts,idPostgres);
938 }
939 } finally {
940 close(searcher);
941 postgresChecker.close();
942 }
943 }
944
945 private void checkPostgres(PostgresBackup.Checker postgresChecker,LuanToString lts,long id)
946 throws IOException, SQLException, LuanException, ParseException
947 {
948 //logger.info("check id "+id);
949 writeLock.lock();
950 try {
951 final IndexSearcher searcher = openSearcher();
952 try {
953 LuanTable docPostgres = postgresChecker.getDoc(id);
954 TopDocs td = searcher.search(new TermQuery(term("id",id)),1);
955 LuanTable docLucene;
956 if( td.totalHits == 0 ) {
957 docLucene = null;
958 } else if( td.totalHits == 1 ) {
959 Document doc = searcher.doc( td.scoreDocs[0].doc );
960 docLucene = toTable(doc);
961 } else
962 throw new RuntimeException();
963 if( docPostgres == null ) {
964 if( docLucene != null )
965 logger.error("id "+id+" found in lucene but not postgres");
966 return;
967 }
968 if( docLucene == null ) {
969 logger.error("id "+id+" found in postgres but not lucene");
970 return;
971 }
972 if( !equal(docPostgres,docLucene) ) {
973 logger.error("id "+id+" not equal");
974 logger.error("lucene = "+lts.toString(docLucene));
975 logger.error("postgres = "+lts.toString(docPostgres));
976 }
977 } finally {
978 close(searcher);
979 }
980 } finally {
981 writeLock.unlock();
982 }
983 }
984
985 private static boolean equal(LuanTable t1,LuanTable t2) throws LuanException {
986 return t1!=null && t2!=null && toJava(t1).equals(toJava(t2));
987 }
988
989 private static Map toJava(LuanTable t) throws LuanException {
990 Map map = t.asMap();
991 for( Iterator iter = map.entrySet().iterator(); iter.hasNext(); ) {
992 Map.Entry entry = (Map.Entry)iter.next();
993 Object value = entry.getValue();
994 if( value instanceof LuanTable ) {
995 LuanTable v = (LuanTable)value;
996 if( !v.isList() )
997 logger.error("not list");
998 List list = v.asList();
999 if( list.isEmpty() ) {
1000 iter.remove();
1001 } else if( list.size() == 1 ) {
1002 entry.setValue(list.get(0));
1003 } else {
1004 entry.setValue(list);
1005 }
1006 }
1007 }
1008 return map;
1009 }
1010 } 730 }