comparison src/goodjava/lucene/backup/BackupIndexWriter.java @ 1499:22e15cf73040

lucene.backup
author Franklin Schmidt <fschmidt@gmail.com>
date Sat, 09 May 2020 23:14:13 -0600
parents af55cfad6e12
children f01abd6d5858
comparison
equal deleted inserted replaced
1498:1b809d2fdf03 1499:22e15cf73040
1 package goodjava.lucene.backup; 1 package goodjava.lucene.backup;
2 2
3 import java.io.File; 3 import java.io.File;
4 import java.io.InputStream;
4 import java.io.IOException; 5 import java.io.IOException;
6 import java.net.Socket;
5 import java.util.List; 7 import java.util.List;
6 import java.util.ArrayList; 8 import java.util.ArrayList;
9 import java.util.Map;
10 import java.util.HashMap;
11 import java.util.Arrays;
12 import javax.net.ssl.SSLSocketFactory;
13 import javax.net.ssl.SSLSocket;
7 import goodjava.io.IoUtils; 14 import goodjava.io.IoUtils;
15 import goodjava.rpc.RpcClient;
16 import goodjava.rpc.RpcCall;
17 import goodjava.rpc.RpcResult;
18 import goodjava.rpc.RpcException;
8 import goodjava.lucene.api.LuceneIndexWriter; 19 import goodjava.lucene.api.LuceneIndexWriter;
20 import goodjava.logging.Logger;
21 import goodjava.logging.LoggerFactory;
9 import goodjava.lucene.logging.LoggingIndexWriter; 22 import goodjava.lucene.logging.LoggingIndexWriter;
10 import goodjava.lucene.logging.LogFile; 23 import goodjava.lucene.logging.LogFile;
11 24
12 25
13 public class BackupIndexWriter extends LoggingIndexWriter { 26 public class BackupIndexWriter extends LoggingIndexWriter {
27 private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class);
28 public static String[] backupDomains;
14 private final String name; 29 private final String name;
15 private final File dir; 30 private final File dir;
31 private boolean isSyncPending = false;
16 32
17 public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException { 33 public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException {
18 super(indexWriter,logDir); 34 super(indexWriter,logDir);
35 if( backupDomains == null )
36 throw new RuntimeException("must set backupDomains");
19 this.name = name; 37 this.name = name;
20 File f = new File(System.getProperty("java.io.tmpdir")); 38 File f = new File(System.getProperty("java.io.tmpdir"));
21 dir = new File(f,"goodjava.lucene/"+name); 39 dir = new File(f,"goodjava.lucene/"+name);
22 dir.mkdirs(); 40 dir.mkdirs();
23 } 41 }
24 42
25 public synchronized void commit() throws IOException { 43 public synchronized void commit() throws IOException {
26 super.commit(); 44 super.commit();
27 sync(); 45 //sync();
28 } 46 if( !isSyncPending ) {
29 47 new Thread(sync).start();
30 private void sync() throws IOException { 48 isSyncPending = true;
31 for( File f : dir.listFiles() ) {
32 IoUtils.delete(f);
33 }
34 List<LogFile> logs = new ArrayList<LogFile>();
35 for( LogFile log : this.logs ) {
36 File f = new File(dir,log.file.getName());
37 IoUtils.link(log.file,f);
38 logs.add( new LogFile(f) );
39 } 49 }
40 } 50 }
41 51
52 public void runSync() {
53 sync.run();
54 }
55
56 private final Runnable sync = new Runnable() {
57 public synchronized void run() {
58 try {
59 sync();
60 } catch(IOException e) {
61 throw new RuntimeException(e);
62 }
63 }
64 };
65
66 private void sync() throws IOException {
67 List<LogFile> logs = new ArrayList<LogFile>();
68 synchronized(this) {
69 isSyncPending = false;
70 for( File f : dir.listFiles() ) {
71 IoUtils.delete(f);
72 }
73 for( LogFile log : this.logs ) {
74 File f = new File(dir,log.file.getName());
75 IoUtils.link(log.file,f);
76 logs.add( new LogFile(f) );
77 }
78 }
79 List logInfo = new ArrayList();
80 Map<String,LogFile> logMap = new HashMap<String,LogFile>();
81 for( LogFile log : logs ) {
82 Map fileInfo = new HashMap();
83 fileInfo.put("name",log.file.getName());
84 fileInfo.put("end",log.end());
85 logInfo.add(fileInfo);
86 logMap.put(log.file.getName(),log);
87 }
88 for( String backupDomain : backupDomains ) {
89 RpcClient rpc = rpcClient(backupDomain);
90 RpcCall call = new RpcCall("check",name,logInfo);
91 try {
92 while(true) {
93 rpc.write(call);
94 RpcResult result = rpc.read();
95 logger.info(Arrays.asList(result.returnValues).toString());
96 String status = (String)result.returnValues[0];
97 if( status.equals("ok") ) {
98 break;
99 } else if( status.equals("missing") ) {
100 String fileName = (String)result.returnValues[1];
101 LogFile log = logMap.get(fileName);
102 long len = log.end() - 8;
103 InputStream in = log.input();
104 call = new RpcCall(in,len,"add",name,logInfo,fileName);
105 } else if( status.equals("incomplete") ) {
106 String fileName = (String)result.returnValues[1];
107 long logEnd = (Long)result.returnValues[2];
108 LogFile log = logMap.get(fileName);
109 long len = log.end() - logEnd;
110 InputStream in = log.input();
111 in.skip(logEnd-8);
112 call = new RpcCall(in,len,"append",name,logInfo,fileName);
113 } else
114 throw new RuntimeException("status "+status);
115 }
116 } catch(RpcException e) {
117 logger.warn("",e);
118 }
119 rpc.close();
120 }
121 }
122
123 static RpcClient rpcClient(String backupDomain) throws IOException {
124 Socket socket;
125 if( BackupServer.cipherSuites == null ) {
126 socket = new Socket(backupDomain,BackupServer.port);
127 } else {
128 socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port);
129 ((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites);
130 }
131 return new RpcClient(socket);
132 }
42 } 133 }