comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1538:634f6765830e

use goodjava/lucene/logging
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 07 Aug 2020 21:42:16 -0600
parents 3bd4d7963456
children c27dc6af87ca
comparison
equal deleted inserted replaced
1537:f7649ad6e3e7 1538:634f6765830e
11 import java.util.Set; 11 import java.util.Set;
12 import java.util.HashSet; 12 import java.util.HashSet;
13 import java.util.List; 13 import java.util.List;
14 import java.util.ArrayList; 14 import java.util.ArrayList;
15 import java.util.Random; 15 import java.util.Random;
16 import java.util.concurrent.TimeUnit;
16 import org.apache.lucene.document.Document; 17 import org.apache.lucene.document.Document;
17 import org.apache.lucene.index.DirectoryReader; 18 import org.apache.lucene.index.DirectoryReader;
18 import org.apache.lucene.index.IndexReader; 19 import org.apache.lucene.index.IndexReader;
19 import org.apache.lucene.index.Term; 20 import org.apache.lucene.index.Term;
20 import org.apache.lucene.index.LiveIndexWriterConfig; 21 import org.apache.lucene.index.LiveIndexWriterConfig;
48 public final LuceneIndexWriter indexWriter; 49 public final LuceneIndexWriter indexWriter;
49 private final File logDir; 50 private final File logDir;
50 protected final List<LogFile> logs = new ArrayList<LogFile>(); 51 protected final List<LogFile> logs = new ArrayList<LogFile>();
51 private LogOutputStream log; 52 private LogOutputStream log;
52 private final File index; 53 private final File index;
53 private boolean isMerging = false; 54 private final SemaphoreLock mergeLock = new SemaphoreLock();
54 55
55 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException { 56 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException {
56 this.indexWriter = indexWriter; 57 this.indexWriter = indexWriter;
57 this.logDir = logDir; 58 this.logDir = logDir;
58 IoUtils.mkdirs(logDir); 59 IoUtils.mkdirs(logDir);
90 private void setLog() throws IOException { 91 private void setLog() throws IOException {
91 if( log != null ) 92 if( log != null )
92 log.close(); 93 log.close();
93 log = logs.get(logs.size()-1).output(); 94 log = logs.get(logs.size()-1).output();
94 } 95 }
95 96 /*
96 public synchronized boolean isMerging() { 97 public synchronized boolean isMerging() {
97 return isMerging; 98 return mergeLock.isLocked();
98 } 99 }
99 100 */
100 private synchronized void isNotMerging() { 101 private void getMergeLock() {
101 isMerging = false; 102 try {
103 if( !mergeLock.tryLock(1,TimeUnit.MINUTES) )
104 throw new RuntimeException("failed to acquire lock");
105 } catch(InterruptedException e) {
106 throw new RuntimeException(e);
107 }
102 } 108 }
103 109
104 public synchronized void newLogs() throws IOException { 110 public synchronized void newLogs() throws IOException {
105 if( isMerging ) 111 getMergeLock();
106 throw new RuntimeException("merging"); 112 try {
113 newLogs2();
114 } finally {
115 mergeLock.unlock();
116 }
117 }
118
119 private void newLogs2() throws IOException {
107 logger.info("building new logs"); 120 logger.info("building new logs");
108 logs.clear(); 121 logs.clear();
109 for( int i=0; i<2; i++ ) { 122 for( int i=0; i<2; i++ ) {
110 logs.add( newLogFile() ); 123 logs.add( newLogFile() );
111 } 124 }
180 //logger.info("writeIndex "+logs.toString()); 193 //logger.info("writeIndex "+logs.toString());
181 } 194 }
182 195
183 private void mergeLogs() throws IOException { 196 private void mergeLogs() throws IOException {
184 //logger.info("merge"); 197 //logger.info("merge");
198 if( logs.size() <= 3 )
199 return;
185 LogFile first = logs.get(0); 200 LogFile first = logs.get(0);
186 LogFile second = logs.get(1); 201 LogFile second = logs.get(1);
187 long lastTime = second.file.lastModified(); 202 long lastTime = second.file.lastModified();
188 File dirFile = new File(logDir,"merge"); 203 File dirFile = new File(logDir,"merge");
189 if( dirFile.exists() ) 204 if( dirFile.exists() )
208 try { 223 try {
209 mergeLogs(); 224 mergeLogs();
210 } catch(IOException e) { 225 } catch(IOException e) {
211 throw new RuntimeException(e); 226 throw new RuntimeException(e);
212 } finally { 227 } finally {
213 isNotMerging(); 228 mergeLock.unlock();
214 } 229 }
215 } }; 230 } };
216 231
217 private static class DocIter { 232 private static class DocIter {
218 final IndexReader reader; 233 final IndexReader reader;
234 } 249 }
235 } 250 }
236 251
237 private volatile boolean isChecking = false; 252 private volatile boolean isChecking = false;
238 253
239 public void check(SortField sortField) throws IOException { 254 public boolean check(SortField sortField) throws IOException {
240 if( isChecking ) 255 if( isChecking )
241 throw new RuntimeException("another check is running"); 256 throw new RuntimeException("another check is running");
242 isChecking = true; 257 isChecking = true;
243 try { 258 try {
244 doCheck(sortField); 259 return doCheck(sortField);
245 } finally { 260 } finally {
246 isChecking = false; 261 isChecking = false;
247 } 262 }
248 } 263 }
249 264
250 protected void doCheck(SortField sortField) throws IOException { 265 protected boolean doCheck(SortField sortField) throws IOException {
266 boolean ok = true;
251 IndexReader indexReader; 267 IndexReader indexReader;
252 List<LogInputStream> logReaders; 268 List<LogInputStream> logReaders;
253 synchronized(this) { 269 synchronized(this) {
254 indexReader = indexWriter.openReader(); 270 indexReader = indexWriter.openReader();
255 logReaders = logReaders(logs); 271 logReaders = logReaders(logs);
256 } 272 }
257 try { 273 try {
258 logger.info("check start"); 274 //logger.info("check start");
259 indexWriter.check(); 275 indexWriter.check();
260 File dirFile = new File(logDir,"check"); 276 File dirFile = new File(logDir,"check");
261 IoUtils.deleteRecursively(dirFile); 277 IoUtils.deleteRecursively(dirFile);
262 Directory dir = FSDirectory.open(dirFile); 278 Directory dir = FSDirectory.open(dirFile);
263 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); 279 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig );
264 playLogs(logReaders,checkWriter); 280 playLogs(logReaders,checkWriter);
265 logger.info("check lucene"); 281 //logger.info("check lucene");
266 IndexReader checkReader = checkWriter.openReader(); 282 IndexReader checkReader = checkWriter.openReader();
267 if( sortField == null ) { 283 if( sortField == null ) {
268 int nCheck = checkReader.numDocs(); 284 int nCheck = checkReader.numDocs();
269 int nOrig = indexReader.numDocs(); 285 int nOrig = indexReader.numDocs();
270 if( nCheck != nOrig ) { 286 if( nCheck != nOrig ) {
271 logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck); 287 logger.error("numDocs mismatch: lucene="+nOrig+" logs="+nCheck);
288 ok = false;
272 } 289 }
273 logger.info("numDocs="+nOrig); 290 //logger.info("numDocs="+nOrig);
274 if( hash(indexReader) != hash(checkReader) ) { 291 if( hash(indexReader) != hash(checkReader) ) {
275 logger.error("hash mismatch"); 292 logger.error("hash mismatch");
293 ok = false;
276 } 294 }
277 } else { 295 } else {
278 Sort sort = new Sort(sortField); 296 Sort sort = new Sort(sortField);
279 String sortFieldName = sortField.getField(); 297 String sortFieldName = sortField.getField();
280 Query query = new PrefixQuery(new Term(sortFieldName)); 298 Query query = new PrefixQuery(new Term(sortFieldName));
289 if( cmp==0 ) { 307 if( cmp==0 ) {
290 if( !origFields.equals(checkFields) ) { 308 if( !origFields.equals(checkFields) ) {
291 logger.error(sortFieldName+" "+origFld+" not equal"); 309 logger.error(sortFieldName+" "+origFld+" not equal");
292 logger.error("lucene = "+origFields); 310 logger.error("lucene = "+origFields);
293 logger.error("logs = "+checkFields); 311 logger.error("logs = "+checkFields);
312 ok = false;
294 } 313 }
295 origFields = LuceneUtils.toMap(origIter.next()); 314 origFields = LuceneUtils.toMap(origIter.next());
296 checkFields = LuceneUtils.toMap(checkIter.next()); 315 checkFields = LuceneUtils.toMap(checkIter.next());
297 } else if( cmp < 0 ) { 316 } else if( cmp < 0 ) {
298 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs"); 317 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
318 ok = false;
299 origFields = LuceneUtils.toMap(origIter.next()); 319 origFields = LuceneUtils.toMap(origIter.next());
300 } else { // > 320 } else { // >
301 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene"); 321 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
322 ok = false;
302 checkFields = LuceneUtils.toMap(checkIter.next()); 323 checkFields = LuceneUtils.toMap(checkIter.next());
303 } 324 }
304 } 325 }
305 while( origFields!=null ) { 326 while( origFields!=null ) {
306 Comparable origFld = (Comparable)origFields.get(sortFieldName); 327 Comparable origFld = (Comparable)origFields.get(sortFieldName);
307 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs"); 328 logger.error(sortFieldName+" "+origFld+" found in lucene but not logs");
329 ok = false;
308 origFields = LuceneUtils.toMap(origIter.next()); 330 origFields = LuceneUtils.toMap(origIter.next());
309 } 331 }
310 while( checkFields!=null ) { 332 while( checkFields!=null ) {
311 Comparable checkFld = (Comparable)checkFields.get(sortFieldName); 333 Comparable checkFld = (Comparable)checkFields.get(sortFieldName);
312 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene"); 334 logger.error(sortFieldName+" "+checkFld+" found in logs but not lucene");
335 ok = false;
313 checkFields = LuceneUtils.toMap(checkIter.next()); 336 checkFields = LuceneUtils.toMap(checkIter.next());
314 } 337 }
315 //logger.info("check done"); 338 //logger.info("check done");
316 } 339 }
317 checkReader.close(); 340 checkReader.close();
318 checkWriter.close(); 341 checkWriter.close();
319 IoUtils.deleteRecursively(dirFile); 342 IoUtils.deleteRecursively(dirFile);
320 logger.info("check done"); 343 //logger.info("check done");
321 } finally { 344 } finally {
322 indexReader.close(); 345 indexReader.close();
323 } 346 }
347 return ok;
324 } 348 }
325 349
326 private static abstract class HashCollector extends GoodCollector { 350 private static abstract class HashCollector extends GoodCollector {
327 int total = 0; 351 int total = 0;
328 } 352 }
348 } 372 }
349 373
350 public synchronized void commit() throws IOException { 374 public synchronized void commit() throws IOException {
351 indexWriter.commit(); 375 indexWriter.commit();
352 log.commit(); 376 log.commit();
353 if( isMerging ) 377 if( mergeLock.isLocked() )
354 return; 378 return;
355 if( log.logFile.end() > logs.get(0).end() ) { 379 if( log.logFile.end() > logs.get(0).end() ) {
356 logs.add( newLogFile() ); 380 logs.add( newLogFile() );
357 writeIndex(); 381 writeIndex();
358 setLog(); 382 setLog();
359 } 383 }
360 if( logs.size() > 3 ) { 384 if( logs.size() > 3 ) {
361 isMerging = true; 385 getMergeLock();
362 new Thread(mergeLogs).start(); 386 new Thread(mergeLogs).start();
363 // mergeLogs.run(); 387 // mergeLogs.run();
364 } 388 }
365 } 389 }
366 390