Mercurial Hosting > luan
diff src/goodjava/lucene/backup/BackupIndexWriter.java @ 1499:22e15cf73040
lucene.backup
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sat, 09 May 2020 23:14:13 -0600 |
parents | af55cfad6e12 |
children | f01abd6d5858 |
line wrap: on
line diff
--- a/src/goodjava/lucene/backup/BackupIndexWriter.java Fri May 08 18:07:14 2020 -0600 +++ b/src/goodjava/lucene/backup/BackupIndexWriter.java Sat May 09 23:14:13 2020 -0600 @@ -1,21 +1,39 @@ package goodjava.lucene.backup; import java.io.File; +import java.io.InputStream; import java.io.IOException; +import java.net.Socket; import java.util.List; import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.SSLSocket; import goodjava.io.IoUtils; +import goodjava.rpc.RpcClient; +import goodjava.rpc.RpcCall; +import goodjava.rpc.RpcResult; +import goodjava.rpc.RpcException; import goodjava.lucene.api.LuceneIndexWriter; +import goodjava.logging.Logger; +import goodjava.logging.LoggerFactory; import goodjava.lucene.logging.LoggingIndexWriter; import goodjava.lucene.logging.LogFile; public class BackupIndexWriter extends LoggingIndexWriter { + private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class); + public static String[] backupDomains; private final String name; private final File dir; + private boolean isSyncPending = false; public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException { super(indexWriter,logDir); + if( backupDomains == null ) + throw new RuntimeException("must set backupDomains"); this.name = name; File f = new File(System.getProperty("java.io.tmpdir")); dir = new File(f,"goodjava.lucene/"+name); @@ -24,19 +42,92 @@ public synchronized void commit() throws IOException { super.commit(); - sync(); - } - - private void sync() throws IOException { - for( File f : dir.listFiles() ) { - IoUtils.delete(f); - } - List<LogFile> logs = new ArrayList<LogFile>(); - for( LogFile log : this.logs ) { - File f = new File(dir,log.file.getName()); - IoUtils.link(log.file,f); - logs.add( new LogFile(f) ); + //sync(); + if( !isSyncPending ) { + new Thread(sync).start(); + isSyncPending = true; } } + public void runSync() { + sync.run(); + } + + private final Runnable sync = new Runnable() { + public synchronized void run() { + try { + sync(); + } catch(IOException e) { + throw new RuntimeException(e); + } + } + }; + + private void sync() throws IOException { + List<LogFile> logs = new ArrayList<LogFile>(); + synchronized(this) { + isSyncPending = false; + for( File f : dir.listFiles() ) { + IoUtils.delete(f); + } + for( LogFile log : this.logs ) { + File f = new File(dir,log.file.getName()); + IoUtils.link(log.file,f); + logs.add( new LogFile(f) ); + } + } + List logInfo = new ArrayList(); + Map<String,LogFile> logMap = new HashMap<String,LogFile>(); + for( LogFile log : logs ) { + Map fileInfo = new HashMap(); + fileInfo.put("name",log.file.getName()); + fileInfo.put("end",log.end()); + logInfo.add(fileInfo); + logMap.put(log.file.getName(),log); + } + for( String backupDomain : backupDomains ) { + RpcClient rpc = rpcClient(backupDomain); + RpcCall call = new RpcCall("check",name,logInfo); + try { + while(true) { + rpc.write(call); + RpcResult result = rpc.read(); + logger.info(Arrays.asList(result.returnValues).toString()); + String status = (String)result.returnValues[0]; + if( status.equals("ok") ) { + break; + } else if( status.equals("missing") ) { + String fileName = (String)result.returnValues[1]; + LogFile log = logMap.get(fileName); + long len = log.end() - 8; + InputStream in = log.input(); + call = new RpcCall(in,len,"add",name,logInfo,fileName); + } else if( status.equals("incomplete") ) { + String fileName = (String)result.returnValues[1]; + long logEnd = (Long)result.returnValues[2]; + LogFile log = logMap.get(fileName); + long len = log.end() - logEnd; + InputStream in = log.input(); + in.skip(logEnd-8); + call = new RpcCall(in,len,"append",name,logInfo,fileName); + } else + throw new RuntimeException("status "+status); + } + } catch(RpcException e) { + logger.warn("",e); + } + rpc.close(); + } + } + + static RpcClient rpcClient(String backupDomain) throws IOException { + Socket socket; + if( BackupServer.cipherSuites == null ) { + socket = new Socket(backupDomain,BackupServer.port); + } else { + socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port); + ((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites); + } + return new RpcClient(socket); + } }