1488
|
1 package goodjava.lucene.backup;
|
|
2
|
|
3 import java.io.File;
|
1499
|
4 import java.io.InputStream;
|
1488
|
5 import java.io.IOException;
|
1499
|
6 import java.net.Socket;
|
1488
|
7 import java.util.List;
|
|
8 import java.util.ArrayList;
|
1499
|
9 import java.util.Map;
|
|
10 import java.util.HashMap;
|
|
11 import java.util.Arrays;
|
|
12 import javax.net.ssl.SSLSocketFactory;
|
|
13 import javax.net.ssl.SSLSocket;
|
1488
|
14 import goodjava.io.IoUtils;
|
1499
|
15 import goodjava.rpc.RpcClient;
|
|
16 import goodjava.rpc.RpcCall;
|
|
17 import goodjava.rpc.RpcResult;
|
|
18 import goodjava.rpc.RpcException;
|
1488
|
19 import goodjava.lucene.api.LuceneIndexWriter;
|
1499
|
20 import goodjava.logging.Logger;
|
|
21 import goodjava.logging.LoggerFactory;
|
1488
|
22 import goodjava.lucene.logging.LoggingIndexWriter;
|
|
23 import goodjava.lucene.logging.LogFile;
|
|
24
|
|
25
|
|
26 public class BackupIndexWriter extends LoggingIndexWriter {
|
1499
|
27 private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class);
|
|
28 public static String[] backupDomains;
|
1488
|
29 private final String name;
|
|
30 private final File dir;
|
1499
|
31 private boolean isSyncPending = false;
|
1488
|
32
|
|
33 public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException {
|
|
34 super(indexWriter,logDir);
|
1499
|
35 if( backupDomains == null )
|
|
36 throw new RuntimeException("must set backupDomains");
|
1488
|
37 this.name = name;
|
|
38 File f = new File(System.getProperty("java.io.tmpdir"));
|
|
39 dir = new File(f,"goodjava.lucene/"+name);
|
1501
|
40 IoUtils.mkdirs(dir);
|
1488
|
41 }
|
|
42
|
|
43 public synchronized void commit() throws IOException {
|
|
44 super.commit();
|
1499
|
45 //sync();
|
|
46 if( !isSyncPending ) {
|
1502
|
47 threadPool.execute(sync);
|
1499
|
48 isSyncPending = true;
|
1488
|
49 }
|
|
50 }
|
|
51
|
1499
|
52 public void runSync() {
|
|
53 sync.run();
|
|
54 }
|
|
55
|
|
56 private final Runnable sync = new Runnable() {
|
|
57 public synchronized void run() {
|
|
58 try {
|
|
59 sync();
|
|
60 } catch(IOException e) {
|
|
61 throw new RuntimeException(e);
|
|
62 }
|
|
63 }
|
|
64 };
|
|
65
|
|
66 private void sync() throws IOException {
|
|
67 List<LogFile> logs = new ArrayList<LogFile>();
|
|
68 synchronized(this) {
|
|
69 isSyncPending = false;
|
1500
|
70 clearDir();
|
1499
|
71 for( LogFile log : this.logs ) {
|
|
72 File f = new File(dir,log.file.getName());
|
|
73 IoUtils.link(log.file,f);
|
|
74 logs.add( new LogFile(f) );
|
|
75 }
|
|
76 }
|
|
77 List logInfo = new ArrayList();
|
|
78 Map<String,LogFile> logMap = new HashMap<String,LogFile>();
|
|
79 for( LogFile log : logs ) {
|
|
80 Map fileInfo = new HashMap();
|
|
81 fileInfo.put("name",log.file.getName());
|
|
82 fileInfo.put("end",log.end());
|
|
83 logInfo.add(fileInfo);
|
|
84 logMap.put(log.file.getName(),log);
|
|
85 }
|
|
86 for( String backupDomain : backupDomains ) {
|
|
87 RpcClient rpc = rpcClient(backupDomain);
|
|
88 RpcCall call = new RpcCall("check",name,logInfo);
|
|
89 try {
|
|
90 while(true) {
|
|
91 rpc.write(call);
|
|
92 RpcResult result = rpc.read();
|
|
93 logger.info(Arrays.asList(result.returnValues).toString());
|
|
94 String status = (String)result.returnValues[0];
|
|
95 if( status.equals("ok") ) {
|
|
96 break;
|
|
97 } else if( status.equals("missing") ) {
|
|
98 String fileName = (String)result.returnValues[1];
|
|
99 LogFile log = logMap.get(fileName);
|
|
100 long len = log.end() - 8;
|
|
101 InputStream in = log.input();
|
|
102 call = new RpcCall(in,len,"add",name,logInfo,fileName);
|
|
103 } else if( status.equals("incomplete") ) {
|
|
104 String fileName = (String)result.returnValues[1];
|
|
105 long logEnd = (Long)result.returnValues[2];
|
|
106 LogFile log = logMap.get(fileName);
|
|
107 long len = log.end() - logEnd;
|
|
108 InputStream in = log.input();
|
|
109 in.skip(logEnd-8);
|
|
110 call = new RpcCall(in,len,"append",name,logInfo,fileName);
|
|
111 } else
|
|
112 throw new RuntimeException("status "+status);
|
|
113 }
|
|
114 } catch(RpcException e) {
|
|
115 logger.warn("",e);
|
|
116 }
|
|
117 rpc.close();
|
|
118 }
|
1500
|
119 clearDir();
|
|
120 }
|
|
121
|
|
122 private void clearDir() throws IOException {
|
|
123 for( File f : dir.listFiles() ) {
|
|
124 IoUtils.delete(f);
|
|
125 }
|
1499
|
126 }
|
|
127
|
|
128 static RpcClient rpcClient(String backupDomain) throws IOException {
|
|
129 Socket socket;
|
|
130 if( BackupServer.cipherSuites == null ) {
|
|
131 socket = new Socket(backupDomain,BackupServer.port);
|
|
132 } else {
|
|
133 socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port);
|
|
134 ((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites);
|
|
135 }
|
|
136 return new RpcClient(socket);
|
|
137 }
|
1488
|
138 }
|