comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1476:7d145095cc0b

lucene.logging check
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 19 Apr 2020 20:42:26 -0600
parents c7b86342857f
children 1fa6e8ec2d53
comparison
equal deleted inserted replaced
1475:c7b86342857f 1476:7d145095cc0b
14 import java.util.ArrayList; 14 import java.util.ArrayList;
15 import java.util.Random; 15 import java.util.Random;
16 import org.apache.lucene.document.Document; 16 import org.apache.lucene.document.Document;
17 import org.apache.lucene.index.DirectoryReader; 17 import org.apache.lucene.index.DirectoryReader;
18 import org.apache.lucene.index.IndexReader; 18 import org.apache.lucene.index.IndexReader;
19 import org.apache.lucene.index.Term;
19 import org.apache.lucene.search.IndexSearcher; 20 import org.apache.lucene.search.IndexSearcher;
20 import org.apache.lucene.search.Query; 21 import org.apache.lucene.search.Query;
21 import org.apache.lucene.search.MatchAllDocsQuery; 22 import org.apache.lucene.search.MatchAllDocsQuery;
22 import org.apache.lucene.search.TopDocs; 23 import org.apache.lucene.search.TopDocs;
24 import org.apache.lucene.search.PrefixQuery;
25 import org.apache.lucene.search.SortField;
26 import org.apache.lucene.search.Sort;
23 import org.apache.lucene.store.Directory; 27 import org.apache.lucene.store.Directory;
24 import org.apache.lucene.store.FSDirectory; 28 import org.apache.lucene.store.FSDirectory;
25 import goodjava.io.IoUtils; 29 import goodjava.io.IoUtils;
26 import goodjava.lucene.api.GoodIndexWriter; 30 import goodjava.lucene.api.GoodIndexWriter;
27 import goodjava.lucene.api.LuceneIndexWriter; 31 import goodjava.lucene.api.LuceneIndexWriter;
61 for( int i=0; i<n; i++ ) { 65 for( int i=0; i<n; i++ ) {
62 File file = new File( logDir, dis.readUTF() ); 66 File file = new File( logDir, dis.readUTF() );
63 logs.add( new LogFile(file,"rwd") ); 67 logs.add( new LogFile(file,"rwd") );
64 } 68 }
65 deleteUnusedFiles(); 69 deleteUnusedFiles();
66 log().gotoEnd();
67 return; 70 return;
68 } 71 }
69 } finally { 72 } finally {
70 dis.close(); 73 dis.close();
71 } 74 }
72 } 75 }
76 newLogs();
77 }
78
79 public synchronized boolean isMerging() {
80 return isMerging;
81 }
82
83 private synchronized void isNotMerging() {
84 isMerging = false;
85 }
86
87 public synchronized void newLogs() throws IOException {
88 if( isMerging )
89 throw new RuntimeException("merging");
90 logger.info("building new logs");
91 logs.clear();
73 for( int i=0; i<2; i++ ) { 92 for( int i=0; i<2; i++ ) {
74 logs.add( newLogFile() ); 93 logs.add( newLogFile() );
75 } 94 }
76 isMerging = true; 95 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
77 new Thread(new Runnable(){public void run(){ 96 writeIndex();
78 try { 97 logger.info("done building new logs");
79 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
80 synchronized(LoggingIndexWriter.this) {
81 writeIndex();
82 }
83 } catch(IOException e) {
84 throw new RuntimeException(e);
85 } finally {
86 synchronized(LoggingIndexWriter.this) {
87 isMerging = false;
88 }
89 }
90 }}).start();
91 } 98 }
92 99
93 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException { 100 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException {
94 IndexReader reader = indexWriter.openReader(); 101 IndexReader reader = indexWriter.openReader();
95 final IndexSearcher searcher = new IndexSearcher(reader); 102 final IndexSearcher searcher = new IndexSearcher(reader);
147 154
148 private void mergeLogs() throws IOException { 155 private void mergeLogs() throws IOException {
149 logger.info("merge"); 156 logger.info("merge");
150 LogFile first = logs.get(0); 157 LogFile first = logs.get(0);
151 LogFile second = logs.get(1); 158 LogFile second = logs.get(1);
152 second.gotoEnd(); 159 long lastTime = second.file.lastModified();
153 long lastTime = second.readLong();
154 File dirFile = new File(logDir,"merge"); 160 File dirFile = new File(logDir,"merge");
155 if( dirFile.exists() ) 161 if( dirFile.exists() )
156 throw new RuntimeException(); 162 throw new RuntimeException();
157 Directory dir = FSDirectory.open(dirFile); 163 Directory dir = FSDirectory.open(dirFile);
158 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); 164 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
161 mergeWriter.commit(); 167 mergeWriter.commit();
162 LogFile merge = newLogFile(); 168 LogFile merge = newLogFile();
163 logLucene( lastTime, merge, mergeWriter ); 169 logLucene( lastTime, merge, mergeWriter );
164 mergeWriter.close(); 170 mergeWriter.close();
165 synchronized(this) { 171 synchronized(this) {
166 check(); 172 //check();
167 logs.remove(0); 173 logs.remove(0);
168 logs.set(0,merge); 174 logs.set(0,merge);
169 writeIndex(); 175 writeIndex();
170 check(); 176 //check(null);
171 } 177 }
172 } 178 }
173 private final Runnable mergeLogs = new Runnable() { public void run() { 179 private final Runnable mergeLogs = new Runnable() { public void run() {
174 try { 180 try {
175 mergeLogs(); 181 mergeLogs();
176 /*
177 } catch(IOException e) { 182 } catch(IOException e) {
178 throw new RuntimeException(e); 183 throw new RuntimeException(e);
179 */
180 } catch(Exception e) {
181 e.printStackTrace();
182 System.exit(-1);
183 } finally { 184 } finally {
184 synchronized(LoggingIndexWriter.this) { 185 isNotMerging();
185 isMerging = false;
186 }
187 } 186 }
188 } }; 187 } };
189 188
190 private void check() throws IOException { 189 private static class DocIter {
191 File dirFile = new File(logDir,"check"); 190 final IndexReader reader;
192 if( dirFile.exists() ) 191 final TopDocs td;
193 throw new RuntimeException(); 192 final int n;
194 Directory dir = FSDirectory.open(dirFile); 193 int i = 0;
195 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); 194
196 playLog(checkWriter); 195 DocIter(IndexReader reader,Query query,Sort sort) throws IOException {
197 int nCheck = numDocs(checkWriter); 196 this.reader = reader;
198 int nOrig = numDocs(indexWriter); 197 IndexSearcher searcher = new IndexSearcher(reader);
199 if( nCheck != nOrig ) { 198 this.td = searcher.search(query,10000000,sort);
200 logger.error("nCheck = "+nCheck); 199 this.n = td.scoreDocs.length;
201 logger.error("nOrig = "+nOrig); 200 if( td.totalHits != n )
202 //new Exception().printStackTrace(); 201 throw new RuntimeException();
203 Thread.dumpStack(); 202 }
204 System.out.println(); 203
205 System.out.println("indexWriter"); 204 Document next() throws IOException {
206 dump(indexWriter); 205 return i < n ? reader.document(td.scoreDocs[i++].doc) : null;
207 System.out.println("checkWriter"); 206 }
208 dump(checkWriter); 207 }
209 System.exit(-1); 208
210 } 209 public void check(SortField sortField) throws IOException {
211 checkWriter.close(); 210 IndexReader indexReader;
212 IoUtils.deleteRecursively(dirFile); 211 List<LogFile> logs;
212 synchronized(this) {
213 if( isMerging ) {
214 logger.warn("is merging, check aborted");
215 return;
216 }
217 isMerging = true;
218 indexReader = indexWriter.openReader();
219 logs = new ArrayList<LogFile>(this.logs);
220 int i = logs.size() - 1;
221 LogFile last = logs.get(i);
222 logs.set(i,last.snapshot());
223 }
224 try {
225 logger.info("check start");
226 indexWriter.check();
227 File dirFile = new File(logDir,"check");
228 IoUtils.deleteRecursively(dirFile);
229 Directory dir = FSDirectory.open(dirFile);
230 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
231 playLogs(logs,checkWriter);
232 logger.info("check lucene");
233 IndexReader checkReader = checkWriter.openReader();
234 if( sortField == null ) {
235 int nCheck = checkReader.numDocs();
236 int nOrig = indexReader.numDocs();
237 if( nCheck != nOrig ) {
238 logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck);
239 }
240 logger.info("numDocs="+nOrig);
241 if( hash(indexReader) != hash(checkReader) ) {
242 logger.error("hash mismatch");
243 }
244 } else {
245 Sort sort = new Sort(sortField);
246 String sortFieldName = sortField.getField();
247 Query query = new PrefixQuery(new Term(sortFieldName));
248 DocIter origIter = new DocIter(indexReader,query,sort);
249 DocIter checkIter = new DocIter(checkReader,query,sort);
250 Map<String,Object> origFields = LuceneUtils.toMap(origIter.next());
251 Map<String,Object> checkFields = LuceneUtils.toMap(checkIter.next());
252 while( origFields!=null && checkFields!=null ) {
253 Comparable origFld = (Comparable)origFields.get(sortFieldName);
254 Comparable checkFld = (Comparable)checkFields.get(sortFieldName);
255 int cmp = origFld.compareTo(checkFld);
256 if( cmp==0 ) {
257 if( !origFields.equals(checkFields) ) {
258 logger.error(sortFieldName+" "+origFld+" not equal");
259 logger.error("lucene = "+origFields);
260 logger.error("logs = "+checkFields);
261 }
262 origFields = LuceneUtils.toMap(origIter.next());
263 checkFields = LuceneUtils.toMap(checkIter.next());
264 } else if( cmp < 0 ) {
265 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
266 origFields = LuceneUtils.toMap(origIter.next());
267 } else { // >
268 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
269 checkFields = LuceneUtils.toMap(checkIter.next());
270 }
271 }
272 while( origFields!=null ) {
273 Comparable origFld = (Comparable)origFields.get(sortFieldName);
274 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
275 origFields = LuceneUtils.toMap(origIter.next());
276 }
277 while( checkFields!=null ) {
278 Comparable checkFld = (Comparable)checkFields.get(sortFieldName);
279 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
280 checkFields = LuceneUtils.toMap(checkIter.next());
281 }
282 //logger.info("check done");
283 }
284 checkReader.close();
285 checkWriter.close();
286 IoUtils.deleteRecursively(dirFile);
287 logger.info("check done");
288 } finally {
289 indexReader.close();
290 isNotMerging();
291 }
292 }
293
294 private static abstract class HashCollector extends GoodCollector {
295 int total = 0;
296 }
297
298 private static int hash(IndexReader reader) throws IOException {
299 final IndexSearcher searcher = new IndexSearcher(reader);
300 Query query = new MatchAllDocsQuery();
301 HashCollector col = new HashCollector() {
302 public void collectDoc(int iDoc) throws IOException {
303 Document doc = searcher.doc(iDoc);
304 Map<String,Object> storedFields = LuceneUtils.toMap(doc);
305 total += storedFields.hashCode();
306 }
307 };
308 searcher.search(query,col);
309 return col.total;
213 } 310 }
214 311
215 private LogFile log() { 312 private LogFile log() {
216 return logs.get(logs.size()-1); 313 return logs.get(logs.size()-1);
217 } 314 }
226 indexWriter.commit(); 323 indexWriter.commit();
227 LogFile log = log(); 324 LogFile log = log();
228 log.commit(); 325 log.commit();
229 if( isMerging ) 326 if( isMerging )
230 return; 327 return;
231 if( log.length() > logs.get(0).length() ) { 328 if( log.end() > logs.get(0).end() ) {
232 log.writeLong( System.currentTimeMillis() );
233 logs.add( newLogFile() ); 329 logs.add( newLogFile() );
234 writeIndex(); 330 writeIndex();
235 } 331 }
236 if( logs.size() > 3 ) { 332 if( logs.size() > 3 ) {
237 isMerging = true; 333 isMerging = true;
238 // new Thread(mergeLogs).start(); 334 new Thread(mergeLogs).start();
239 mergeLogs.run(); 335 // mergeLogs.run();
240 } 336 }
241 } 337 }
242 338
243 public synchronized void rollback() throws IOException { 339 public synchronized void rollback() throws IOException {
244 indexWriter.rollback(); 340 indexWriter.rollback();
245 LogFile log = log(); 341 LogFile log = log();
246 log.gotoEnd(); 342 log.rollback();
247 } 343 }
248 344
249 public synchronized void deleteAll() throws IOException { 345 public synchronized void deleteAll() throws IOException {
250 indexWriter.deleteAll(); 346 indexWriter.deleteAll();
251 LogFile log = log(); 347 LogFile log = log();
281 private void writeOp(LogFile log,int op) throws IOException { 377 private void writeOp(LogFile log,int op) throws IOException {
282 log.writeLong(System.currentTimeMillis()); 378 log.writeLong(System.currentTimeMillis());
283 log.writeByte(op); 379 log.writeByte(op);
284 } 380 }
285 381
286 public synchronized void playLog() throws IOException { 382 public synchronized void playLogs() throws IOException {
287 playLog(indexWriter); 383 playLogs(logs,indexWriter);
288 } 384 }
289 385
290 private void playLog(LuceneIndexWriter indexWriter) throws IOException { 386 private static void playLogs(List<LogFile> logs,LuceneIndexWriter indexWriter) throws IOException {
291 if( numDocs(indexWriter) != 0 ) 387 if( numDocs(indexWriter) != 0 )
292 throw new RuntimeException ("not empty"); 388 throw new RuntimeException ("not empty");
293 for( LogFile log : logs ) { 389 for( LogFile log : logs ) {
294 playLog(log,indexWriter); 390 playLog(log,indexWriter);
295 } 391 }
302 reader.close(); 398 reader.close();
303 return n; 399 return n;
304 } 400 }
305 401
306 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException { 402 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
307 log.gotoStart(); 403 LogInputStream in = log.input();
308 while( log.hasMore() ) { 404 while( in.available() > 0 ) {
309 playOp(log,indexWriter); 405 playOp(in,indexWriter);
310 } 406 }
311 } 407 }
312 408
313 private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException { 409 private static void playOp(LogInputStream in,LuceneIndexWriter indexWriter) throws IOException {
314 log.readLong(); // time 410 in.readLong(); // time
315 int op = log.readByte(); 411 int op = in.readByte();
316 switch(op) { 412 switch(op) {
317 case OP_DELETE_ALL: 413 case OP_DELETE_ALL:
318 indexWriter.deleteAll(); 414 indexWriter.deleteAll();
319 return; 415 return;
320 case OP_DELETE_DOCUMENTS: 416 case OP_DELETE_DOCUMENTS:
321 indexWriter.deleteDocuments( log.readQuery() ); 417 indexWriter.deleteDocuments( in.readQuery() );
322 return; 418 return;
323 case OP_ADD_DOCUMENT: 419 case OP_ADD_DOCUMENT:
324 { 420 {
325 Map storedFields = log.readMap(); 421 Map storedFields = in.readMap();
326 indexWriter.addDocument(storedFields); 422 indexWriter.addDocument(storedFields);
327 return; 423 return;
328 } 424 }
329 case OP_UPDATE_DOCUMENT: 425 case OP_UPDATE_DOCUMENT:
330 { 426 {
331 String keyFieldName = log.readUTF(); 427 String keyFieldName = in.readUTF();
332 Map storedFields = log.readMap(); 428 Map storedFields = in.readMap();
333 indexWriter.updateDocument(keyFieldName,storedFields); 429 indexWriter.updateDocument(keyFieldName,storedFields);
334 return; 430 return;
335 } 431 }
336 default: 432 default:
337 throw new RuntimeException("invalid op "+op); 433 throw new RuntimeException("invalid op "+op);