1488
|
1 package goodjava.lucene.backup;
|
|
2
|
|
3 import java.io.File;
|
1499
|
4 import java.io.InputStream;
|
1488
|
5 import java.io.IOException;
|
1499
|
6 import java.net.Socket;
|
1488
|
7 import java.util.List;
|
|
8 import java.util.ArrayList;
|
1499
|
9 import java.util.Map;
|
|
10 import java.util.HashMap;
|
|
11 import java.util.Arrays;
|
|
12 import javax.net.ssl.SSLSocketFactory;
|
|
13 import javax.net.ssl.SSLSocket;
|
1488
|
14 import goodjava.io.IoUtils;
|
1499
|
15 import goodjava.rpc.RpcClient;
|
|
16 import goodjava.rpc.RpcCall;
|
|
17 import goodjava.rpc.RpcResult;
|
|
18 import goodjava.rpc.RpcException;
|
1488
|
19 import goodjava.lucene.api.LuceneIndexWriter;
|
1499
|
20 import goodjava.logging.Logger;
|
|
21 import goodjava.logging.LoggerFactory;
|
1488
|
22 import goodjava.lucene.logging.LoggingIndexWriter;
|
|
23 import goodjava.lucene.logging.LogFile;
|
|
24
|
|
25
|
|
26 public class BackupIndexWriter extends LoggingIndexWriter {
|
1499
|
27 private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class);
|
|
28 public static String[] backupDomains;
|
1488
|
29 private final String name;
|
|
30 private final File dir;
|
1499
|
31 private boolean isSyncPending = false;
|
1488
|
32
|
|
33 public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException {
|
|
34 super(indexWriter,logDir);
|
1499
|
35 if( backupDomains == null )
|
|
36 throw new RuntimeException("must set backupDomains");
|
1488
|
37 this.name = name;
|
|
38 File f = new File(System.getProperty("java.io.tmpdir"));
|
|
39 dir = new File(f,"goodjava.lucene/"+name);
|
|
40 dir.mkdirs();
|
|
41 }
|
|
42
|
|
43 public synchronized void commit() throws IOException {
|
|
44 super.commit();
|
1499
|
45 //sync();
|
|
46 if( !isSyncPending ) {
|
|
47 new Thread(sync).start();
|
|
48 isSyncPending = true;
|
1488
|
49 }
|
|
50 }
|
|
51
|
1499
|
52 public void runSync() {
|
|
53 sync.run();
|
|
54 }
|
|
55
|
|
56 private final Runnable sync = new Runnable() {
|
|
57 public synchronized void run() {
|
|
58 try {
|
|
59 sync();
|
|
60 } catch(IOException e) {
|
|
61 throw new RuntimeException(e);
|
|
62 }
|
|
63 }
|
|
64 };
|
|
65
|
|
66 private void sync() throws IOException {
|
|
67 List<LogFile> logs = new ArrayList<LogFile>();
|
|
68 synchronized(this) {
|
|
69 isSyncPending = false;
|
|
70 for( File f : dir.listFiles() ) {
|
|
71 IoUtils.delete(f);
|
|
72 }
|
|
73 for( LogFile log : this.logs ) {
|
|
74 File f = new File(dir,log.file.getName());
|
|
75 IoUtils.link(log.file,f);
|
|
76 logs.add( new LogFile(f) );
|
|
77 }
|
|
78 }
|
|
79 List logInfo = new ArrayList();
|
|
80 Map<String,LogFile> logMap = new HashMap<String,LogFile>();
|
|
81 for( LogFile log : logs ) {
|
|
82 Map fileInfo = new HashMap();
|
|
83 fileInfo.put("name",log.file.getName());
|
|
84 fileInfo.put("end",log.end());
|
|
85 logInfo.add(fileInfo);
|
|
86 logMap.put(log.file.getName(),log);
|
|
87 }
|
|
88 for( String backupDomain : backupDomains ) {
|
|
89 RpcClient rpc = rpcClient(backupDomain);
|
|
90 RpcCall call = new RpcCall("check",name,logInfo);
|
|
91 try {
|
|
92 while(true) {
|
|
93 rpc.write(call);
|
|
94 RpcResult result = rpc.read();
|
|
95 logger.info(Arrays.asList(result.returnValues).toString());
|
|
96 String status = (String)result.returnValues[0];
|
|
97 if( status.equals("ok") ) {
|
|
98 break;
|
|
99 } else if( status.equals("missing") ) {
|
|
100 String fileName = (String)result.returnValues[1];
|
|
101 LogFile log = logMap.get(fileName);
|
|
102 long len = log.end() - 8;
|
|
103 InputStream in = log.input();
|
|
104 call = new RpcCall(in,len,"add",name,logInfo,fileName);
|
|
105 } else if( status.equals("incomplete") ) {
|
|
106 String fileName = (String)result.returnValues[1];
|
|
107 long logEnd = (Long)result.returnValues[2];
|
|
108 LogFile log = logMap.get(fileName);
|
|
109 long len = log.end() - logEnd;
|
|
110 InputStream in = log.input();
|
|
111 in.skip(logEnd-8);
|
|
112 call = new RpcCall(in,len,"append",name,logInfo,fileName);
|
|
113 } else
|
|
114 throw new RuntimeException("status "+status);
|
|
115 }
|
|
116 } catch(RpcException e) {
|
|
117 logger.warn("",e);
|
|
118 }
|
|
119 rpc.close();
|
|
120 }
|
|
121 }
|
|
122
|
|
123 static RpcClient rpcClient(String backupDomain) throws IOException {
|
|
124 Socket socket;
|
|
125 if( BackupServer.cipherSuites == null ) {
|
|
126 socket = new Socket(backupDomain,BackupServer.port);
|
|
127 } else {
|
|
128 socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port);
|
|
129 ((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites);
|
|
130 }
|
|
131 return new RpcClient(socket);
|
|
132 }
|
1488
|
133 }
|