Mercurial Hosting > luan
comparison src/luan/modules/lucene/LuceneIndex.java @ 1453:928be2a4d565
fix postgres backup
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Mon, 02 Mar 2020 15:09:10 -0700 |
parents | 851b9a48cc44 |
children | 219f2b937f2b |
comparison legend: equal | deleted | inserted | replaced
1452:c15444f4da05 | 1453:928be2a4d565 |
---|---|
132 private FSDirectory fsDir; | 132 private FSDirectory fsDir; |
133 private int writeCount; | 133 private int writeCount; |
134 private AtomicInteger writeCounter = new AtomicInteger(); | 134 private AtomicInteger writeCounter = new AtomicInteger(); |
135 | 135 |
136 private Set<String> indexOnly = new HashSet<String>(); | 136 private Set<String> indexOnly = new HashSet<String>(); |
137 private final FieldParser defaultFieldParser; | 137 // private final FieldParser defaultFieldParser; |
138 private final String[] defaultFields; | 138 // private final String[] defaultFields; |
139 | 139 |
140 private final PostgresBackup postgresBackup; | 140 private final PostgresBackup postgresBackup; |
141 private boolean wasCreated; | 141 private boolean wasCreated; |
142 | 142 |
143 private LuceneIndex(Luan luan,File indexDir,LuanTable options) | 143 private LuceneIndex(Luan luan,File indexDir,LuanTable options) |
146 options = new LuanTable(options); | 146 options = new LuanTable(options); |
147 this.version = options.remove("version"); | 147 this.version = options.remove("version"); |
148 FieldParser defaultFieldParser = (FieldParser)options.remove("default_type"); | 148 FieldParser defaultFieldParser = (FieldParser)options.remove("default_type"); |
149 LuanTable defaultFieldsTbl = Utils.removeTable(options,"default_fields"); | 149 LuanTable defaultFieldsTbl = Utils.removeTable(options,"default_fields"); |
150 String[] defaultFields = defaultFieldsTbl==null ? null : (String[])defaultFieldsTbl.asList().toArray(new String[0]); | 150 String[] defaultFields = defaultFieldsTbl==null ? null : (String[])defaultFieldsTbl.asList().toArray(new String[0]); |
151 LuanFunction completer = Utils.removeFunction(options,"completer"); | |
152 LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec"); | 151 LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec"); |
153 Utils.checkEmpty(options); | 152 Utils.checkEmpty(options); |
154 | 153 |
155 this.luanLogger = luan.getLogger(LuceneIndex.class); | 154 this.luanLogger = luan.getLogger(LuceneIndex.class); |
156 this.defaultFieldParser = defaultFieldParser; | 155 // this.defaultFieldParser = defaultFieldParser; |
157 this.defaultFields = defaultFields; | 156 // this.defaultFields = defaultFields; |
158 mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields); | 157 mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields); |
159 mfp.fields.put( "type", STRING_FIELD_PARSER ); | 158 mfp.fields.put( "type", STRING_FIELD_PARSER ); |
160 mfp.fields.put( "id", NumberFieldParser.LONG ); | 159 mfp.fields.put( "id", NumberFieldParser.LONG ); |
161 this.indexDir = indexDir; | 160 this.indexDir = indexDir; |
162 Analyzer analyzer = STRING_FIELD_PARSER.analyzer; | 161 Analyzer analyzer = STRING_FIELD_PARSER.analyzer; |
167 this.analyzer = analyzer; | 166 this.analyzer = analyzer; |
168 wasCreated = reopen(); | 167 wasCreated = reopen(); |
169 if( postgresSpec == null ) { | 168 if( postgresSpec == null ) { |
170 postgresBackup = null; | 169 postgresBackup = null; |
171 } else { | 170 } else { |
172 if( completer == null ) | |
173 throw new LuanException("completer is required for postgres_spec"); | |
174 postgresBackup = new PostgresBackup(luan,postgresSpec); | 171 postgresBackup = new PostgresBackup(luan,postgresSpec); |
175 if( !wasCreated && postgresBackup.wasCreated ) { | 172 if( !wasCreated && postgresBackup.wasCreated ) { |
176 luanLogger.error("rebuilding postgres backup"); | 173 luanLogger.error("rebuilding postgres backup"); |
177 rebuild_postgres_backup(completer); | 174 rebuild_postgres_backup(luan); |
178 /* | 175 /* |
179 } else if( wasCreated && !postgresBackup.wasCreated ) { | 176 } else if( wasCreated && !postgresBackup.wasCreated ) { |
180 luanLogger.error("restoring from postgres"); | 177 luanLogger.error("restoring from postgres"); |
181 restore_from_postgres(); | 178 restore_from_postgres(); |
182 */ | 179 */ |
268 | 265 |
269 public void indexed_only_fields(List<String> fields) { | 266 public void indexed_only_fields(List<String> fields) { |
270 indexOnly.addAll(fields); | 267 indexOnly.addAll(fields); |
271 } | 268 } |
272 | 269 |
273 public void save(LuanTable doc,LuanTable boosts) | 270 public void save(LuanFunction completer,LuanTable doc,LuanTable boosts) |
274 throws LuanException, IOException, SQLException | 271 throws LuanException, IOException, SQLException |
275 { | 272 { |
276 if( boosts!=null && postgresBackup!=null ) | 273 if( boosts!=null && postgresBackup!=null ) |
277 throw new LuanException("boosts are not saved to postgres backup"); | 274 throw new LuanException("boosts are not saved to postgres backup"); |
278 | 275 |
290 if( id == null ) { | 287 if( id == null ) { |
291 id = nextId(); | 288 id = nextId(); |
292 doc.put("id",id); | 289 doc.put("id",id); |
293 if( postgresBackup != null ) | 290 if( postgresBackup != null ) |
294 postgresBackup.add(doc); | 291 postgresBackup.add(doc); |
295 writer.addDocument(toLucene(doc,boosts)); | 292 writer.addDocument(toLucene(completer,doc,boosts)); |
296 } else { | 293 } else { |
297 if( postgresBackup != null ) | 294 if( postgresBackup != null ) |
298 postgresBackup.update(doc); | 295 postgresBackup.update(doc); |
299 writer.updateDocument( term("id",id), toLucene(doc,boosts) ); | 296 writer.updateDocument( term("id",id), toLucene(completer,doc,boosts) ); |
300 } | 297 } |
301 if(commit) writer.commit(); | 298 if(commit) writer.commit(); |
302 } finally { | 299 } finally { |
303 wrote(); | 300 wrote(); |
304 writeLock.unlock(); | 301 writeLock.unlock(); |
646 return new StoredField(name, b); | 643 return new StoredField(name, b); |
647 } else | 644 } else |
648 throw new LuanException("invalid value type "+value.getClass()+"' for '"+name+"'"); | 645 throw new LuanException("invalid value type "+value.getClass()+"' for '"+name+"'"); |
649 } | 646 } |
650 | 647 |
651 private Document toLucene(LuanTable table,LuanTable boosts) throws LuanException { | 648 private Document toLucene(LuanFunction completer,LuanTable table,LuanTable boosts) throws LuanException { |
649 if( completer != null ) | |
650 table = (LuanTable)completer.call(table); | |
652 return toLucene(table.iterable(),boosts); | 651 return toLucene(table.iterable(),boosts); |
653 } | 652 } |
654 | 653 |
655 private Document toLucene(Iterable<Map.Entry> iterable,LuanTable boosts) throws LuanException { | 654 private Document toLucene(Iterable<Map.Entry> iterable,LuanTable boosts) throws LuanException { |
656 Set<String> indexed = mfp.fields.keySet(); | 655 Set<String> indexed = mfp.fields.keySet(); |
802 | 801 |
803 public boolean hasPostgresBackup() { | 802 public boolean hasPostgresBackup() { |
804 return postgresBackup != null; | 803 return postgresBackup != null; |
805 } | 804 } |
806 | 805 |
807 public void rebuild_postgres_backup(LuanFunction completer) | 806 public void rebuild_postgres_backup(Luan luan) |
808 throws IOException, LuanException, SQLException | 807 throws IOException, LuanException, SQLException |
809 { | 808 { |
810 luanLogger.info("start rebuild_postgres_backup"); | 809 luanLogger.info("start rebuild_postgres_backup"); |
811 writeLock.lock(); | 810 writeLock.lock(); |
812 IndexSearcher searcher = openSearcher(); | 811 IndexSearcher searcher = openSearcher(); |
817 Query query = new PrefixQuery(new Term("id")); | 816 Query query = new PrefixQuery(new Term("id")); |
818 MyCollector col = new MyCollector() { | 817 MyCollector col = new MyCollector() { |
819 @Override public void collect(int iDoc) throws IOException { | 818 @Override public void collect(int iDoc) throws IOException { |
820 try { | 819 try { |
821 Document doc = searcher.doc( docBase + iDoc ); | 820 Document doc = searcher.doc( docBase + iDoc ); |
822 LuanTable tbl = toTable(completer.luan(),doc); | 821 LuanTable tbl = toTable(luan,doc); |
823 tbl = (LuanTable)completer.call(tbl); | |
824 postgresBackup.add(tbl); | 822 postgresBackup.add(tbl); |
825 } catch(LuanException e) { | 823 } catch(LuanException e) { |
826 throw new LuanRuntimeException(e); | 824 throw new LuanRuntimeException(e); |
827 } catch(SQLException e) { | 825 } catch(SQLException e) { |
828 throw new RuntimeException(e); | 826 throw new RuntimeException(e); |
843 writeLock.unlock(); | 841 writeLock.unlock(); |
844 } | 842 } |
845 luanLogger.info("end rebuild_postgres_backup"); | 843 luanLogger.info("end rebuild_postgres_backup"); |
846 } | 844 } |
847 | 845 |
848 public void restore_from_postgres() | 846 public void restore_from_postgres(LuanFunction completer) |
849 throws IOException, LuanException, SQLException, ParseException | 847 throws IOException, LuanException, SQLException, ParseException |
850 { | 848 { |
851 if( postgresBackup!=null && wasCreated && !postgresBackup.wasCreated ) { | 849 if( postgresBackup!=null && wasCreated && !postgresBackup.wasCreated ) { |
852 luanLogger.error("restoring from postgres"); | 850 luanLogger.error("restoring from postgres"); |
853 force_restore_from_postgres(); | 851 force_restore_from_postgres(completer); |
854 } | 852 } |
855 } | 853 } |
856 | 854 |
857 public void force_restore_from_postgres() | 855 public void force_restore_from_postgres(LuanFunction completer) |
858 throws IOException, LuanException, SQLException, ParseException | 856 throws IOException, LuanException, SQLException, ParseException |
859 { | 857 { |
860 luanLogger.warn("start restore_from_postgres"); | 858 luanLogger.warn("start restore_from_postgres"); |
861 if( postgresBackup==null ) | 859 if( postgresBackup==null ) |
862 throw new NullPointerException(); | 860 throw new NullPointerException(); |
865 writeLock.lock(); | 863 writeLock.lock(); |
866 boolean ok = false; | 864 boolean ok = false; |
867 try { | 865 try { |
868 writer.deleteAll(); | 866 writer.deleteAll(); |
869 long nextId = postgresBackup.maxId() + 1; | 867 long nextId = postgresBackup.maxId() + 1; |
870 postgresBackup.restoreLucene(this); | 868 postgresBackup.restoreLucene(this,completer); |
871 id = idLim = nextId; | 869 id = idLim = nextId; |
872 saveNextId(nextId); | 870 saveNextId(nextId); |
873 ok = true; | 871 ok = true; |
874 writer.commit(); | 872 writer.commit(); |
875 wasCreated = false; | 873 wasCreated = false; |
882 writeLock.unlock(); | 880 writeLock.unlock(); |
883 } | 881 } |
884 luanLogger.warn("end restore_from_postgres"); | 882 luanLogger.warn("end restore_from_postgres"); |
885 } | 883 } |
886 | 884 |
887 void restore(LuanTable doc) | 885 void restore(LuanFunction completer,LuanTable doc) |
888 throws LuanException, IOException | 886 throws LuanException, IOException |
889 { | 887 { |
890 writer.addDocument(toLucene(doc,null)); | 888 writer.addDocument(toLucene(completer,doc,null)); |
891 } | 889 } |
892 | 890 |
893 public void check(LuanFunction completer) throws IOException, SQLException, LuanException, ParseException { | 891 public void check(Luan luan) throws IOException, SQLException, LuanException, ParseException { |
894 boolean hasPostgres = postgresBackup != null; | 892 boolean hasPostgres = postgresBackup != null; |
895 String msg = "start check"; | 893 String msg = "start check"; |
896 if( hasPostgres ) | 894 if( hasPostgres ) |
897 msg += " with postgres"; | 895 msg += " with postgres"; |
898 luanLogger.info(msg); | 896 luanLogger.info(msg); |
899 CheckIndex.Status status = new CheckIndex(fsDir).checkIndex(); | 897 CheckIndex.Status status = new CheckIndex(fsDir).checkIndex(); |
900 if( !status.clean ) | 898 if( !status.clean ) |
901 luanLogger.error("index not clean"); | 899 luanLogger.error("index not clean"); |
902 if( hasPostgres ) | 900 if( hasPostgres ) |
903 checkPostgres(completer); | 901 checkPostgres(luan); |
904 luanLogger.info("end check"); | 902 luanLogger.info("end check"); |
905 } | 903 } |
906 | 904 |
907 private void checkPostgres(LuanFunction completer) | 905 private void checkPostgres(Luan luan) |
908 throws IOException, SQLException, LuanException, ParseException | 906 throws IOException, SQLException, LuanException, ParseException |
909 { | 907 { |
910 //luanLogger.info("start postgres check"); | 908 //luanLogger.info("start postgres check"); |
911 final PostgresBackup.Checker postgresChecker = postgresBackup.newChecker(); | 909 final PostgresBackup.Checker postgresChecker = postgresBackup.newChecker(); |
912 final IndexSearcher searcher = openSearcher(); | 910 final IndexSearcher searcher = openSearcher(); |
933 while( iLucene < nLucene && iPostgres < nPostgres ) { | 931 while( iLucene < nLucene && iPostgres < nPostgres ) { |
934 long idLucene = idsLucene.get(iLucene); | 932 long idLucene = idsLucene.get(iLucene); |
935 long idPostgres = idsPostgres.get(iPostgres); | 933 long idPostgres = idsPostgres.get(iPostgres); |
936 if( idLucene < idPostgres ) { | 934 if( idLucene < idPostgres ) { |
937 iLucene++; | 935 iLucene++; |
938 checkPostgres(completer,postgresChecker,lts,idLucene); | 936 checkPostgres(luan,postgresChecker,lts,idLucene); |
939 } else if( idLucene > idPostgres ) { | 937 } else if( idLucene > idPostgres ) { |
940 iPostgres++; | 938 iPostgres++; |
941 checkPostgres(completer,postgresChecker,lts,idPostgres); | 939 checkPostgres(luan,postgresChecker,lts,idPostgres); |
942 } else { // == | 940 } else { // == |
943 LuanTable docPostgres = postgresChecker.getDoc(idPostgres); | 941 LuanTable docPostgres = postgresChecker.getDoc(idPostgres); |
944 TopDocs td = searcher.search(new TermQuery(term("id",idLucene)),1); | 942 TopDocs td = searcher.search(new TermQuery(term("id",idLucene)),1); |
945 if( td.totalHits != 1 ) throw new RuntimeException(); | 943 if( td.totalHits != 1 ) throw new RuntimeException(); |
946 Document doc = searcher.doc( td.scoreDocs[0].doc ); | 944 Document doc = searcher.doc( td.scoreDocs[0].doc ); |
947 LuanTable docLucene = toTable(completer.luan(),doc); | 945 LuanTable docLucene = toTable(luan,doc); |
948 docLucene = (LuanTable)completer.call(docLucene); | |
949 if( !equal(docPostgres,docLucene) ) { | 946 if( !equal(docPostgres,docLucene) ) { |
950 checkPostgres(completer,postgresChecker,lts,idPostgres); | 947 checkPostgres(luan,postgresChecker,lts,idPostgres); |
951 } | 948 } |
952 iLucene++; | 949 iLucene++; |
953 iPostgres++; | 950 iPostgres++; |
954 } | 951 } |
955 } | 952 } |
956 while( iLucene < nLucene ) { | 953 while( iLucene < nLucene ) { |
957 long idLucene = idsLucene.get(iLucene++); | 954 long idLucene = idsLucene.get(iLucene++); |
958 checkPostgres(completer,postgresChecker,lts,idLucene); | 955 checkPostgres(luan,postgresChecker,lts,idLucene); |
959 } | 956 } |
960 while( iPostgres < nPostgres ) { | 957 while( iPostgres < nPostgres ) { |
961 long idPostgres = idsPostgres.get(iPostgres++); | 958 long idPostgres = idsPostgres.get(iPostgres++); |
962 checkPostgres(completer,postgresChecker,lts,idPostgres); | 959 checkPostgres(luan,postgresChecker,lts,idPostgres); |
963 } | 960 } |
964 } finally { | 961 } finally { |
965 close(searcher); | 962 close(searcher); |
966 postgresChecker.close(); | 963 postgresChecker.close(); |
967 } | 964 } |
968 } | 965 } |
969 | 966 |
970 private void checkPostgres(LuanFunction completer,PostgresBackup.Checker postgresChecker,LuanToString lts,long id) | 967 private void checkPostgres(Luan luan,PostgresBackup.Checker postgresChecker,LuanToString lts,long id) |
971 throws IOException, SQLException, LuanException, ParseException | 968 throws IOException, SQLException, LuanException, ParseException |
972 { | 969 { |
973 //luanLogger.info("check id "+id); | 970 //luanLogger.info("check id "+id); |
974 writeLock.lock(); | 971 writeLock.lock(); |
975 try { | 972 try { |
980 LuanTable docLucene; | 977 LuanTable docLucene; |
981 if( td.totalHits == 0 ) { | 978 if( td.totalHits == 0 ) { |
982 docLucene = null; | 979 docLucene = null; |
983 } else if( td.totalHits == 1 ) { | 980 } else if( td.totalHits == 1 ) { |
984 Document doc = searcher.doc( td.scoreDocs[0].doc ); | 981 Document doc = searcher.doc( td.scoreDocs[0].doc ); |
985 docLucene = toTable(completer.luan(),doc); | 982 docLucene = toTable(luan,doc); |
986 docLucene = (LuanTable)completer.call(docLucene); | |
987 } else | 983 } else |
988 throw new RuntimeException(); | 984 throw new RuntimeException(); |
989 if( docPostgres == null ) { | 985 if( docPostgres == null ) { |
990 if( docLucene != null ) | 986 if( docLucene != null ) |
991 luanLogger.error("id "+id+" found in lucene but not postgres"); | 987 luanLogger.error("id "+id+" found in lucene but not postgres"); |
1006 } finally { | 1002 } finally { |
1007 writeLock.unlock(); | 1003 writeLock.unlock(); |
1008 } | 1004 } |
1009 } | 1005 } |
1010 | 1006 |
1011 private boolean equal(LuanTable t1,LuanTable t2) throws LuanException { | 1007 private static boolean equal(LuanTable t1,LuanTable t2) throws LuanException { |
1012 return t1!=null && t2!=null && t1.asMap().equals(t2.asMap()); | 1008 return t1!=null && t2!=null && toJava(t1).equals(toJava(t2)); |
1013 } | 1009 } |
1014 | 1010 |
1011 private static Map toJava(LuanTable t) throws LuanException { | |
1012 Map map = t.asMap(); | |
1013 for( Object obj : map.entrySet() ) { | |
1014 Map.Entry entry = (Map.Entry)obj; | |
1015 Object value = entry.getValue(); | |
1016 if( value instanceof LuanTable ) { | |
1017 LuanTable v = (LuanTable)value; | |
1018 if( !v.isList() ) | |
1019 sysLogger.error("not list"); | |
1020 entry.setValue(v.asList()); | |
1021 } | |
1022 } | |
1023 return map; | |
1024 } | |
1015 } | 1025 } |