comparison src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1465:5e3870618377

lucene.logging dir
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 12 Apr 2020 15:59:57 -0600
parents e5d48b85351c
children 6c6ce14db6a8
comparison
equal deleted inserted replaced
1464:465b4a0dae4a 1465:5e3870618377
1 package goodjava.lucene.logging; 1 package goodjava.lucene.logging;
2 2
3 import java.io.File;
4 import java.io.RandomAccessFile;
5 import java.io.ByteArrayOutputStream;
6 import java.io.DataOutputStream;
7 import java.io.DataInputStream;
8 import java.io.FileInputStream;
3 import java.io.IOException; 9 import java.io.IOException;
4 import java.util.Map; 10 import java.util.Map;
11 import java.util.Set;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.ArrayList;
15 import java.util.Random;
16 import org.apache.lucene.document.Document;
17 import org.apache.lucene.index.DirectoryReader;
18 import org.apache.lucene.index.IndexReader;
19 import org.apache.lucene.search.IndexSearcher;
5 import org.apache.lucene.search.Query; 20 import org.apache.lucene.search.Query;
21 import org.apache.lucene.search.MatchAllDocsQuery;
22 import org.apache.lucene.search.TopDocs;
23 import org.apache.lucene.store.Directory;
24 import org.apache.lucene.store.FSDirectory;
6 import goodjava.lucene.api.GoodIndexWriter; 25 import goodjava.lucene.api.GoodIndexWriter;
7 26 import goodjava.lucene.api.LuceneIndexWriter;
8 27 import goodjava.lucene.api.GoodCollector;
9 public class LoggingIndexWriter implements GoodIndexWriter { 28 import goodjava.lucene.api.LuceneUtils;
29 import goodjava.logging.Logger;
30 import goodjava.logging.LoggerFactory;
31
32
33 public final class LoggingIndexWriter implements GoodIndexWriter {
34 private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);
35 private static final int version = 1;
10 private static final int OP_DELETE_ALL = 1; 36 private static final int OP_DELETE_ALL = 1;
11 private static final int OP_DELETE_DOCUMENTS = 2; 37 private static final int OP_DELETE_DOCUMENTS = 2;
12 private static final int OP_ADD_DOCUMENT = 3; 38 private static final int OP_ADD_DOCUMENT = 3;
13 private static final int OP_UPDATE_DOCUMENT = 4; 39 private static final int OP_UPDATE_DOCUMENT = 4;
14 40 private static final Random rnd = new Random();
15 public final GoodIndexWriter indexWriter; 41
16 private final LogFile logFile; 42 public final LuceneIndexWriter indexWriter;
17 43 private final File logDir;
18 public LoggingIndexWriter(GoodIndexWriter indexWriter) throws IOException { 44 private final List<LogFile> logs = new ArrayList<LogFile>();
45 private final File index;
46 private boolean isMerging = false;
47
48 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException {
19 this.indexWriter = indexWriter; 49 this.indexWriter = indexWriter;
20 logFile = new LogFile("lucene.log","rw"); 50 this.logDir = logDir;
21 logFile.gotoStart(); // for now 51 logDir.mkdirs();
22 } 52 if( !logDir.isDirectory() )
23 53 throw new RuntimeException();
24 public void close() throws IOException { 54 index = new File(logDir,"index");
55 if( index.exists() ) {
56 DataInputStream dis = new DataInputStream(new FileInputStream(index));
57 try {
58 if( dis.readInt() == version ) {
59 final int n = dis.readInt();
60 for( int i=0; i<n; i++ ) {
61 File file = new File( logDir, dis.readUTF() );
62 logs.add( new LogFile(file,"rwd") );
63 }
64 deleteUnusedFiles();
65 log().gotoEnd();
66 return;
67 }
68 } finally {
69 dis.close();
70 }
71 }
72 for( int i=0; i<2; i++ ) {
73 logs.add( newLogFile() );
74 }
75 isMerging = true;
76 new Thread(new Runnable(){public void run(){
77 try {
78 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
79 synchronized(LoggingIndexWriter.this) {
80 writeIndex();
81 }
82 } catch(IOException e) {
83 throw new RuntimeException(e);
84 } finally {
85 synchronized(LoggingIndexWriter.this) {
86 isMerging = false;
87 }
88 }
89 }}).start();
90 }
91
92 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException {
93 IndexReader reader = indexWriter.openReader();
94 final IndexSearcher searcher = new IndexSearcher(reader);
95 Query query = new MatchAllDocsQuery();
96 searcher.search( query, new GoodCollector(){
97 public void collectDoc(int iDoc) throws IOException {
98 Document doc = searcher.doc(iDoc);
99 Map<String,Object> storedFields = LuceneUtils.toMap(doc);
100 log.writeLong(time);
101 log.writeByte(OP_ADD_DOCUMENT);
102 log.writeMap(storedFields);
103 }
104 });
105 reader.close();
106 log.commit();
107 }
108
109 private LogFile newLogFile() throws IOException {
110 File file;
111 do {
112 file = new File(logDir,"_"+rnd.nextInt(100)+".log");
113 } while( file.exists() );
114 return new LogFile(file,"rwd");
115 }
116
117 private void deleteUnusedFiles() {
118 Set<String> used = new HashSet<String>();
119 used.add( index.getName() );
120 for( LogFile lf : logs ) {
121 used.add( lf.file.getName() );
122 }
123 for( File f : logDir.listFiles() ) {
124 if( !used.contains(f.getName()) ) {
125 deleteFile(f);
126 }
127 }
128 }
129
130 private static void deleteFile(File file) {
131 if( file.isDirectory() ) {
132 for( File f : file.listFiles() ) {
133 deleteFile(f);
134 }
135 }
136 if( !file.delete() )
137 throw new RuntimeException(file.getName());
138 }
139
140 private void writeIndex() throws IOException {
141 ByteArrayOutputStream baos = new ByteArrayOutputStream();
142 DataOutputStream dos = new DataOutputStream(baos);
143 dos.writeInt(version);
144 dos.writeInt(logs.size());
145 for( LogFile lf : logs ) {
146 String fileName = lf.file.getName();
147 dos.writeUTF(fileName);
148 }
149 dos.close();
150 RandomAccessFile raf = new RandomAccessFile( index, "rwd" );
151 raf.write( baos.toByteArray() );
152 raf.close();
153 deleteUnusedFiles();
154 logger.info("writeIndex "+logs.toString());
155 }
156
157 private void mergeLogs() throws IOException {
158 logger.info("merge");
159 LogFile first = logs.get(0);
160 LogFile second = logs.get(1);
161 second.gotoEnd();
162 long lastTime = second.readLong();
163 File dirFile = new File(logDir,"merge");
164 if( dirFile.exists() )
165 throw new RuntimeException();
166 Directory dir = FSDirectory.open(dirFile);
167 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
168 playLog(first,mergeWriter);
169 playLog(second,mergeWriter);
170 mergeWriter.commit();
171 LogFile merge = newLogFile();
172 logLucene( lastTime, merge, mergeWriter );
173 mergeWriter.close();
174 synchronized(this) {
175 check();
176 logs.remove(0);
177 logs.set(0,merge);
178 writeIndex();
179 check();
180 }
181 }
182 private final Runnable mergeLogs = new Runnable() { public void run() {
183 try {
184 mergeLogs();
185 /*
186 } catch(IOException e) {
187 throw new RuntimeException(e);
188 */
189 } catch(Exception e) {
190 e.printStackTrace();
191 System.exit(-1);
192 } finally {
193 synchronized(LoggingIndexWriter.this) {
194 isMerging = false;
195 }
196 }
197 } };
198
199 private void check() throws IOException {
200 File dirFile = new File(logDir,"check");
201 if( dirFile.exists() )
202 throw new RuntimeException();
203 Directory dir = FSDirectory.open(dirFile);
204 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
205 playLog(checkWriter);
206 int nCheck = numDocs(checkWriter);
207 int nOrig = numDocs(indexWriter);
208 if( nCheck != nOrig ) {
209 logger.error("nCheck = "+nCheck);
210 logger.error("nOrig = "+nOrig);
211 //new Exception().printStackTrace();
212 Thread.dumpStack();
213 System.out.println();
214 System.out.println("indexWriter");
215 dump(indexWriter);
216 System.out.println("checkWriter");
217 dump(checkWriter);
218 System.exit(-1);
219 }
220 checkWriter.close();
221 deleteFile(dirFile);
222 }
223
224 private LogFile log() {
225 return logs.get(logs.size()-1);
226 }
227
228 public synchronized void close() throws IOException {
25 indexWriter.close(); 229 indexWriter.close();
26 logFile.commit(); 230 LogFile log = log();
27 } 231 log.commit();
28 232 }
29 public void commit() throws IOException { 233
234 public synchronized void commit() throws IOException {
30 indexWriter.commit(); 235 indexWriter.commit();
31 logFile.commit(); 236 LogFile log = log();
32 } 237 log.commit();
33 238 if( isMerging )
34 public void rollback() throws IOException { 239 return;
240 if( log.length() > logs.get(0).length() ) {
241 log.writeLong( System.currentTimeMillis() );
242 logs.add( newLogFile() );
243 writeIndex();
244 }
245 if( logs.size() > 3 ) {
246 isMerging = true;
247 // new Thread(mergeLogs).start();
248 mergeLogs.run();
249 }
250 }
251
252 public synchronized void rollback() throws IOException {
35 indexWriter.rollback(); 253 indexWriter.rollback();
36 logFile.gotoEnd(); 254 LogFile log = log();
37 } 255 log.gotoEnd();
38 256 }
39 public void deleteAll() throws IOException { 257
258 public synchronized void deleteAll() throws IOException {
40 indexWriter.deleteAll(); 259 indexWriter.deleteAll();
41 logFile.writeByte(OP_DELETE_ALL); 260 LogFile log = log();
42 } 261 writeOp(log,OP_DELETE_ALL);
43 262 }
44 public void deleteDocuments(Query query) throws IOException { 263
264 public synchronized void deleteDocuments(Query query) throws IOException {
45 indexWriter.deleteDocuments(query); 265 indexWriter.deleteDocuments(query);
46 logFile.writeByte(OP_DELETE_DOCUMENTS); 266 LogFile log = log();
47 logFile.writeQuery(query); 267 writeOp(log,OP_DELETE_DOCUMENTS);
48 } 268 log.writeQuery(query);
49 269 }
50 public void addDocument(Map<String,Object> storedFields) throws IOException { 270
271 public synchronized void addDocument(Map<String,Object> storedFields) throws IOException {
51 indexWriter.addDocument(storedFields); 272 indexWriter.addDocument(storedFields);
52 logFile.writeByte(OP_ADD_DOCUMENT); 273 LogFile log = log();
53 logFile.writeMap(storedFields); 274 writeOp(log,OP_ADD_DOCUMENT);
54 } 275 log.writeMap(storedFields);
55 276 }
56 public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException { 277
278 public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {
57 indexWriter.updateDocument(keyFieldName,storedFields); 279 indexWriter.updateDocument(keyFieldName,storedFields);
58 logFile.writeByte(OP_UPDATE_DOCUMENT); 280 LogFile log = log();
59 logFile.writeUTF(keyFieldName); 281 writeOp(log,OP_UPDATE_DOCUMENT);
60 logFile.writeMap(storedFields); 282 log.writeUTF(keyFieldName);
61 } 283 log.writeMap(storedFields);
62 284 }
63 public void reindexDocuments(String keyFieldName,Query query) throws IOException { 285
286 public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException {
64 indexWriter.reindexDocuments(keyFieldName,query); 287 indexWriter.reindexDocuments(keyFieldName,query);
65 } 288 }
66 289
67 private void playOp() throws IOException { 290 private void writeOp(LogFile log,int op) throws IOException {
68 int op = logFile.readByte(); 291 log.writeLong(System.currentTimeMillis());
292 log.writeByte(op);
293 }
294
295 public synchronized void playLog() throws IOException {
296 playLog(indexWriter);
297 }
298
299 private void playLog(LuceneIndexWriter indexWriter) throws IOException {
300 if( numDocs(indexWriter) != 0 )
301 throw new RuntimeException ("not empty");
302 for( LogFile log : logs ) {
303 playLog(log,indexWriter);
304 }
305 indexWriter.commit();
306 }
307
308 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
309 IndexReader reader = indexWriter.openReader();
310 int n = reader.numDocs();
311 reader.close();
312 return n;
313 }
314
315 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
316 log.gotoStart();
317 while( log.hasMore() ) {
318 playOp(log,indexWriter);
319 }
320 }
321
322 private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
323 log.readLong(); // time
324 int op = log.readByte();
69 switch(op) { 325 switch(op) {
70 case OP_DELETE_ALL: 326 case OP_DELETE_ALL:
71 indexWriter.deleteAll(); 327 indexWriter.deleteAll();
72 return; 328 return;
73 case OP_DELETE_DOCUMENTS: 329 case OP_DELETE_DOCUMENTS:
74 indexWriter.deleteDocuments( logFile.readQuery() ); 330 indexWriter.deleteDocuments( log.readQuery() );
75 return; 331 return;
76 case OP_ADD_DOCUMENT: 332 case OP_ADD_DOCUMENT:
77 indexWriter.addDocument( logFile.readMap() ); 333 {
78 return; 334 Map storedFields = log.readMap();
335 indexWriter.addDocument(storedFields);
336 return;
337 }
79 case OP_UPDATE_DOCUMENT: 338 case OP_UPDATE_DOCUMENT:
80 indexWriter.updateDocument( logFile.readUTF(), logFile.readMap() ); 339 {
81 return; 340 String keyFieldName = log.readUTF();
341 Map storedFields = log.readMap();
342 indexWriter.updateDocument(keyFieldName,storedFields);
343 return;
344 }
82 default: 345 default:
83 throw new RuntimeException("invalid op "+op); 346 throw new RuntimeException("invalid op "+op);
84 } 347 }
85 } 348 }
86 349
87 public void playLog() throws IOException { 350 private static void dump(LuceneIndexWriter indexWriter) throws IOException {
88 logFile.gotoStart(); 351 IndexReader reader = indexWriter.openReader();
89 while( logFile.hasMore() ) { 352 IndexSearcher searcher = new IndexSearcher(reader);
90 playOp(); 353 Query query = new MatchAllDocsQuery();
91 } 354 TopDocs td = searcher.search(query,100);
92 indexWriter.commit(); 355 System.out.println("totalHits = "+td.totalHits);
93 } 356 for( int i=0; i<td.scoreDocs.length; i++ ) {
357 Document doc = searcher.doc(td.scoreDocs[i].doc);
358 System.out.println(LuceneUtils.toMap(doc));
359 }
360 System.out.println();
361 reader.close();
362 }
363
94 } 364 }