1488
|
1 package goodjava.lucene.backup;
|
|
2
|
|
3 import java.io.File;
|
1499
|
4 import java.io.InputStream;
|
1488
|
5 import java.io.IOException;
|
1499
|
6 import java.net.Socket;
|
1488
|
7 import java.util.List;
|
|
8 import java.util.ArrayList;
|
1499
|
9 import java.util.Map;
|
|
10 import java.util.HashMap;
|
|
11 import java.util.Arrays;
|
1504
|
12 import java.util.concurrent.Executors;
|
|
13 import java.util.concurrent.ExecutorService;
|
1499
|
14 import javax.net.ssl.SSLSocketFactory;
|
|
15 import javax.net.ssl.SSLSocket;
|
1488
|
16 import goodjava.io.IoUtils;
|
1499
|
17 import goodjava.rpc.RpcClient;
|
|
18 import goodjava.rpc.RpcCall;
|
|
19 import goodjava.rpc.RpcResult;
|
|
20 import goodjava.rpc.RpcException;
|
1488
|
21 import goodjava.lucene.api.LuceneIndexWriter;
|
1499
|
22 import goodjava.logging.Logger;
|
|
23 import goodjava.logging.LoggerFactory;
|
1488
|
24 import goodjava.lucene.logging.LoggingIndexWriter;
|
|
25 import goodjava.lucene.logging.LogFile;
|
|
26
|
|
27
|
|
28 public class BackupIndexWriter extends LoggingIndexWriter {
|
1499
|
29 private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class);
|
|
30 public static String[] backupDomains;
|
1488
|
31 private final String name;
|
|
32 private final File dir;
|
1499
|
33 private boolean isSyncPending = false;
|
1504
|
34 private final ExecutorService exec = Executors.newSingleThreadExecutor();
|
1488
|
35
|
|
36 public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException {
|
|
37 super(indexWriter,logDir);
|
1499
|
38 if( backupDomains == null )
|
|
39 throw new RuntimeException("must set backupDomains");
|
1488
|
40 this.name = name;
|
|
41 File f = new File(System.getProperty("java.io.tmpdir"));
|
|
42 dir = new File(f,"goodjava.lucene/"+name);
|
1501
|
43 IoUtils.mkdirs(dir);
|
1488
|
44 }
|
|
45
|
1504
|
46 public synchronized void close() throws IOException {
|
|
47 super.close();
|
|
48 exec.shutdown();
|
|
49 }
|
|
50
|
1488
|
51 public synchronized void commit() throws IOException {
|
|
52 super.commit();
|
1499
|
53 //sync();
|
|
54 if( !isSyncPending ) {
|
1504
|
55 exec.execute(sync);
|
1499
|
56 isSyncPending = true;
|
1488
|
57 }
|
|
58 }
|
|
59
|
1499
|
60 public void runSync() {
|
1504
|
61 try {
|
|
62 exec.submit(sync).get();
|
|
63 } catch(Exception e) {
|
|
64 throw new RuntimeException(e);
|
|
65 }
|
1499
|
66 }
|
|
67
|
|
68 private final Runnable sync = new Runnable() {
|
1504
|
69 public void run() {
|
1499
|
70 try {
|
|
71 sync();
|
|
72 } catch(IOException e) {
|
|
73 throw new RuntimeException(e);
|
|
74 }
|
|
75 }
|
|
76 };
|
|
77
|
|
78 private void sync() throws IOException {
|
|
79 List<LogFile> logs = new ArrayList<LogFile>();
|
|
80 synchronized(this) {
|
|
81 isSyncPending = false;
|
1500
|
82 clearDir();
|
1499
|
83 for( LogFile log : this.logs ) {
|
|
84 File f = new File(dir,log.file.getName());
|
|
85 IoUtils.link(log.file,f);
|
|
86 logs.add( new LogFile(f) );
|
|
87 }
|
|
88 }
|
|
89 List logInfo = new ArrayList();
|
|
90 Map<String,LogFile> logMap = new HashMap<String,LogFile>();
|
|
91 for( LogFile log : logs ) {
|
|
92 Map fileInfo = new HashMap();
|
|
93 fileInfo.put("name",log.file.getName());
|
|
94 fileInfo.put("end",log.end());
|
|
95 logInfo.add(fileInfo);
|
|
96 logMap.put(log.file.getName(),log);
|
|
97 }
|
|
98 for( String backupDomain : backupDomains ) {
|
|
99 RpcClient rpc = rpcClient(backupDomain);
|
|
100 RpcCall call = new RpcCall("check",name,logInfo);
|
|
101 try {
|
|
102 while(true) {
|
|
103 rpc.write(call);
|
|
104 RpcResult result = rpc.read();
|
|
105 logger.info(Arrays.asList(result.returnValues).toString());
|
|
106 String status = (String)result.returnValues[0];
|
|
107 if( status.equals("ok") ) {
|
|
108 break;
|
|
109 } else if( status.equals("missing") ) {
|
|
110 String fileName = (String)result.returnValues[1];
|
|
111 LogFile log = logMap.get(fileName);
|
|
112 long len = log.end() - 8;
|
|
113 InputStream in = log.input();
|
|
114 call = new RpcCall(in,len,"add",name,logInfo,fileName);
|
|
115 } else if( status.equals("incomplete") ) {
|
|
116 String fileName = (String)result.returnValues[1];
|
|
117 long logEnd = (Long)result.returnValues[2];
|
|
118 LogFile log = logMap.get(fileName);
|
|
119 long len = log.end() - logEnd;
|
|
120 InputStream in = log.input();
|
|
121 in.skip(logEnd-8);
|
|
122 call = new RpcCall(in,len,"append",name,logInfo,fileName);
|
|
123 } else
|
|
124 throw new RuntimeException("status "+status);
|
|
125 }
|
|
126 } catch(RpcException e) {
|
|
127 logger.warn("",e);
|
|
128 }
|
|
129 rpc.close();
|
|
130 }
|
1500
|
131 clearDir();
|
|
132 }
|
|
133
|
|
134 private void clearDir() throws IOException {
|
|
135 for( File f : dir.listFiles() ) {
|
|
136 IoUtils.delete(f);
|
|
137 }
|
1499
|
138 }
|
|
139
|
|
140 static RpcClient rpcClient(String backupDomain) throws IOException {
|
|
141 Socket socket;
|
|
142 if( BackupServer.cipherSuites == null ) {
|
|
143 socket = new Socket(backupDomain,BackupServer.port);
|
|
144 } else {
|
|
145 socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port);
|
|
146 ((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites);
|
|
147 }
|
|
148 return new RpcClient(socket);
|
|
149 }
|
1488
|
150 }
|