comparison src/luan/modules/lucene/LuceneIndex.java @ 1453:928be2a4d565

fix postgres backup
author Franklin Schmidt <fschmidt@gmail.com>
date Mon, 02 Mar 2020 15:09:10 -0700
parents 851b9a48cc44
children 219f2b937f2b
comparison
equal deleted inserted replaced
1452:c15444f4da05 1453:928be2a4d565
132 private FSDirectory fsDir; 132 private FSDirectory fsDir;
133 private int writeCount; 133 private int writeCount;
134 private AtomicInteger writeCounter = new AtomicInteger(); 134 private AtomicInteger writeCounter = new AtomicInteger();
135 135
136 private Set<String> indexOnly = new HashSet<String>(); 136 private Set<String> indexOnly = new HashSet<String>();
137 private final FieldParser defaultFieldParser; 137 // private final FieldParser defaultFieldParser;
138 private final String[] defaultFields; 138 // private final String[] defaultFields;
139 139
140 private final PostgresBackup postgresBackup; 140 private final PostgresBackup postgresBackup;
141 private boolean wasCreated; 141 private boolean wasCreated;
142 142
143 private LuceneIndex(Luan luan,File indexDir,LuanTable options) 143 private LuceneIndex(Luan luan,File indexDir,LuanTable options)
146 options = new LuanTable(options); 146 options = new LuanTable(options);
147 this.version = options.remove("version"); 147 this.version = options.remove("version");
148 FieldParser defaultFieldParser = (FieldParser)options.remove("default_type"); 148 FieldParser defaultFieldParser = (FieldParser)options.remove("default_type");
149 LuanTable defaultFieldsTbl = Utils.removeTable(options,"default_fields"); 149 LuanTable defaultFieldsTbl = Utils.removeTable(options,"default_fields");
150 String[] defaultFields = defaultFieldsTbl==null ? null : (String[])defaultFieldsTbl.asList().toArray(new String[0]); 150 String[] defaultFields = defaultFieldsTbl==null ? null : (String[])defaultFieldsTbl.asList().toArray(new String[0]);
151 LuanFunction completer = Utils.removeFunction(options,"completer");
152 LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec"); 151 LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec");
153 Utils.checkEmpty(options); 152 Utils.checkEmpty(options);
154 153
155 this.luanLogger = luan.getLogger(LuceneIndex.class); 154 this.luanLogger = luan.getLogger(LuceneIndex.class);
156 this.defaultFieldParser = defaultFieldParser; 155 // this.defaultFieldParser = defaultFieldParser;
157 this.defaultFields = defaultFields; 156 // this.defaultFields = defaultFields;
158 mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields); 157 mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields);
159 mfp.fields.put( "type", STRING_FIELD_PARSER ); 158 mfp.fields.put( "type", STRING_FIELD_PARSER );
160 mfp.fields.put( "id", NumberFieldParser.LONG ); 159 mfp.fields.put( "id", NumberFieldParser.LONG );
161 this.indexDir = indexDir; 160 this.indexDir = indexDir;
162 Analyzer analyzer = STRING_FIELD_PARSER.analyzer; 161 Analyzer analyzer = STRING_FIELD_PARSER.analyzer;
167 this.analyzer = analyzer; 166 this.analyzer = analyzer;
168 wasCreated = reopen(); 167 wasCreated = reopen();
169 if( postgresSpec == null ) { 168 if( postgresSpec == null ) {
170 postgresBackup = null; 169 postgresBackup = null;
171 } else { 170 } else {
172 if( completer == null )
173 throw new LuanException("completer is required for postgres_spec");
174 postgresBackup = new PostgresBackup(luan,postgresSpec); 171 postgresBackup = new PostgresBackup(luan,postgresSpec);
175 if( !wasCreated && postgresBackup.wasCreated ) { 172 if( !wasCreated && postgresBackup.wasCreated ) {
176 luanLogger.error("rebuilding postgres backup"); 173 luanLogger.error("rebuilding postgres backup");
177 rebuild_postgres_backup(completer); 174 rebuild_postgres_backup(luan);
178 /* 175 /*
179 } else if( wasCreated && !postgresBackup.wasCreated ) { 176 } else if( wasCreated && !postgresBackup.wasCreated ) {
180 luanLogger.error("restoring from postgres"); 177 luanLogger.error("restoring from postgres");
181 restore_from_postgres(); 178 restore_from_postgres();
182 */ 179 */
268 265
269 public void indexed_only_fields(List<String> fields) { 266 public void indexed_only_fields(List<String> fields) {
270 indexOnly.addAll(fields); 267 indexOnly.addAll(fields);
271 } 268 }
272 269
273 public void save(LuanTable doc,LuanTable boosts) 270 public void save(LuanFunction completer,LuanTable doc,LuanTable boosts)
274 throws LuanException, IOException, SQLException 271 throws LuanException, IOException, SQLException
275 { 272 {
276 if( boosts!=null && postgresBackup!=null ) 273 if( boosts!=null && postgresBackup!=null )
277 throw new LuanException("boosts are not saved to postgres backup"); 274 throw new LuanException("boosts are not saved to postgres backup");
278 275
290 if( id == null ) { 287 if( id == null ) {
291 id = nextId(); 288 id = nextId();
292 doc.put("id",id); 289 doc.put("id",id);
293 if( postgresBackup != null ) 290 if( postgresBackup != null )
294 postgresBackup.add(doc); 291 postgresBackup.add(doc);
295 writer.addDocument(toLucene(doc,boosts)); 292 writer.addDocument(toLucene(completer,doc,boosts));
296 } else { 293 } else {
297 if( postgresBackup != null ) 294 if( postgresBackup != null )
298 postgresBackup.update(doc); 295 postgresBackup.update(doc);
299 writer.updateDocument( term("id",id), toLucene(doc,boosts) ); 296 writer.updateDocument( term("id",id), toLucene(completer,doc,boosts) );
300 } 297 }
301 if(commit) writer.commit(); 298 if(commit) writer.commit();
302 } finally { 299 } finally {
303 wrote(); 300 wrote();
304 writeLock.unlock(); 301 writeLock.unlock();
646 return new StoredField(name, b); 643 return new StoredField(name, b);
647 } else 644 } else
648 throw new LuanException("invalid value type "+value.getClass()+"' for '"+name+"'"); 645 throw new LuanException("invalid value type "+value.getClass()+"' for '"+name+"'");
649 } 646 }
650 647
651 private Document toLucene(LuanTable table,LuanTable boosts) throws LuanException { 648 private Document toLucene(LuanFunction completer,LuanTable table,LuanTable boosts) throws LuanException {
649 if( completer != null )
650 table = (LuanTable)completer.call(table);
652 return toLucene(table.iterable(),boosts); 651 return toLucene(table.iterable(),boosts);
653 } 652 }
654 653
655 private Document toLucene(Iterable<Map.Entry> iterable,LuanTable boosts) throws LuanException { 654 private Document toLucene(Iterable<Map.Entry> iterable,LuanTable boosts) throws LuanException {
656 Set<String> indexed = mfp.fields.keySet(); 655 Set<String> indexed = mfp.fields.keySet();
802 801
803 public boolean hasPostgresBackup() { 802 public boolean hasPostgresBackup() {
804 return postgresBackup != null; 803 return postgresBackup != null;
805 } 804 }
806 805
807 public void rebuild_postgres_backup(LuanFunction completer) 806 public void rebuild_postgres_backup(Luan luan)
808 throws IOException, LuanException, SQLException 807 throws IOException, LuanException, SQLException
809 { 808 {
810 luanLogger.info("start rebuild_postgres_backup"); 809 luanLogger.info("start rebuild_postgres_backup");
811 writeLock.lock(); 810 writeLock.lock();
812 IndexSearcher searcher = openSearcher(); 811 IndexSearcher searcher = openSearcher();
817 Query query = new PrefixQuery(new Term("id")); 816 Query query = new PrefixQuery(new Term("id"));
818 MyCollector col = new MyCollector() { 817 MyCollector col = new MyCollector() {
819 @Override public void collect(int iDoc) throws IOException { 818 @Override public void collect(int iDoc) throws IOException {
820 try { 819 try {
821 Document doc = searcher.doc( docBase + iDoc ); 820 Document doc = searcher.doc( docBase + iDoc );
822 LuanTable tbl = toTable(completer.luan(),doc); 821 LuanTable tbl = toTable(luan,doc);
823 tbl = (LuanTable)completer.call(tbl);
824 postgresBackup.add(tbl); 822 postgresBackup.add(tbl);
825 } catch(LuanException e) { 823 } catch(LuanException e) {
826 throw new LuanRuntimeException(e); 824 throw new LuanRuntimeException(e);
827 } catch(SQLException e) { 825 } catch(SQLException e) {
828 throw new RuntimeException(e); 826 throw new RuntimeException(e);
843 writeLock.unlock(); 841 writeLock.unlock();
844 } 842 }
845 luanLogger.info("end rebuild_postgres_backup"); 843 luanLogger.info("end rebuild_postgres_backup");
846 } 844 }
847 845
848 public void restore_from_postgres() 846 public void restore_from_postgres(LuanFunction completer)
849 throws IOException, LuanException, SQLException, ParseException 847 throws IOException, LuanException, SQLException, ParseException
850 { 848 {
851 if( postgresBackup!=null && wasCreated && !postgresBackup.wasCreated ) { 849 if( postgresBackup!=null && wasCreated && !postgresBackup.wasCreated ) {
852 luanLogger.error("restoring from postgres"); 850 luanLogger.error("restoring from postgres");
853 force_restore_from_postgres(); 851 force_restore_from_postgres(completer);
854 } 852 }
855 } 853 }
856 854
857 public void force_restore_from_postgres() 855 public void force_restore_from_postgres(LuanFunction completer)
858 throws IOException, LuanException, SQLException, ParseException 856 throws IOException, LuanException, SQLException, ParseException
859 { 857 {
860 luanLogger.warn("start restore_from_postgres"); 858 luanLogger.warn("start restore_from_postgres");
861 if( postgresBackup==null ) 859 if( postgresBackup==null )
862 throw new NullPointerException(); 860 throw new NullPointerException();
865 writeLock.lock(); 863 writeLock.lock();
866 boolean ok = false; 864 boolean ok = false;
867 try { 865 try {
868 writer.deleteAll(); 866 writer.deleteAll();
869 long nextId = postgresBackup.maxId() + 1; 867 long nextId = postgresBackup.maxId() + 1;
870 postgresBackup.restoreLucene(this); 868 postgresBackup.restoreLucene(this,completer);
871 id = idLim = nextId; 869 id = idLim = nextId;
872 saveNextId(nextId); 870 saveNextId(nextId);
873 ok = true; 871 ok = true;
874 writer.commit(); 872 writer.commit();
875 wasCreated = false; 873 wasCreated = false;
882 writeLock.unlock(); 880 writeLock.unlock();
883 } 881 }
884 luanLogger.warn("end restore_from_postgres"); 882 luanLogger.warn("end restore_from_postgres");
885 } 883 }
886 884
887 void restore(LuanTable doc) 885 void restore(LuanFunction completer,LuanTable doc)
888 throws LuanException, IOException 886 throws LuanException, IOException
889 { 887 {
890 writer.addDocument(toLucene(doc,null)); 888 writer.addDocument(toLucene(completer,doc,null));
891 } 889 }
892 890
893 public void check(LuanFunction completer) throws IOException, SQLException, LuanException, ParseException { 891 public void check(Luan luan) throws IOException, SQLException, LuanException, ParseException {
894 boolean hasPostgres = postgresBackup != null; 892 boolean hasPostgres = postgresBackup != null;
895 String msg = "start check"; 893 String msg = "start check";
896 if( hasPostgres ) 894 if( hasPostgres )
897 msg += " with postgres"; 895 msg += " with postgres";
898 luanLogger.info(msg); 896 luanLogger.info(msg);
899 CheckIndex.Status status = new CheckIndex(fsDir).checkIndex(); 897 CheckIndex.Status status = new CheckIndex(fsDir).checkIndex();
900 if( !status.clean ) 898 if( !status.clean )
901 luanLogger.error("index not clean"); 899 luanLogger.error("index not clean");
902 if( hasPostgres ) 900 if( hasPostgres )
903 checkPostgres(completer); 901 checkPostgres(luan);
904 luanLogger.info("end check"); 902 luanLogger.info("end check");
905 } 903 }
906 904
907 private void checkPostgres(LuanFunction completer) 905 private void checkPostgres(Luan luan)
908 throws IOException, SQLException, LuanException, ParseException 906 throws IOException, SQLException, LuanException, ParseException
909 { 907 {
910 //luanLogger.info("start postgres check"); 908 //luanLogger.info("start postgres check");
911 final PostgresBackup.Checker postgresChecker = postgresBackup.newChecker(); 909 final PostgresBackup.Checker postgresChecker = postgresBackup.newChecker();
912 final IndexSearcher searcher = openSearcher(); 910 final IndexSearcher searcher = openSearcher();
933 while( iLucene < nLucene && iPostgres < nPostgres ) { 931 while( iLucene < nLucene && iPostgres < nPostgres ) {
934 long idLucene = idsLucene.get(iLucene); 932 long idLucene = idsLucene.get(iLucene);
935 long idPostgres = idsPostgres.get(iPostgres); 933 long idPostgres = idsPostgres.get(iPostgres);
936 if( idLucene < idPostgres ) { 934 if( idLucene < idPostgres ) {
937 iLucene++; 935 iLucene++;
938 checkPostgres(completer,postgresChecker,lts,idLucene); 936 checkPostgres(luan,postgresChecker,lts,idLucene);
939 } else if( idLucene > idPostgres ) { 937 } else if( idLucene > idPostgres ) {
940 iPostgres++; 938 iPostgres++;
941 checkPostgres(completer,postgresChecker,lts,idPostgres); 939 checkPostgres(luan,postgresChecker,lts,idPostgres);
942 } else { // == 940 } else { // ==
943 LuanTable docPostgres = postgresChecker.getDoc(idPostgres); 941 LuanTable docPostgres = postgresChecker.getDoc(idPostgres);
944 TopDocs td = searcher.search(new TermQuery(term("id",idLucene)),1); 942 TopDocs td = searcher.search(new TermQuery(term("id",idLucene)),1);
945 if( td.totalHits != 1 ) throw new RuntimeException(); 943 if( td.totalHits != 1 ) throw new RuntimeException();
946 Document doc = searcher.doc( td.scoreDocs[0].doc ); 944 Document doc = searcher.doc( td.scoreDocs[0].doc );
947 LuanTable docLucene = toTable(completer.luan(),doc); 945 LuanTable docLucene = toTable(luan,doc);
948 docLucene = (LuanTable)completer.call(docLucene);
949 if( !equal(docPostgres,docLucene) ) { 946 if( !equal(docPostgres,docLucene) ) {
950 checkPostgres(completer,postgresChecker,lts,idPostgres); 947 checkPostgres(luan,postgresChecker,lts,idPostgres);
951 } 948 }
952 iLucene++; 949 iLucene++;
953 iPostgres++; 950 iPostgres++;
954 } 951 }
955 } 952 }
956 while( iLucene < nLucene ) { 953 while( iLucene < nLucene ) {
957 long idLucene = idsLucene.get(iLucene++); 954 long idLucene = idsLucene.get(iLucene++);
958 checkPostgres(completer,postgresChecker,lts,idLucene); 955 checkPostgres(luan,postgresChecker,lts,idLucene);
959 } 956 }
960 while( iPostgres < nPostgres ) { 957 while( iPostgres < nPostgres ) {
961 long idPostgres = idsPostgres.get(iPostgres++); 958 long idPostgres = idsPostgres.get(iPostgres++);
962 checkPostgres(completer,postgresChecker,lts,idPostgres); 959 checkPostgres(luan,postgresChecker,lts,idPostgres);
963 } 960 }
964 } finally { 961 } finally {
965 close(searcher); 962 close(searcher);
966 postgresChecker.close(); 963 postgresChecker.close();
967 } 964 }
968 } 965 }
969 966
970 private void checkPostgres(LuanFunction completer,PostgresBackup.Checker postgresChecker,LuanToString lts,long id) 967 private void checkPostgres(Luan luan,PostgresBackup.Checker postgresChecker,LuanToString lts,long id)
971 throws IOException, SQLException, LuanException, ParseException 968 throws IOException, SQLException, LuanException, ParseException
972 { 969 {
973 //luanLogger.info("check id "+id); 970 //luanLogger.info("check id "+id);
974 writeLock.lock(); 971 writeLock.lock();
975 try { 972 try {
980 LuanTable docLucene; 977 LuanTable docLucene;
981 if( td.totalHits == 0 ) { 978 if( td.totalHits == 0 ) {
982 docLucene = null; 979 docLucene = null;
983 } else if( td.totalHits == 1 ) { 980 } else if( td.totalHits == 1 ) {
984 Document doc = searcher.doc( td.scoreDocs[0].doc ); 981 Document doc = searcher.doc( td.scoreDocs[0].doc );
985 docLucene = toTable(completer.luan(),doc); 982 docLucene = toTable(luan,doc);
986 docLucene = (LuanTable)completer.call(docLucene);
987 } else 983 } else
988 throw new RuntimeException(); 984 throw new RuntimeException();
989 if( docPostgres == null ) { 985 if( docPostgres == null ) {
990 if( docLucene != null ) 986 if( docLucene != null )
991 luanLogger.error("id "+id+" found in lucene but not postgres"); 987 luanLogger.error("id "+id+" found in lucene but not postgres");
1006 } finally { 1002 } finally {
1007 writeLock.unlock(); 1003 writeLock.unlock();
1008 } 1004 }
1009 } 1005 }
1010 1006
1011 private boolean equal(LuanTable t1,LuanTable t2) throws LuanException { 1007 private static boolean equal(LuanTable t1,LuanTable t2) throws LuanException {
1012 return t1!=null && t2!=null && t1.asMap().equals(t2.asMap()); 1008 return t1!=null && t2!=null && toJava(t1).equals(toJava(t2));
1013 } 1009 }
1014 1010
1011 private static Map toJava(LuanTable t) throws LuanException {
1012 Map map = t.asMap();
1013 for( Object obj : map.entrySet() ) {
1014 Map.Entry entry = (Map.Entry)obj;
1015 Object value = entry.getValue();
1016 if( value instanceof LuanTable ) {
1017 LuanTable v = (LuanTable)value;
1018 if( !v.isList() )
1019 sysLogger.error("not list");
1020 entry.setValue(v.asList());
1021 }
1022 }
1023 return map;
1024 }
1015 } 1025 }