1461
|
1 package goodjava.lucene.logging;
|
|
2
|
1465
|
3 import java.io.File;
|
|
4 import java.io.RandomAccessFile;
|
|
5 import java.io.ByteArrayOutputStream;
|
|
6 import java.io.DataOutputStream;
|
|
7 import java.io.DataInputStream;
|
|
8 import java.io.FileInputStream;
|
1461
|
9 import java.io.IOException;
|
|
10 import java.util.Map;
|
1465
|
11 import java.util.Set;
|
|
12 import java.util.HashSet;
|
|
13 import java.util.List;
|
|
14 import java.util.ArrayList;
|
|
15 import java.util.Random;
|
|
16 import org.apache.lucene.document.Document;
|
|
17 import org.apache.lucene.index.DirectoryReader;
|
|
18 import org.apache.lucene.index.IndexReader;
|
|
19 import org.apache.lucene.search.IndexSearcher;
|
1461
|
20 import org.apache.lucene.search.Query;
|
1465
|
21 import org.apache.lucene.search.MatchAllDocsQuery;
|
|
22 import org.apache.lucene.search.TopDocs;
|
|
23 import org.apache.lucene.store.Directory;
|
|
24 import org.apache.lucene.store.FSDirectory;
|
1473
|
25 import goodjava.io.IoUtils;
|
1461
|
26 import goodjava.lucene.api.GoodIndexWriter;
|
1465
|
27 import goodjava.lucene.api.LuceneIndexWriter;
|
|
28 import goodjava.lucene.api.GoodCollector;
|
|
29 import goodjava.lucene.api.LuceneUtils;
|
|
30 import goodjava.logging.Logger;
|
|
31 import goodjava.logging.LoggerFactory;
|
1461
|
32
|
|
33
|
1465
|
34 public final class LoggingIndexWriter implements GoodIndexWriter {
|
|
35 private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);
|
|
36 private static final int version = 1;
|
1461
|
37 private static final int OP_DELETE_ALL = 1;
|
|
38 private static final int OP_DELETE_DOCUMENTS = 2;
|
|
39 private static final int OP_ADD_DOCUMENT = 3;
|
|
40 private static final int OP_UPDATE_DOCUMENT = 4;
|
1465
|
41 private static final Random rnd = new Random();
|
1461
|
42
|
1465
|
43 public final LuceneIndexWriter indexWriter;
|
|
44 private final File logDir;
|
|
45 private final List<LogFile> logs = new ArrayList<LogFile>();
|
|
46 private final File index;
|
|
47 private boolean isMerging = false;
|
1461
|
48
|
1465
|
49 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException {
|
1461
|
50 this.indexWriter = indexWriter;
|
1465
|
51 this.logDir = logDir;
|
|
52 logDir.mkdirs();
|
|
53 if( !logDir.isDirectory() )
|
|
54 throw new RuntimeException();
|
|
55 index = new File(logDir,"index");
|
|
56 if( index.exists() ) {
|
|
57 DataInputStream dis = new DataInputStream(new FileInputStream(index));
|
|
58 try {
|
|
59 if( dis.readInt() == version ) {
|
|
60 final int n = dis.readInt();
|
|
61 for( int i=0; i<n; i++ ) {
|
|
62 File file = new File( logDir, dis.readUTF() );
|
|
63 logs.add( new LogFile(file,"rwd") );
|
|
64 }
|
|
65 deleteUnusedFiles();
|
|
66 log().gotoEnd();
|
|
67 return;
|
|
68 }
|
|
69 } finally {
|
|
70 dis.close();
|
|
71 }
|
|
72 }
|
|
73 for( int i=0; i<2; i++ ) {
|
|
74 logs.add( newLogFile() );
|
|
75 }
|
|
76 isMerging = true;
|
|
77 new Thread(new Runnable(){public void run(){
|
|
78 try {
|
|
79 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
|
|
80 synchronized(LoggingIndexWriter.this) {
|
|
81 writeIndex();
|
|
82 }
|
|
83 } catch(IOException e) {
|
|
84 throw new RuntimeException(e);
|
|
85 } finally {
|
|
86 synchronized(LoggingIndexWriter.this) {
|
|
87 isMerging = false;
|
|
88 }
|
|
89 }
|
|
90 }}).start();
|
1461
|
91 }
|
|
92
|
1465
|
93 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException {
|
|
94 IndexReader reader = indexWriter.openReader();
|
|
95 final IndexSearcher searcher = new IndexSearcher(reader);
|
|
96 Query query = new MatchAllDocsQuery();
|
|
97 searcher.search( query, new GoodCollector(){
|
|
98 public void collectDoc(int iDoc) throws IOException {
|
|
99 Document doc = searcher.doc(iDoc);
|
|
100 Map<String,Object> storedFields = LuceneUtils.toMap(doc);
|
|
101 log.writeLong(time);
|
|
102 log.writeByte(OP_ADD_DOCUMENT);
|
|
103 log.writeMap(storedFields);
|
|
104 }
|
|
105 });
|
|
106 reader.close();
|
|
107 log.commit();
|
|
108 }
|
|
109
|
|
110 private LogFile newLogFile() throws IOException {
|
|
111 File file;
|
|
112 do {
|
|
113 file = new File(logDir,"_"+rnd.nextInt(100)+".log");
|
|
114 } while( file.exists() );
|
|
115 return new LogFile(file,"rwd");
|
1461
|
116 }
|
|
117
|
1473
|
118 private void deleteUnusedFiles() throws IOException {
|
1465
|
119 Set<String> used = new HashSet<String>();
|
|
120 used.add( index.getName() );
|
|
121 for( LogFile lf : logs ) {
|
|
122 used.add( lf.file.getName() );
|
|
123 }
|
|
124 for( File f : logDir.listFiles() ) {
|
|
125 if( !used.contains(f.getName()) ) {
|
|
126 deleteFile(f);
|
|
127 }
|
|
128 }
|
1461
|
129 }
|
|
130
|
1473
|
131 private static void deleteFile(File file) throws IOException {
|
1465
|
132 if( file.isDirectory() ) {
|
|
133 for( File f : file.listFiles() ) {
|
|
134 deleteFile(f);
|
|
135 }
|
|
136 }
|
1473
|
137 IoUtils.delete(file);
|
1465
|
138 }
|
|
139
|
|
140 private void writeIndex() throws IOException {
|
|
141 ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
142 DataOutputStream dos = new DataOutputStream(baos);
|
|
143 dos.writeInt(version);
|
|
144 dos.writeInt(logs.size());
|
|
145 for( LogFile lf : logs ) {
|
|
146 String fileName = lf.file.getName();
|
|
147 dos.writeUTF(fileName);
|
|
148 }
|
|
149 dos.close();
|
|
150 RandomAccessFile raf = new RandomAccessFile( index, "rwd" );
|
|
151 raf.write( baos.toByteArray() );
|
|
152 raf.close();
|
|
153 deleteUnusedFiles();
|
|
154 logger.info("writeIndex "+logs.toString());
|
1461
|
155 }
|
|
156
|
1465
|
157 private void mergeLogs() throws IOException {
|
|
158 logger.info("merge");
|
|
159 LogFile first = logs.get(0);
|
|
160 LogFile second = logs.get(1);
|
|
161 second.gotoEnd();
|
|
162 long lastTime = second.readLong();
|
|
163 File dirFile = new File(logDir,"merge");
|
|
164 if( dirFile.exists() )
|
|
165 throw new RuntimeException();
|
|
166 Directory dir = FSDirectory.open(dirFile);
|
|
167 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
|
|
168 playLog(first,mergeWriter);
|
|
169 playLog(second,mergeWriter);
|
|
170 mergeWriter.commit();
|
|
171 LogFile merge = newLogFile();
|
|
172 logLucene( lastTime, merge, mergeWriter );
|
|
173 mergeWriter.close();
|
|
174 synchronized(this) {
|
|
175 check();
|
|
176 logs.remove(0);
|
|
177 logs.set(0,merge);
|
|
178 writeIndex();
|
|
179 check();
|
|
180 }
|
1461
|
181 }
|
1465
|
182 private final Runnable mergeLogs = new Runnable() { public void run() {
|
|
183 try {
|
|
184 mergeLogs();
|
|
185 /*
|
|
186 } catch(IOException e) {
|
|
187 throw new RuntimeException(e);
|
|
188 */
|
|
189 } catch(Exception e) {
|
|
190 e.printStackTrace();
|
|
191 System.exit(-1);
|
|
192 } finally {
|
|
193 synchronized(LoggingIndexWriter.this) {
|
|
194 isMerging = false;
|
|
195 }
|
|
196 }
|
|
197 } };
|
1461
|
198
|
1465
|
199 private void check() throws IOException {
|
|
200 File dirFile = new File(logDir,"check");
|
|
201 if( dirFile.exists() )
|
|
202 throw new RuntimeException();
|
|
203 Directory dir = FSDirectory.open(dirFile);
|
|
204 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
|
|
205 playLog(checkWriter);
|
|
206 int nCheck = numDocs(checkWriter);
|
|
207 int nOrig = numDocs(indexWriter);
|
|
208 if( nCheck != nOrig ) {
|
|
209 logger.error("nCheck = "+nCheck);
|
|
210 logger.error("nOrig = "+nOrig);
|
|
211 //new Exception().printStackTrace();
|
|
212 Thread.dumpStack();
|
|
213 System.out.println();
|
|
214 System.out.println("indexWriter");
|
|
215 dump(indexWriter);
|
|
216 System.out.println("checkWriter");
|
|
217 dump(checkWriter);
|
|
218 System.exit(-1);
|
|
219 }
|
|
220 checkWriter.close();
|
|
221 deleteFile(dirFile);
|
1461
|
222 }
|
|
223
|
1465
|
224 private LogFile log() {
|
|
225 return logs.get(logs.size()-1);
|
|
226 }
|
|
227
|
|
228 public synchronized void close() throws IOException {
|
|
229 indexWriter.close();
|
|
230 LogFile log = log();
|
|
231 log.commit();
|
|
232 }
|
|
233
|
|
234 public synchronized void commit() throws IOException {
|
|
235 indexWriter.commit();
|
|
236 LogFile log = log();
|
|
237 log.commit();
|
|
238 if( isMerging )
|
|
239 return;
|
|
240 if( log.length() > logs.get(0).length() ) {
|
|
241 log.writeLong( System.currentTimeMillis() );
|
|
242 logs.add( newLogFile() );
|
|
243 writeIndex();
|
|
244 }
|
|
245 if( logs.size() > 3 ) {
|
|
246 isMerging = true;
|
|
247 // new Thread(mergeLogs).start();
|
|
248 mergeLogs.run();
|
|
249 }
|
1461
|
250 }
|
|
251
|
1465
|
252 public synchronized void rollback() throws IOException {
|
|
253 indexWriter.rollback();
|
|
254 LogFile log = log();
|
|
255 log.gotoEnd();
|
|
256 }
|
|
257
|
|
258 public synchronized void deleteAll() throws IOException {
|
|
259 indexWriter.deleteAll();
|
|
260 LogFile log = log();
|
|
261 writeOp(log,OP_DELETE_ALL);
|
1461
|
262 }
|
|
263
|
1465
|
264 public synchronized void deleteDocuments(Query query) throws IOException {
|
|
265 indexWriter.deleteDocuments(query);
|
|
266 LogFile log = log();
|
|
267 writeOp(log,OP_DELETE_DOCUMENTS);
|
|
268 log.writeQuery(query);
|
|
269 }
|
|
270
|
|
271 public synchronized void addDocument(Map<String,Object> storedFields) throws IOException {
|
|
272 indexWriter.addDocument(storedFields);
|
|
273 LogFile log = log();
|
|
274 writeOp(log,OP_ADD_DOCUMENT);
|
|
275 log.writeMap(storedFields);
|
|
276 }
|
|
277
|
|
278 public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {
|
|
279 indexWriter.updateDocument(keyFieldName,storedFields);
|
|
280 LogFile log = log();
|
|
281 writeOp(log,OP_UPDATE_DOCUMENT);
|
|
282 log.writeUTF(keyFieldName);
|
|
283 log.writeMap(storedFields);
|
|
284 }
|
|
285
|
|
286 public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException {
|
1461
|
287 indexWriter.reindexDocuments(keyFieldName,query);
|
|
288 }
|
|
289
|
1465
|
290 private void writeOp(LogFile log,int op) throws IOException {
|
|
291 log.writeLong(System.currentTimeMillis());
|
|
292 log.writeByte(op);
|
|
293 }
|
|
294
|
|
295 public synchronized void playLog() throws IOException {
|
|
296 playLog(indexWriter);
|
|
297 }
|
|
298
|
|
299 private void playLog(LuceneIndexWriter indexWriter) throws IOException {
|
|
300 if( numDocs(indexWriter) != 0 )
|
|
301 throw new RuntimeException ("not empty");
|
|
302 for( LogFile log : logs ) {
|
|
303 playLog(log,indexWriter);
|
|
304 }
|
|
305 indexWriter.commit();
|
|
306 }
|
|
307
|
|
308 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
|
|
309 IndexReader reader = indexWriter.openReader();
|
|
310 int n = reader.numDocs();
|
|
311 reader.close();
|
|
312 return n;
|
|
313 }
|
|
314
|
|
315 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
|
|
316 log.gotoStart();
|
|
317 while( log.hasMore() ) {
|
|
318 playOp(log,indexWriter);
|
|
319 }
|
|
320 }
|
|
321
|
|
322 private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
|
|
323 log.readLong(); // time
|
|
324 int op = log.readByte();
|
1461
|
325 switch(op) {
|
|
326 case OP_DELETE_ALL:
|
|
327 indexWriter.deleteAll();
|
|
328 return;
|
|
329 case OP_DELETE_DOCUMENTS:
|
1465
|
330 indexWriter.deleteDocuments( log.readQuery() );
|
1461
|
331 return;
|
|
332 case OP_ADD_DOCUMENT:
|
1465
|
333 {
|
|
334 Map storedFields = log.readMap();
|
|
335 indexWriter.addDocument(storedFields);
|
|
336 return;
|
|
337 }
|
1461
|
338 case OP_UPDATE_DOCUMENT:
|
1465
|
339 {
|
|
340 String keyFieldName = log.readUTF();
|
|
341 Map storedFields = log.readMap();
|
|
342 indexWriter.updateDocument(keyFieldName,storedFields);
|
|
343 return;
|
|
344 }
|
1461
|
345 default:
|
|
346 throw new RuntimeException("invalid op "+op);
|
|
347 }
|
|
348 }
|
|
349
|
1465
|
350 private static void dump(LuceneIndexWriter indexWriter) throws IOException {
|
|
351 IndexReader reader = indexWriter.openReader();
|
|
352 IndexSearcher searcher = new IndexSearcher(reader);
|
|
353 Query query = new MatchAllDocsQuery();
|
|
354 TopDocs td = searcher.search(query,100);
|
|
355 System.out.println("totalHits = "+td.totalHits);
|
|
356 for( int i=0; i<td.scoreDocs.length; i++ ) {
|
|
357 Document doc = searcher.doc(td.scoreDocs[i].doc);
|
|
358 System.out.println(LuceneUtils.toMap(doc));
|
1461
|
359 }
|
1465
|
360 System.out.println();
|
|
361 reader.close();
|
1461
|
362 }
|
1465
|
363
|
1461
|
364 }
|