Mercurial Hosting > luan
comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1548:736ec76bbf42
lucene log work
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 27 Sep 2020 22:07:18 -0600 |
parents | 35601f15ecc3 |
children | 41c32da4cbd1 |
comparison
equal
deleted
inserted
replaced
1547:f24a9ba7551e | 1548:736ec76bbf42 |
---|---|
8 import java.io.FileInputStream; | 8 import java.io.FileInputStream; |
9 import java.io.IOException; | 9 import java.io.IOException; |
10 import java.util.Map; | 10 import java.util.Map; |
11 import java.util.Set; | 11 import java.util.Set; |
12 import java.util.HashSet; | 12 import java.util.HashSet; |
13 import java.util.List; | |
14 import java.util.ArrayList; | |
15 import java.util.Random; | 13 import java.util.Random; |
16 import java.util.concurrent.TimeUnit; | 14 import java.util.concurrent.TimeUnit; |
17 import org.apache.lucene.document.Document; | 15 import org.apache.lucene.document.Document; |
18 import org.apache.lucene.index.DirectoryReader; | 16 import org.apache.lucene.index.DirectoryReader; |
19 import org.apache.lucene.index.IndexReader; | 17 import org.apache.lucene.index.IndexReader; |
27 import org.apache.lucene.search.SortField; | 25 import org.apache.lucene.search.SortField; |
28 import org.apache.lucene.search.Sort; | 26 import org.apache.lucene.search.Sort; |
29 import org.apache.lucene.store.Directory; | 27 import org.apache.lucene.store.Directory; |
30 import org.apache.lucene.store.FSDirectory; | 28 import org.apache.lucene.store.FSDirectory; |
31 import goodjava.io.IoUtils; | 29 import goodjava.io.IoUtils; |
30 import goodjava.lucene.api.GoodWriter; | |
32 import goodjava.lucene.api.GoodIndexWriter; | 31 import goodjava.lucene.api.GoodIndexWriter; |
33 import goodjava.lucene.api.LuceneIndexWriter; | 32 import goodjava.lucene.api.LuceneIndexWriter; |
34 import goodjava.lucene.api.GoodCollector; | 33 import goodjava.lucene.api.GoodCollector; |
35 import goodjava.lucene.api.LuceneUtils; | 34 import goodjava.lucene.api.LuceneUtils; |
36 import goodjava.logging.Logger; | 35 import goodjava.logging.Logger; |
37 import goodjava.logging.LoggerFactory; | 36 import goodjava.logging.LoggerFactory; |
38 | 37 |
39 | 38 |
40 public class LoggingIndexWriter implements GoodIndexWriter { | 39 public class LoggingIndexWriter implements GoodIndexWriter { |
41 private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class); | 40 private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class); |
42 private static final int version = 1; | 41 private static final int version = 2; |
43 private static final int OP_DELETE_ALL = 1; | 42 private static final int OP_DELETE_ALL = 1; |
44 private static final int OP_DELETE_DOCUMENTS = 2; | 43 private static final int OP_DELETE_DOCUMENTS = 2; |
45 private static final int OP_ADD_DOCUMENT = 3; | 44 private static final int OP_ADD_DOCUMENT = 3; |
46 private static final int OP_UPDATE_DOCUMENT = 4; | 45 private static final int OP_UPDATE_DOCUMENT = 4; |
47 private static final int OP_TAG = 5; | 46 private static final int OP_TAG = 5; |
48 private static final Random rnd = new Random(); | 47 private static final Random rnd = new Random(); |
49 | 48 |
50 public final LuceneIndexWriter indexWriter; | 49 public final LuceneIndexWriter indexWriter; |
51 public boolean wasCreated; | 50 public boolean wasCreated; |
52 private final File logDir; | 51 private final File logDir; |
53 protected final List<LogFile> logs = new ArrayList<LogFile>(); | 52 private final long logTime; |
53 protected final LogFile[] logs = new LogFile[3]; | |
54 private LogOutputStream log; | 54 private LogOutputStream log; |
55 private final File index; | 55 private final File index; |
56 private final SemaphoreLock mergeLock = new SemaphoreLock(); | 56 private final SemaphoreLock mergeLock = new SemaphoreLock(); |
57 | 57 |
58 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException { | 58 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime) |
59 throws IOException | |
60 { | |
59 this.indexWriter = indexWriter; | 61 this.indexWriter = indexWriter; |
60 this.logDir = logDir; | 62 this.logDir = logDir; |
63 this.logTime = logTime; | |
61 IoUtils.mkdirs(logDir); | 64 IoUtils.mkdirs(logDir); |
62 if( !logDir.isDirectory() ) | 65 if( !logDir.isDirectory() ) |
63 throw new RuntimeException(); | 66 throw new RuntimeException(); |
64 index = new File(logDir,"index"); | 67 index = new File(logDir,"index"); |
65 if( index.exists() ) { | 68 if( index.exists() ) { |
66 DataInputStream dis = new DataInputStream(new FileInputStream(index)); | 69 DataInputStream dis = new DataInputStream(new FileInputStream(index)); |
67 try { | 70 try { |
68 if( dis.readInt() == version ) { | 71 if( dis.readInt() == version ) { |
69 final int n = dis.readInt(); | 72 for( int i=0; i<logs.length; i++ ) { |
70 for( int i=0; i<n; i++ ) { | |
71 File file = new File( logDir, dis.readUTF() ); | 73 File file = new File( logDir, dis.readUTF() ); |
72 logs.add( new LogFile(file) ); | 74 logs[i] = new LogFile(file); |
73 } | 75 } |
74 deleteUnusedFiles(); | 76 deleteUnusedFiles(); |
75 setLog(); | 77 setLog(); |
76 wasCreated = false; | 78 wasCreated = false; |
77 return; | 79 return; |
82 } | 84 } |
83 newLogs(); | 85 newLogs(); |
84 wasCreated = true; | 86 wasCreated = true; |
85 } | 87 } |
86 | 88 |
89 public IndexReader openReader() throws IOException { | |
90 return indexWriter.openReader(); | |
91 } | |
92 | |
87 public IndexWriter getLuceneIndexWriter() { | 93 public IndexWriter getLuceneIndexWriter() { |
88 return indexWriter.getLuceneIndexWriter(); | 94 return indexWriter.getLuceneIndexWriter(); |
89 } | 95 } |
90 | 96 |
91 private void setLog() throws IOException { | 97 private void setLog() throws IOException { |
92 if( log != null ) | 98 if( log != null ) |
93 log.close(); | 99 log.close(); |
94 log = logs.get(logs.size()-1).output(); | 100 log = logs[2].output(); |
95 } | 101 } |
96 /* | 102 /* |
97 public synchronized boolean isMerging() { | 103 public synchronized boolean isMerging() { |
98 return mergeLock.isLocked(); | 104 return mergeLock.isLocked(); |
99 } | 105 } |
116 } | 122 } |
117 } | 123 } |
118 | 124 |
119 private void newLogs2() throws IOException { | 125 private void newLogs2() throws IOException { |
120 logger.info("building new logs"); | 126 logger.info("building new logs"); |
121 logs.clear(); | 127 for( int i=0; i<logs.length; i++ ) { |
122 for( int i=0; i<2; i++ ) { | 128 logs[i] = newLogFile(); |
123 logs.add( newLogFile() ); | 129 } |
124 } | 130 LogOutputStream log = logs[0].output(); |
125 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); | 131 logLucene( System.currentTimeMillis(), log, indexWriter ); |
132 log.close(); | |
126 writeIndex(); | 133 writeIndex(); |
127 setLog(); | 134 setLog(); |
128 logger.info("done building new logs"); | 135 logger.info("done building new logs"); |
129 } | 136 } |
130 | 137 |
131 private static void logLucene(long time,LogFile logLucene,LuceneIndexWriter indexWriter) throws IOException { | 138 public synchronized void logLucene() |
132 LogOutputStream log = logLucene.output(); | 139 throws IOException |
140 { | |
141 //log.rollback(); ? | |
142 logLucene( System.currentTimeMillis(), log, indexWriter ); | |
143 } | |
144 | |
145 private static void logLucene(long time,LogOutputStream log,LuceneIndexWriter indexWriter) | |
146 throws IOException | |
147 { | |
148 log.writeLong(time); | |
149 log.writeByte(OP_DELETE_ALL); | |
133 IndexReader reader = indexWriter.openReader(); | 150 IndexReader reader = indexWriter.openReader(); |
134 final IndexSearcher searcher = new IndexSearcher(reader); | 151 final IndexSearcher searcher = new IndexSearcher(reader); |
135 Query query = new MatchAllDocsQuery(); | 152 Query query = new MatchAllDocsQuery(); |
136 searcher.search( query, new GoodCollector(){ | 153 searcher.search( query, new GoodCollector(){ |
137 public void collectDoc(int iDoc) throws IOException { | 154 public void collectDoc(int iDoc) throws IOException { |
142 log.writeMap(storedFields); | 159 log.writeMap(storedFields); |
143 } | 160 } |
144 }); | 161 }); |
145 reader.close(); | 162 reader.close(); |
146 log.commit(); | 163 log.commit(); |
147 log.close(); | |
148 } | 164 } |
149 | 165 |
150 private LogFile newLogFile() throws IOException { | 166 private LogFile newLogFile() throws IOException { |
151 File file; | 167 File file; |
152 do { | 168 do { |
157 | 173 |
158 private void deleteUnusedFiles() throws IOException { | 174 private void deleteUnusedFiles() throws IOException { |
159 deleteUnusedFiles(logs,index); | 175 deleteUnusedFiles(logs,index); |
160 } | 176 } |
161 | 177 |
162 private static void deleteUnusedFiles(List<LogFile> logs,File index) throws IOException { | 178 private static void deleteUnusedFiles(LogFile[] logs,File index) throws IOException { |
163 Set<String> used = new HashSet<String>(); | 179 Set<String> used = new HashSet<String>(); |
164 used.add( index.getName() ); | 180 used.add( index.getName() ); |
165 for( LogFile lf : logs ) { | 181 for( LogFile lf : logs ) { |
166 used.add( lf.file.getName() ); | 182 used.add( lf.file.getName() ); |
167 } | 183 } |
174 | 190 |
175 private void writeIndex() throws IOException { | 191 private void writeIndex() throws IOException { |
176 writeIndex(logs,index); | 192 writeIndex(logs,index); |
177 } | 193 } |
178 | 194 |
179 public static void writeIndex(List<LogFile> logs,File index) throws IOException { | 195 public static void writeIndex(LogFile[] logs,File index) throws IOException { |
196 if( logs.length != 3 ) | |
197 throw new RuntimeException(); | |
180 ByteArrayOutputStream baos = new ByteArrayOutputStream(); | 198 ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
181 DataOutputStream dos = new DataOutputStream(baos); | 199 DataOutputStream dos = new DataOutputStream(baos); |
182 dos.writeInt(version); | 200 dos.writeInt(version); |
183 dos.writeInt(logs.size()); | |
184 for( LogFile lf : logs ) { | 201 for( LogFile lf : logs ) { |
185 String fileName = lf.file.getName(); | 202 String fileName = lf.file.getName(); |
186 dos.writeUTF(fileName); | 203 dos.writeUTF(fileName); |
187 } | 204 } |
188 dos.close(); | 205 dos.close(); |
192 deleteUnusedFiles(logs,index); | 209 deleteUnusedFiles(logs,index); |
193 //logger.info("writeIndex "+logs.toString()); | 210 //logger.info("writeIndex "+logs.toString()); |
194 } | 211 } |
195 | 212 |
196 private void mergeLogs() throws IOException { | 213 private void mergeLogs() throws IOException { |
197 //logger.info("merge"); | 214 logger.info("merge"); |
198 if( logs.size() <= 3 ) | 215 if( !mergeLock.isLocked() ) { |
216 logger.error("merge without lock"); | |
199 return; | 217 return; |
200 LogFile first = logs.get(0); | 218 } |
201 LogFile second = logs.get(1); | 219 LogFile first = logs[0]; |
220 LogFile second = logs[1]; | |
202 long lastTime = second.file.lastModified(); | 221 long lastTime = second.file.lastModified(); |
203 File dirFile = new File(logDir,"merge"); | 222 File dirFile = new File(logDir,"merge"); |
204 if( dirFile.exists() ) | 223 if( dirFile.exists() ) |
205 throw new RuntimeException(); | 224 throw new RuntimeException(); |
206 Directory dir = FSDirectory.open(dirFile); | 225 Directory dir = FSDirectory.open(dirFile); |
207 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); | 226 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); |
208 playLog( first.input(), mergeWriter, null ); | 227 playLog( first.input(), mergeWriter ); |
209 playLog( second.input(), mergeWriter, null ); | 228 playLog( second.input(), mergeWriter ); |
210 mergeWriter.commit(); | 229 mergeWriter.commit(); |
211 LogFile merge = newLogFile(); | 230 LogFile merge = newLogFile(); |
212 logLucene( lastTime, merge, mergeWriter ); | 231 LogOutputStream log = merge.output(); |
232 logLucene( lastTime, log, mergeWriter ); | |
233 log.close(); | |
213 mergeWriter.close(); | 234 mergeWriter.close(); |
214 synchronized(this) { | 235 synchronized(this) { |
215 //check(); | 236 //check(); |
216 logs.remove(0); | 237 logs[0] = merge; |
217 logs.set(0,merge); | 238 logs[1] = logs[2]; |
239 logs[2] = newLogFile(); | |
218 writeIndex(); | 240 writeIndex(); |
241 setLog(); | |
219 //check(null); | 242 //check(null); |
220 } | 243 } |
221 } | 244 } |
222 private final Runnable mergeLogs = new Runnable() { public void run() { | 245 private final Runnable mergeLogs = new Runnable() { public void run() { |
223 try { | 246 try { |
263 } | 286 } |
264 | 287 |
265 protected boolean doCheck(SortField sortField) throws IOException { | 288 protected boolean doCheck(SortField sortField) throws IOException { |
266 boolean ok = true; | 289 boolean ok = true; |
267 IndexReader indexReader; | 290 IndexReader indexReader; |
268 List<LogInputStream> logReaders; | 291 LogInputStream[] logReaders; |
269 synchronized(this) { | 292 synchronized(this) { |
270 indexReader = indexWriter.openReader(); | 293 indexReader = indexWriter.openReader(); |
271 logReaders = logReaders(logs); | 294 logReaders = logReaders(logs); |
272 } | 295 } |
273 try { | 296 try { |
275 indexWriter.check(); | 298 indexWriter.check(); |
276 File dirFile = new File(logDir,"check"); | 299 File dirFile = new File(logDir,"check"); |
277 IoUtils.deleteRecursively(dirFile); | 300 IoUtils.deleteRecursively(dirFile); |
278 Directory dir = FSDirectory.open(dirFile); | 301 Directory dir = FSDirectory.open(dirFile); |
279 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); | 302 LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); |
280 playLogs(logReaders,checkWriter,null); | 303 playLogs(logReaders,checkWriter); |
281 //logger.info("check lucene"); | 304 //logger.info("check lucene"); |
282 IndexReader checkReader = checkWriter.openReader(); | 305 IndexReader checkReader = checkWriter.openReader(); |
283 int nCheck = checkReader.numDocs(); | 306 int nCheck = checkReader.numDocs(); |
284 int nOrig = indexReader.numDocs(); | 307 int nOrig = indexReader.numDocs(); |
285 if( nCheck != nOrig ) { | 308 if( nCheck != nOrig ) { |
373 public synchronized void commit() throws IOException { | 396 public synchronized void commit() throws IOException { |
374 indexWriter.commit(); | 397 indexWriter.commit(); |
375 log.commit(); | 398 log.commit(); |
376 if( mergeLock.isLocked() ) | 399 if( mergeLock.isLocked() ) |
377 return; | 400 return; |
378 if( log.logFile.end() > logs.get(0).end() ) { | 401 if( logs[1].file.lastModified() < System.currentTimeMillis() - logTime ) { |
379 logs.add( newLogFile() ); | |
380 writeIndex(); | |
381 setLog(); | |
382 } | |
383 if( logs.size() > 3 ) { | |
384 getMergeLock(); | 402 getMergeLock(); |
385 new Thread(mergeLogs).start(); | 403 new Thread(mergeLogs).start(); |
386 // mergeLogs.run(); | 404 // mergeLogs.run(); |
387 } | 405 } |
388 } | 406 } |
429 log.writeLong(System.currentTimeMillis()); | 447 log.writeLong(System.currentTimeMillis()); |
430 log.writeByte(op); | 448 log.writeByte(op); |
431 } | 449 } |
432 | 450 |
433 // return whether stopped at tag | 451 // return whether stopped at tag |
434 public synchronized boolean playLogs(String upToTag) throws IOException { | 452 public synchronized void playLogs(GoodWriter writer) throws IOException { |
435 return playLogs( logReaders(logs), indexWriter, upToTag ); | 453 if( writer == null ) |
436 } | 454 writer = indexWriter; |
437 | 455 playLogs( logReaders(logs), writer ); |
438 private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException { | 456 } |
439 List<LogInputStream> logReaders = new ArrayList<LogInputStream>(); | 457 |
440 for( LogFile log : logs ) { | 458 private static LogInputStream[] logReaders(LogFile[] logs) throws IOException { |
441 logReaders.add( log.input() ); | 459 LogInputStream[] logReaders = new LogInputStream[logs.length]; |
460 for( int i=0; i<logs.length; i++ ) { | |
461 logReaders[i] = logs[i].input(); | |
442 } | 462 } |
443 return logReaders; | 463 return logReaders; |
444 } | 464 } |
445 | 465 |
446 private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag) | 466 private static void playLogs(LogInputStream[] logReaders,GoodWriter indexWriter) |
447 throws IOException | 467 throws IOException |
448 { | 468 { |
449 if( numDocs(indexWriter) != 0 ) | 469 if( numDocs(indexWriter) != 0 ) |
450 throw new RuntimeException ("not empty"); | 470 throw new RuntimeException ("not empty"); |
451 boolean rtn = false; | |
452 for( LogInputStream reader : logReaders ) { | 471 for( LogInputStream reader : logReaders ) { |
453 if( playLog(reader,indexWriter,upToTag) ) { | 472 playLog(reader,indexWriter); |
454 rtn = true; | |
455 break; | |
456 } | |
457 } | 473 } |
458 indexWriter.commit(); | 474 indexWriter.commit(); |
459 return rtn; | 475 } |
460 } | 476 |
461 | 477 private static int numDocs(GoodWriter indexWriter) throws IOException { |
462 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { | |
463 IndexReader reader = indexWriter.openReader(); | 478 IndexReader reader = indexWriter.openReader(); |
464 int n = reader.numDocs(); | 479 int n = reader.numDocs(); |
465 reader.close(); | 480 reader.close(); |
466 return n; | 481 return n; |
467 } | 482 } |
468 | 483 |
469 private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) | 484 private static void playLog(LogInputStream in,GoodWriter indexWriter) |
470 throws IOException | 485 throws IOException |
471 { | 486 { |
472 boolean rtn = false; | |
473 while( in.available() > 0 ) { | 487 while( in.available() > 0 ) { |
474 if( playOp(in,indexWriter,upToTag) ) { | 488 playOp(in,indexWriter); |
475 rtn = true; | |
476 break; | |
477 } | |
478 } | 489 } |
479 in.close(); | 490 in.close(); |
480 return rtn; | 491 } |
481 } | 492 |
482 | 493 private static void playOp(LogInputStream in,GoodWriter indexWriter) |
483 private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException { | 494 throws IOException |
495 { | |
484 in.readLong(); // time | 496 in.readLong(); // time |
485 int op = in.readByte(); | 497 int op = in.readByte(); |
486 switch(op) { | 498 switch(op) { |
487 case OP_DELETE_ALL: | 499 case OP_DELETE_ALL: |
488 indexWriter.deleteAll(); | 500 indexWriter.deleteAll(); |
489 return false; | 501 return; |
490 case OP_DELETE_DOCUMENTS: | 502 case OP_DELETE_DOCUMENTS: |
491 { | 503 { |
492 Query query = in.readQuery(); | 504 Query query = in.readQuery(); |
493 //System.out.println("OP_DELETE_DOCUMENTS "+query); | 505 //System.out.println("OP_DELETE_DOCUMENTS "+query); |
494 indexWriter.deleteDocuments(query); | 506 indexWriter.deleteDocuments(query); |
495 return false; | 507 return; |
496 } | 508 } |
497 case OP_ADD_DOCUMENT: | 509 case OP_ADD_DOCUMENT: |
498 { | 510 { |
499 Map storedFields = in.readMap(); | 511 Map storedFields = in.readMap(); |
500 indexWriter.addDocument(storedFields); | 512 indexWriter.addDocument(storedFields); |
501 return false; | 513 return; |
502 } | 514 } |
503 case OP_UPDATE_DOCUMENT: | 515 case OP_UPDATE_DOCUMENT: |
504 { | 516 { |
505 String keyFieldName = in.readUTF(); | 517 String keyFieldName = in.readUTF(); |
506 Map storedFields = in.readMap(); | 518 Map storedFields = in.readMap(); |
507 indexWriter.updateDocument(keyFieldName,storedFields); | 519 indexWriter.updateDocument(keyFieldName,storedFields); |
508 return false; | 520 return; |
509 } | 521 } |
510 case OP_TAG: | 522 case OP_TAG: |
511 { | 523 { |
512 String tag = in.readUTF(); | 524 String tag = in.readUTF(); |
513 return tag.equals(upToTag); | 525 indexWriter.tag(tag); |
526 return; | |
514 } | 527 } |
515 default: | 528 default: |
516 throw new RuntimeException("invalid op "+op); | 529 throw new RuntimeException("invalid op "+op); |
517 } | 530 } |
518 } | 531 } |