Mercurial Hosting > luan
comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1465:5e3870618377
lucene.logging dir
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 12 Apr 2020 15:59:57 -0600 |
parents | e5d48b85351c |
children | 6c6ce14db6a8 |
comparison legend: equal | deleted | inserted | replaced
1464:465b4a0dae4a | 1465:5e3870618377 |
---|---|
1 package goodjava.lucene.logging; | 1 package goodjava.lucene.logging; |
2 | 2 |
3 import java.io.File; | |
4 import java.io.RandomAccessFile; | |
5 import java.io.ByteArrayOutputStream; | |
6 import java.io.DataOutputStream; | |
7 import java.io.DataInputStream; | |
8 import java.io.FileInputStream; | |
3 import java.io.IOException; | 9 import java.io.IOException; |
4 import java.util.Map; | 10 import java.util.Map; |
11 import java.util.Set; | |
12 import java.util.HashSet; | |
13 import java.util.List; | |
14 import java.util.ArrayList; | |
15 import java.util.Random; | |
16 import org.apache.lucene.document.Document; | |
17 import org.apache.lucene.index.DirectoryReader; | |
18 import org.apache.lucene.index.IndexReader; | |
19 import org.apache.lucene.search.IndexSearcher; | |
5 import org.apache.lucene.search.Query; | 20 import org.apache.lucene.search.Query; |
21 import org.apache.lucene.search.MatchAllDocsQuery; | |
22 import org.apache.lucene.search.TopDocs; | |
23 import org.apache.lucene.store.Directory; | |
24 import org.apache.lucene.store.FSDirectory; | |
6 import goodjava.lucene.api.GoodIndexWriter; | 25 import goodjava.lucene.api.GoodIndexWriter; |
7 | 26 import goodjava.lucene.api.LuceneIndexWriter; |
8 | 27 import goodjava.lucene.api.GoodCollector; |
9 public class LoggingIndexWriter implements GoodIndexWriter { | 28 import goodjava.lucene.api.LuceneUtils; |
29 import goodjava.logging.Logger; | |
30 import goodjava.logging.LoggerFactory; | |
31 | |
32 | |
33 public final class LoggingIndexWriter implements GoodIndexWriter { | |
34 private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class); | |
35 private static final int version = 1; | |
10 private static final int OP_DELETE_ALL = 1; | 36 private static final int OP_DELETE_ALL = 1; |
11 private static final int OP_DELETE_DOCUMENTS = 2; | 37 private static final int OP_DELETE_DOCUMENTS = 2; |
12 private static final int OP_ADD_DOCUMENT = 3; | 38 private static final int OP_ADD_DOCUMENT = 3; |
13 private static final int OP_UPDATE_DOCUMENT = 4; | 39 private static final int OP_UPDATE_DOCUMENT = 4; |
14 | 40 private static final Random rnd = new Random(); |
15 public final GoodIndexWriter indexWriter; | 41 |
16 private final LogFile logFile; | 42 public final LuceneIndexWriter indexWriter; |
17 | 43 private final File logDir; |
18 public LoggingIndexWriter(GoodIndexWriter indexWriter) throws IOException { | 44 private final List<LogFile> logs = new ArrayList<LogFile>(); |
45 private final File index; | |
46 private boolean isMerging = false; | |
47 | |
48 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException { | |
19 this.indexWriter = indexWriter; | 49 this.indexWriter = indexWriter; |
20 logFile = new LogFile("lucene.log","rw"); | 50 this.logDir = logDir; |
21 logFile.gotoStart(); // for now | 51 logDir.mkdirs(); |
22 } | 52 if( !logDir.isDirectory() ) |
23 | 53 throw new RuntimeException(); |
24 public void close() throws IOException { | 54 index = new File(logDir,"index"); |
55 if( index.exists() ) { | |
56 DataInputStream dis = new DataInputStream(new FileInputStream(index)); | |
57 try { | |
58 if( dis.readInt() == version ) { | |
59 final int n = dis.readInt(); | |
60 for( int i=0; i<n; i++ ) { | |
61 File file = new File( logDir, dis.readUTF() ); | |
62 logs.add( new LogFile(file,"rwd") ); | |
63 } | |
64 deleteUnusedFiles(); | |
65 log().gotoEnd(); | |
66 return; | |
67 } | |
68 } finally { | |
69 dis.close(); | |
70 } | |
71 } | |
72 for( int i=0; i<2; i++ ) { | |
73 logs.add( newLogFile() ); | |
74 } | |
75 isMerging = true; | |
76 new Thread(new Runnable(){public void run(){ | |
77 try { | |
78 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); | |
79 synchronized(LoggingIndexWriter.this) { | |
80 writeIndex(); | |
81 } | |
82 } catch(IOException e) { | |
83 throw new RuntimeException(e); | |
84 } finally { | |
85 synchronized(LoggingIndexWriter.this) { | |
86 isMerging = false; | |
87 } | |
88 } | |
89 }}).start(); | |
90 } | |
91 | |
92 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException { | |
93 IndexReader reader = indexWriter.openReader(); | |
94 final IndexSearcher searcher = new IndexSearcher(reader); | |
95 Query query = new MatchAllDocsQuery(); | |
96 searcher.search( query, new GoodCollector(){ | |
97 public void collectDoc(int iDoc) throws IOException { | |
98 Document doc = searcher.doc(iDoc); | |
99 Map<String,Object> storedFields = LuceneUtils.toMap(doc); | |
100 log.writeLong(time); | |
101 log.writeByte(OP_ADD_DOCUMENT); | |
102 log.writeMap(storedFields); | |
103 } | |
104 }); | |
105 reader.close(); | |
106 log.commit(); | |
107 } | |
108 | |
109 private LogFile newLogFile() throws IOException { | |
110 File file; | |
111 do { | |
112 file = new File(logDir,"_"+rnd.nextInt(100)+".log"); | |
113 } while( file.exists() ); | |
114 return new LogFile(file,"rwd"); | |
115 } | |
116 | |
117 private void deleteUnusedFiles() { | |
118 Set<String> used = new HashSet<String>(); | |
119 used.add( index.getName() ); | |
120 for( LogFile lf : logs ) { | |
121 used.add( lf.file.getName() ); | |
122 } | |
123 for( File f : logDir.listFiles() ) { | |
124 if( !used.contains(f.getName()) ) { | |
125 deleteFile(f); | |
126 } | |
127 } | |
128 } | |
129 | |
130 private static void deleteFile(File file) { | |
131 if( file.isDirectory() ) { | |
132 for( File f : file.listFiles() ) { | |
133 deleteFile(f); | |
134 } | |
135 } | |
136 if( !file.delete() ) | |
137 throw new RuntimeException(file.getName()); | |
138 } | |
139 | |
140 private void writeIndex() throws IOException { | |
141 ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
142 DataOutputStream dos = new DataOutputStream(baos); | |
143 dos.writeInt(version); | |
144 dos.writeInt(logs.size()); | |
145 for( LogFile lf : logs ) { | |
146 String fileName = lf.file.getName(); | |
147 dos.writeUTF(fileName); | |
148 } | |
149 dos.close(); | |
150 RandomAccessFile raf = new RandomAccessFile( index, "rwd" ); | |
151 raf.write( baos.toByteArray() ); | |
152 raf.close(); | |
153 deleteUnusedFiles(); | |
154 logger.info("writeIndex "+logs.toString()); | |
155 } | |
156 | |
157 private void mergeLogs() throws IOException { | |
158 logger.info("merge"); | |
159 LogFile first = logs.get(0); | |
160 LogFile second = logs.get(1); | |
161 second.gotoEnd(); | |
162 long lastTime = second.readLong(); | |
163 File dirFile = new File(logDir,"merge"); | |
164 if( dirFile.exists() ) | |
165 throw new RuntimeException(); | |
166 Directory dir = FSDirectory.open(dirFile); | |
167 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); | |
168 playLog(first,mergeWriter); | |
169 playLog(second,mergeWriter); | |
170 mergeWriter.commit(); | |
171 LogFile merge = newLogFile(); | |
172 logLucene( lastTime, merge, mergeWriter ); | |
173 mergeWriter.close(); | |
174 synchronized(this) { | |
175 check(); | |
176 logs.remove(0); | |
177 logs.set(0,merge); | |
178 writeIndex(); | |
179 check(); | |
180 } | |
181 } | |
182 private final Runnable mergeLogs = new Runnable() { public void run() { | |
183 try { | |
184 mergeLogs(); | |
185 /* | |
186 } catch(IOException e) { | |
187 throw new RuntimeException(e); | |
188 */ | |
189 } catch(Exception e) { | |
190 e.printStackTrace(); | |
191 System.exit(-1); | |
192 } finally { | |
193 synchronized(LoggingIndexWriter.this) { | |
194 isMerging = false; | |
195 } | |
196 } | |
197 } }; | |
198 | |
199 private void check() throws IOException { | |
200 File dirFile = new File(logDir,"check"); | |
201 if( dirFile.exists() ) | |
202 throw new RuntimeException(); | |
203 Directory dir = FSDirectory.open(dirFile); | |
204 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig ); | |
205 playLog(checkWriter); | |
206 int nCheck = numDocs(checkWriter); | |
207 int nOrig = numDocs(indexWriter); | |
208 if( nCheck != nOrig ) { | |
209 logger.error("nCheck = "+nCheck); | |
210 logger.error("nOrig = "+nOrig); | |
211 //new Exception().printStackTrace(); | |
212 Thread.dumpStack(); | |
213 System.out.println(); | |
214 System.out.println("indexWriter"); | |
215 dump(indexWriter); | |
216 System.out.println("checkWriter"); | |
217 dump(checkWriter); | |
218 System.exit(-1); | |
219 } | |
220 checkWriter.close(); | |
221 deleteFile(dirFile); | |
222 } | |
223 | |
224 private LogFile log() { | |
225 return logs.get(logs.size()-1); | |
226 } | |
227 | |
228 public synchronized void close() throws IOException { | |
25 indexWriter.close(); | 229 indexWriter.close(); |
26 logFile.commit(); | 230 LogFile log = log(); |
27 } | 231 log.commit(); |
28 | 232 } |
29 public void commit() throws IOException { | 233 |
234 public synchronized void commit() throws IOException { | |
30 indexWriter.commit(); | 235 indexWriter.commit(); |
31 logFile.commit(); | 236 LogFile log = log(); |
32 } | 237 log.commit(); |
33 | 238 if( isMerging ) |
34 public void rollback() throws IOException { | 239 return; |
240 if( log.length() > logs.get(0).length() ) { | |
241 log.writeLong( System.currentTimeMillis() ); | |
242 logs.add( newLogFile() ); | |
243 writeIndex(); | |
244 } | |
245 if( logs.size() > 3 ) { | |
246 isMerging = true; | |
247 // new Thread(mergeLogs).start(); | |
248 mergeLogs.run(); | |
249 } | |
250 } | |
251 | |
252 public synchronized void rollback() throws IOException { | |
35 indexWriter.rollback(); | 253 indexWriter.rollback(); |
36 logFile.gotoEnd(); | 254 LogFile log = log(); |
37 } | 255 log.gotoEnd(); |
38 | 256 } |
39 public void deleteAll() throws IOException { | 257 |
258 public synchronized void deleteAll() throws IOException { | |
40 indexWriter.deleteAll(); | 259 indexWriter.deleteAll(); |
41 logFile.writeByte(OP_DELETE_ALL); | 260 LogFile log = log(); |
42 } | 261 writeOp(log,OP_DELETE_ALL); |
43 | 262 } |
44 public void deleteDocuments(Query query) throws IOException { | 263 |
264 public synchronized void deleteDocuments(Query query) throws IOException { | |
45 indexWriter.deleteDocuments(query); | 265 indexWriter.deleteDocuments(query); |
46 logFile.writeByte(OP_DELETE_DOCUMENTS); | 266 LogFile log = log(); |
47 logFile.writeQuery(query); | 267 writeOp(log,OP_DELETE_DOCUMENTS); |
48 } | 268 log.writeQuery(query); |
49 | 269 } |
50 public void addDocument(Map<String,Object> storedFields) throws IOException { | 270 |
271 public synchronized void addDocument(Map<String,Object> storedFields) throws IOException { | |
51 indexWriter.addDocument(storedFields); | 272 indexWriter.addDocument(storedFields); |
52 logFile.writeByte(OP_ADD_DOCUMENT); | 273 LogFile log = log(); |
53 logFile.writeMap(storedFields); | 274 writeOp(log,OP_ADD_DOCUMENT); |
54 } | 275 log.writeMap(storedFields); |
55 | 276 } |
56 public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException { | 277 |
278 public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException { | |
57 indexWriter.updateDocument(keyFieldName,storedFields); | 279 indexWriter.updateDocument(keyFieldName,storedFields); |
58 logFile.writeByte(OP_UPDATE_DOCUMENT); | 280 LogFile log = log(); |
59 logFile.writeUTF(keyFieldName); | 281 writeOp(log,OP_UPDATE_DOCUMENT); |
60 logFile.writeMap(storedFields); | 282 log.writeUTF(keyFieldName); |
61 } | 283 log.writeMap(storedFields); |
62 | 284 } |
63 public void reindexDocuments(String keyFieldName,Query query) throws IOException { | 285 |
286 public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException { | |
64 indexWriter.reindexDocuments(keyFieldName,query); | 287 indexWriter.reindexDocuments(keyFieldName,query); |
65 } | 288 } |
66 | 289 |
67 private void playOp() throws IOException { | 290 private void writeOp(LogFile log,int op) throws IOException { |
68 int op = logFile.readByte(); | 291 log.writeLong(System.currentTimeMillis()); |
292 log.writeByte(op); | |
293 } | |
294 | |
295 public synchronized void playLog() throws IOException { | |
296 playLog(indexWriter); | |
297 } | |
298 | |
299 private void playLog(LuceneIndexWriter indexWriter) throws IOException { | |
300 if( numDocs(indexWriter) != 0 ) | |
301 throw new RuntimeException ("not empty"); | |
302 for( LogFile log : logs ) { | |
303 playLog(log,indexWriter); | |
304 } | |
305 indexWriter.commit(); | |
306 } | |
307 | |
308 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { | |
309 IndexReader reader = indexWriter.openReader(); | |
310 int n = reader.numDocs(); | |
311 reader.close(); | |
312 return n; | |
313 } | |
314 | |
315 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException { | |
316 log.gotoStart(); | |
317 while( log.hasMore() ) { | |
318 playOp(log,indexWriter); | |
319 } | |
320 } | |
321 | |
322 private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException { | |
323 log.readLong(); // time | |
324 int op = log.readByte(); | |
69 switch(op) { | 325 switch(op) { |
70 case OP_DELETE_ALL: | 326 case OP_DELETE_ALL: |
71 indexWriter.deleteAll(); | 327 indexWriter.deleteAll(); |
72 return; | 328 return; |
73 case OP_DELETE_DOCUMENTS: | 329 case OP_DELETE_DOCUMENTS: |
74 indexWriter.deleteDocuments( logFile.readQuery() ); | 330 indexWriter.deleteDocuments( log.readQuery() ); |
75 return; | 331 return; |
76 case OP_ADD_DOCUMENT: | 332 case OP_ADD_DOCUMENT: |
77 indexWriter.addDocument( logFile.readMap() ); | 333 { |
78 return; | 334 Map storedFields = log.readMap(); |
335 indexWriter.addDocument(storedFields); | |
336 return; | |
337 } | |
79 case OP_UPDATE_DOCUMENT: | 338 case OP_UPDATE_DOCUMENT: |
80 indexWriter.updateDocument( logFile.readUTF(), logFile.readMap() ); | 339 { |
81 return; | 340 String keyFieldName = log.readUTF(); |
341 Map storedFields = log.readMap(); | |
342 indexWriter.updateDocument(keyFieldName,storedFields); | |
343 return; | |
344 } | |
82 default: | 345 default: |
83 throw new RuntimeException("invalid op "+op); | 346 throw new RuntimeException("invalid op "+op); |
84 } | 347 } |
85 } | 348 } |
86 | 349 |
87 public void playLog() throws IOException { | 350 private static void dump(LuceneIndexWriter indexWriter) throws IOException { |
88 logFile.gotoStart(); | 351 IndexReader reader = indexWriter.openReader(); |
89 while( logFile.hasMore() ) { | 352 IndexSearcher searcher = new IndexSearcher(reader); |
90 playOp(); | 353 Query query = new MatchAllDocsQuery(); |
91 } | 354 TopDocs td = searcher.search(query,100); |
92 indexWriter.commit(); | 355 System.out.println("totalHits = "+td.totalHits); |
93 } | 356 for( int i=0; i<td.scoreDocs.length; i++ ) { |
357 Document doc = searcher.doc(td.scoreDocs[i].doc); | |
358 System.out.println(LuceneUtils.toMap(doc)); | |
359 } | |
360 System.out.println(); | |
361 reader.close(); | |
362 } | |
363 | |
94 } | 364 } |