1461
|
1 package goodjava.lucene.logging;
|
|
2
|
1465
|
3 import java.io.File;
|
|
4 import java.io.RandomAccessFile;
|
|
5 import java.io.ByteArrayOutputStream;
|
|
6 import java.io.DataOutputStream;
|
|
7 import java.io.DataInputStream;
|
|
8 import java.io.FileInputStream;
|
1461
|
9 import java.io.IOException;
|
|
10 import java.util.Map;
|
1465
|
11 import java.util.Set;
|
|
12 import java.util.HashSet;
|
|
13 import java.util.List;
|
|
14 import java.util.ArrayList;
|
|
15 import java.util.Random;
|
|
16 import org.apache.lucene.document.Document;
|
|
17 import org.apache.lucene.index.DirectoryReader;
|
|
18 import org.apache.lucene.index.IndexReader;
|
|
19 import org.apache.lucene.search.IndexSearcher;
|
1461
|
20 import org.apache.lucene.search.Query;
|
1465
|
21 import org.apache.lucene.search.MatchAllDocsQuery;
|
|
22 import org.apache.lucene.search.TopDocs;
|
|
23 import org.apache.lucene.store.Directory;
|
|
24 import org.apache.lucene.store.FSDirectory;
|
1473
|
25 import goodjava.io.IoUtils;
|
1461
|
26 import goodjava.lucene.api.GoodIndexWriter;
|
1465
|
27 import goodjava.lucene.api.LuceneIndexWriter;
|
|
28 import goodjava.lucene.api.GoodCollector;
|
|
29 import goodjava.lucene.api.LuceneUtils;
|
|
30 import goodjava.logging.Logger;
|
|
31 import goodjava.logging.LoggerFactory;
|
1461
|
32
|
|
33
|
1465
|
34 public final class LoggingIndexWriter implements GoodIndexWriter {
|
|
35 private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);
|
|
36 private static final int version = 1;
|
1461
|
37 private static final int OP_DELETE_ALL = 1;
|
|
38 private static final int OP_DELETE_DOCUMENTS = 2;
|
|
39 private static final int OP_ADD_DOCUMENT = 3;
|
|
40 private static final int OP_UPDATE_DOCUMENT = 4;
|
1465
|
41 private static final Random rnd = new Random();
|
1461
|
42
|
1465
|
43 public final LuceneIndexWriter indexWriter;
|
|
44 private final File logDir;
|
|
45 private final List<LogFile> logs = new ArrayList<LogFile>();
|
|
46 private final File index;
|
|
47 private boolean isMerging = false;
|
1461
|
48
|
1465
|
49 public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException {
|
1461
|
50 this.indexWriter = indexWriter;
|
1465
|
51 this.logDir = logDir;
|
|
52 logDir.mkdirs();
|
|
53 if( !logDir.isDirectory() )
|
|
54 throw new RuntimeException();
|
|
55 index = new File(logDir,"index");
|
|
56 if( index.exists() ) {
|
|
57 DataInputStream dis = new DataInputStream(new FileInputStream(index));
|
|
58 try {
|
|
59 if( dis.readInt() == version ) {
|
|
60 final int n = dis.readInt();
|
|
61 for( int i=0; i<n; i++ ) {
|
|
62 File file = new File( logDir, dis.readUTF() );
|
|
63 logs.add( new LogFile(file,"rwd") );
|
|
64 }
|
|
65 deleteUnusedFiles();
|
|
66 log().gotoEnd();
|
|
67 return;
|
|
68 }
|
|
69 } finally {
|
|
70 dis.close();
|
|
71 }
|
|
72 }
|
|
73 for( int i=0; i<2; i++ ) {
|
|
74 logs.add( newLogFile() );
|
|
75 }
|
|
76 isMerging = true;
|
|
77 new Thread(new Runnable(){public void run(){
|
|
78 try {
|
|
79 logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
|
|
80 synchronized(LoggingIndexWriter.this) {
|
|
81 writeIndex();
|
|
82 }
|
|
83 } catch(IOException e) {
|
|
84 throw new RuntimeException(e);
|
|
85 } finally {
|
|
86 synchronized(LoggingIndexWriter.this) {
|
|
87 isMerging = false;
|
|
88 }
|
|
89 }
|
|
90 }}).start();
|
1461
|
91 }
|
|
92
|
1465
|
93 private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException {
|
|
94 IndexReader reader = indexWriter.openReader();
|
|
95 final IndexSearcher searcher = new IndexSearcher(reader);
|
|
96 Query query = new MatchAllDocsQuery();
|
|
97 searcher.search( query, new GoodCollector(){
|
|
98 public void collectDoc(int iDoc) throws IOException {
|
|
99 Document doc = searcher.doc(iDoc);
|
|
100 Map<String,Object> storedFields = LuceneUtils.toMap(doc);
|
|
101 log.writeLong(time);
|
|
102 log.writeByte(OP_ADD_DOCUMENT);
|
|
103 log.writeMap(storedFields);
|
|
104 }
|
|
105 });
|
|
106 reader.close();
|
|
107 log.commit();
|
|
108 }
|
|
109
|
|
110 private LogFile newLogFile() throws IOException {
|
|
111 File file;
|
|
112 do {
|
|
113 file = new File(logDir,"_"+rnd.nextInt(100)+".log");
|
|
114 } while( file.exists() );
|
|
115 return new LogFile(file,"rwd");
|
1461
|
116 }
|
|
117
|
1473
|
118 private void deleteUnusedFiles() throws IOException {
|
1465
|
119 Set<String> used = new HashSet<String>();
|
|
120 used.add( index.getName() );
|
|
121 for( LogFile lf : logs ) {
|
|
122 used.add( lf.file.getName() );
|
|
123 }
|
|
124 for( File f : logDir.listFiles() ) {
|
|
125 if( !used.contains(f.getName()) ) {
|
1475
|
126 IoUtils.deleteRecursively(f);
|
1465
|
127 }
|
|
128 }
|
1461
|
129 }
|
|
130
|
1465
|
131 private void writeIndex() throws IOException {
|
|
132 ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
133 DataOutputStream dos = new DataOutputStream(baos);
|
|
134 dos.writeInt(version);
|
|
135 dos.writeInt(logs.size());
|
|
136 for( LogFile lf : logs ) {
|
|
137 String fileName = lf.file.getName();
|
|
138 dos.writeUTF(fileName);
|
|
139 }
|
|
140 dos.close();
|
|
141 RandomAccessFile raf = new RandomAccessFile( index, "rwd" );
|
|
142 raf.write( baos.toByteArray() );
|
|
143 raf.close();
|
|
144 deleteUnusedFiles();
|
|
145 logger.info("writeIndex "+logs.toString());
|
1461
|
146 }
|
|
147
|
1465
|
148 private void mergeLogs() throws IOException {
|
|
149 logger.info("merge");
|
|
150 LogFile first = logs.get(0);
|
|
151 LogFile second = logs.get(1);
|
|
152 second.gotoEnd();
|
|
153 long lastTime = second.readLong();
|
|
154 File dirFile = new File(logDir,"merge");
|
|
155 if( dirFile.exists() )
|
|
156 throw new RuntimeException();
|
|
157 Directory dir = FSDirectory.open(dirFile);
|
|
158 LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
|
|
159 playLog(first,mergeWriter);
|
|
160 playLog(second,mergeWriter);
|
|
161 mergeWriter.commit();
|
|
162 LogFile merge = newLogFile();
|
|
163 logLucene( lastTime, merge, mergeWriter );
|
|
164 mergeWriter.close();
|
|
165 synchronized(this) {
|
|
166 check();
|
|
167 logs.remove(0);
|
|
168 logs.set(0,merge);
|
|
169 writeIndex();
|
|
170 check();
|
|
171 }
|
1461
|
172 }
|
1465
|
173 private final Runnable mergeLogs = new Runnable() { public void run() {
|
|
174 try {
|
|
175 mergeLogs();
|
|
176 /*
|
|
177 } catch(IOException e) {
|
|
178 throw new RuntimeException(e);
|
|
179 */
|
|
180 } catch(Exception e) {
|
|
181 e.printStackTrace();
|
|
182 System.exit(-1);
|
|
183 } finally {
|
|
184 synchronized(LoggingIndexWriter.this) {
|
|
185 isMerging = false;
|
|
186 }
|
|
187 }
|
|
188 } };
|
1461
|
189
|
1465
|
190 private void check() throws IOException {
|
|
191 File dirFile = new File(logDir,"check");
|
|
192 if( dirFile.exists() )
|
|
193 throw new RuntimeException();
|
|
194 Directory dir = FSDirectory.open(dirFile);
|
|
195 LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
|
|
196 playLog(checkWriter);
|
|
197 int nCheck = numDocs(checkWriter);
|
|
198 int nOrig = numDocs(indexWriter);
|
|
199 if( nCheck != nOrig ) {
|
|
200 logger.error("nCheck = "+nCheck);
|
|
201 logger.error("nOrig = "+nOrig);
|
|
202 //new Exception().printStackTrace();
|
|
203 Thread.dumpStack();
|
|
204 System.out.println();
|
|
205 System.out.println("indexWriter");
|
|
206 dump(indexWriter);
|
|
207 System.out.println("checkWriter");
|
|
208 dump(checkWriter);
|
|
209 System.exit(-1);
|
|
210 }
|
|
211 checkWriter.close();
|
1475
|
212 IoUtils.deleteRecursively(dirFile);
|
1461
|
213 }
|
|
214
|
1465
|
215 private LogFile log() {
|
|
216 return logs.get(logs.size()-1);
|
|
217 }
|
|
218
|
|
219 public synchronized void close() throws IOException {
|
|
220 indexWriter.close();
|
|
221 LogFile log = log();
|
|
222 log.commit();
|
|
223 }
|
|
224
|
|
225 public synchronized void commit() throws IOException {
|
|
226 indexWriter.commit();
|
|
227 LogFile log = log();
|
|
228 log.commit();
|
|
229 if( isMerging )
|
|
230 return;
|
|
231 if( log.length() > logs.get(0).length() ) {
|
|
232 log.writeLong( System.currentTimeMillis() );
|
|
233 logs.add( newLogFile() );
|
|
234 writeIndex();
|
|
235 }
|
|
236 if( logs.size() > 3 ) {
|
|
237 isMerging = true;
|
|
238 // new Thread(mergeLogs).start();
|
|
239 mergeLogs.run();
|
|
240 }
|
1461
|
241 }
|
|
242
|
1465
|
243 public synchronized void rollback() throws IOException {
|
|
244 indexWriter.rollback();
|
|
245 LogFile log = log();
|
|
246 log.gotoEnd();
|
|
247 }
|
|
248
|
|
249 public synchronized void deleteAll() throws IOException {
|
|
250 indexWriter.deleteAll();
|
|
251 LogFile log = log();
|
|
252 writeOp(log,OP_DELETE_ALL);
|
1461
|
253 }
|
|
254
|
1465
|
255 public synchronized void deleteDocuments(Query query) throws IOException {
|
|
256 indexWriter.deleteDocuments(query);
|
|
257 LogFile log = log();
|
|
258 writeOp(log,OP_DELETE_DOCUMENTS);
|
|
259 log.writeQuery(query);
|
|
260 }
|
|
261
|
|
262 public synchronized void addDocument(Map<String,Object> storedFields) throws IOException {
|
|
263 indexWriter.addDocument(storedFields);
|
|
264 LogFile log = log();
|
|
265 writeOp(log,OP_ADD_DOCUMENT);
|
|
266 log.writeMap(storedFields);
|
|
267 }
|
|
268
|
|
269 public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {
|
|
270 indexWriter.updateDocument(keyFieldName,storedFields);
|
|
271 LogFile log = log();
|
|
272 writeOp(log,OP_UPDATE_DOCUMENT);
|
|
273 log.writeUTF(keyFieldName);
|
|
274 log.writeMap(storedFields);
|
|
275 }
|
|
276
|
|
277 public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException {
|
1461
|
278 indexWriter.reindexDocuments(keyFieldName,query);
|
|
279 }
|
|
280
|
1465
|
281 private void writeOp(LogFile log,int op) throws IOException {
|
|
282 log.writeLong(System.currentTimeMillis());
|
|
283 log.writeByte(op);
|
|
284 }
|
|
285
|
|
286 public synchronized void playLog() throws IOException {
|
|
287 playLog(indexWriter);
|
|
288 }
|
|
289
|
|
290 private void playLog(LuceneIndexWriter indexWriter) throws IOException {
|
|
291 if( numDocs(indexWriter) != 0 )
|
|
292 throw new RuntimeException ("not empty");
|
|
293 for( LogFile log : logs ) {
|
|
294 playLog(log,indexWriter);
|
|
295 }
|
|
296 indexWriter.commit();
|
|
297 }
|
|
298
|
|
299 private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
|
|
300 IndexReader reader = indexWriter.openReader();
|
|
301 int n = reader.numDocs();
|
|
302 reader.close();
|
|
303 return n;
|
|
304 }
|
|
305
|
|
306 private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
|
|
307 log.gotoStart();
|
|
308 while( log.hasMore() ) {
|
|
309 playOp(log,indexWriter);
|
|
310 }
|
|
311 }
|
|
312
|
|
313 private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
|
|
314 log.readLong(); // time
|
|
315 int op = log.readByte();
|
1461
|
316 switch(op) {
|
|
317 case OP_DELETE_ALL:
|
|
318 indexWriter.deleteAll();
|
|
319 return;
|
|
320 case OP_DELETE_DOCUMENTS:
|
1465
|
321 indexWriter.deleteDocuments( log.readQuery() );
|
1461
|
322 return;
|
|
323 case OP_ADD_DOCUMENT:
|
1465
|
324 {
|
|
325 Map storedFields = log.readMap();
|
|
326 indexWriter.addDocument(storedFields);
|
|
327 return;
|
|
328 }
|
1461
|
329 case OP_UPDATE_DOCUMENT:
|
1465
|
330 {
|
|
331 String keyFieldName = log.readUTF();
|
|
332 Map storedFields = log.readMap();
|
|
333 indexWriter.updateDocument(keyFieldName,storedFields);
|
|
334 return;
|
|
335 }
|
1461
|
336 default:
|
|
337 throw new RuntimeException("invalid op "+op);
|
|
338 }
|
|
339 }
|
|
340
|
1465
|
341 private static void dump(LuceneIndexWriter indexWriter) throws IOException {
|
|
342 IndexReader reader = indexWriter.openReader();
|
|
343 IndexSearcher searcher = new IndexSearcher(reader);
|
|
344 Query query = new MatchAllDocsQuery();
|
|
345 TopDocs td = searcher.search(query,100);
|
|
346 System.out.println("totalHits = "+td.totalHits);
|
|
347 for( int i=0; i<td.scoreDocs.length; i++ ) {
|
|
348 Document doc = searcher.doc(td.scoreDocs[i].doc);
|
|
349 System.out.println(LuceneUtils.toMap(doc));
|
1461
|
350 }
|
1465
|
351 System.out.println();
|
|
352 reader.close();
|
1461
|
353 }
|
1465
|
354
|
1461
|
355 }
|