Mercurial Hosting > luan
changeset 1499:22e15cf73040
lucene.backup
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sat, 09 May 2020 23:14:13 -0600 |
parents | 1b809d2fdf03 |
children | f01abd6d5858 |
files | src/goodjava/io/IoUtils.java src/goodjava/lucene/backup/Backup.java src/goodjava/lucene/backup/BackupIndexWriter.java src/goodjava/lucene/backup/BackupServer.java src/goodjava/lucene/logging/LoggingIndexWriter.java src/goodjava/rpc/RpcCon.java src/luan/modules/IoLuan.java |
diffstat | 7 files changed, 310 insertions(+), 20 deletions(-) [+] |
line wrap: on
line diff
--- a/src/goodjava/io/IoUtils.java Fri May 08 18:07:14 2020 -0600 +++ b/src/goodjava/io/IoUtils.java Sat May 09 23:14:13 2020 -0600 @@ -47,6 +47,7 @@ while( (n=in.read(a)) != -1 ) { out.write(a,0,n); } + in.close(); } } \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/goodjava/lucene/backup/Backup.java Sat May 09 23:14:13 2020 -0600 @@ -0,0 +1,103 @@ +package goodjava.lucene.backup; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.Arrays; +import goodjava.io.IoUtils; +import goodjava.rpc.RpcServer; +import goodjava.rpc.RpcCall; +import goodjava.rpc.RpcResult; +import goodjava.logging.Logger; +import goodjava.logging.LoggerFactory; +import goodjava.lucene.logging.LogFile; +import goodjava.lucene.logging.LoggingIndexWriter; +import goodjava.lucene.logging.LogOutputStream; + + +class Backup { + private static final Logger logger = LoggerFactory.getLogger(Backup.class); + + private final File dir; + + Backup(File dir) { + this.dir = dir; + } + + synchronized void handle(RpcServer rpc,RpcCall call) { + try { + handle2(rpc,call); + } catch(IOException e) { + throw new RuntimeException(e); + } + } + + void handle2(RpcServer rpc,RpcCall call) throws IOException { + dir.mkdirs(); + //logger.info(call.cmd+" "+Arrays.asList(call.args)); + String fileName = null; + if( call.cmd.equals("check") ) { + // nothing + } else if( call.cmd.equals("add") || call.cmd.equals("append") ) { + fileName = (String)call.args[2]; + File f = new File(dir,fileName); + LogFile log = new LogFile(f); + LogOutputStream out = log.output(); + IoUtils.copyAll(call.in,out); + out.commit(); + out.close(); + logger.info(call.cmd+" "+fileName+" "+call.lenIn); + } else + throw new RuntimeException("cmd "+call.cmd); + List logInfo = (List)call.args[1]; + logger.info("check "+logInfo); + RpcResult result = new RpcResult("ok"); + for( Object obj : logInfo ) { + Map fileInfo = (Map)obj; + String name = (String)fileInfo.get("name"); + File f = new File(dir,name); + if( !f.exists() ) { + if( name.equals(fileName) ) logger.error("missing"); + result = new RpcResult("missing",name); + break; + } + long end = (Long)fileInfo.get("end"); + LogFile log = new LogFile(f); + long logEnd = log.end(); + if( logEnd > end ) { + logger.error("logEnd > end - shouldn't happen, file="+name+" logEnd="+logEnd+" end="+end); + result = new RpcResult("missing",name); + break; + } + if( logEnd < end ) { + if( name.equals(fileName) ) logger.error("incomplete"); + result = new RpcResult("incomplete",name,logEnd); + break; + } + } + if( call.cmd.equals("add") ) { + boolean complete = true; + List<LogFile> logs = new ArrayList<LogFile>(); + for( Object obj : logInfo ) { + Map fileInfo = (Map)obj; + String name = (String)fileInfo.get("name"); + File f = new File(dir,name); + if( !f.exists() ) { + complete = false; + break; + } + logs.add( new LogFile(f) ); + } + if( complete ) { + File index = new File(dir,"index"); + LoggingIndexWriter.writeIndex(logs,index); + logger.info("write index"); + } + } + rpc.write(result); + } + + +}
--- a/src/goodjava/lucene/backup/BackupIndexWriter.java Fri May 08 18:07:14 2020 -0600 +++ b/src/goodjava/lucene/backup/BackupIndexWriter.java Sat May 09 23:14:13 2020 -0600 @@ -1,21 +1,39 @@ package goodjava.lucene.backup; import java.io.File; +import java.io.InputStream; import java.io.IOException; +import java.net.Socket; import java.util.List; import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Arrays; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.SSLSocket; import goodjava.io.IoUtils; +import goodjava.rpc.RpcClient; +import goodjava.rpc.RpcCall; +import goodjava.rpc.RpcResult; +import goodjava.rpc.RpcException; import goodjava.lucene.api.LuceneIndexWriter; +import goodjava.logging.Logger; +import goodjava.logging.LoggerFactory; import goodjava.lucene.logging.LoggingIndexWriter; import goodjava.lucene.logging.LogFile; public class BackupIndexWriter extends LoggingIndexWriter { + private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class); + public static String[] backupDomains; private final String name; private final File dir; + private boolean isSyncPending = false; public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException { super(indexWriter,logDir); + if( backupDomains == null ) + throw new RuntimeException("must set backupDomains"); this.name = name; File f = new File(System.getProperty("java.io.tmpdir")); dir = new File(f,"goodjava.lucene/"+name); @@ -24,19 +42,92 @@ public synchronized void commit() throws IOException { super.commit(); - sync(); - } - - private void sync() throws IOException { - for( File f : dir.listFiles() ) { - IoUtils.delete(f); - } - List<LogFile> logs = new ArrayList<LogFile>(); - for( LogFile log : this.logs ) { - File f = new File(dir,log.file.getName()); - IoUtils.link(log.file,f); - logs.add( new LogFile(f) ); + //sync(); + if( !isSyncPending ) { + new Thread(sync).start(); + isSyncPending = true; } } + public void runSync() { + sync.run(); + } + + private final Runnable sync = new Runnable() { + public synchronized void run() { + try { + sync(); + } catch(IOException e) { + throw new RuntimeException(e); + } + } + }; + + private void sync() throws IOException { + List<LogFile> logs = new ArrayList<LogFile>(); + synchronized(this) { + isSyncPending = false; + for( File f : dir.listFiles() ) { + IoUtils.delete(f); + } + for( LogFile log : this.logs ) { + File f = new File(dir,log.file.getName()); + IoUtils.link(log.file,f); + logs.add( new LogFile(f) ); + } + } + List logInfo = new ArrayList(); + Map<String,LogFile> logMap = new HashMap<String,LogFile>(); + for( LogFile log : logs ) { + Map fileInfo = new HashMap(); + fileInfo.put("name",log.file.getName()); + fileInfo.put("end",log.end()); + logInfo.add(fileInfo); + logMap.put(log.file.getName(),log); + } + for( String backupDomain : backupDomains ) { + RpcClient rpc = rpcClient(backupDomain); + RpcCall call = new RpcCall("check",name,logInfo); + try { + while(true) { + rpc.write(call); + RpcResult result = rpc.read(); + logger.info(Arrays.asList(result.returnValues).toString()); + String status = (String)result.returnValues[0]; + if( status.equals("ok") ) { + break; + } else if( status.equals("missing") ) { + String fileName = (String)result.returnValues[1]; + LogFile log = logMap.get(fileName); + long len = log.end() - 8; + InputStream in = log.input(); + call = new RpcCall(in,len,"add",name,logInfo,fileName); + } else if( status.equals("incomplete") ) { + String fileName = (String)result.returnValues[1]; + long logEnd = (Long)result.returnValues[2]; + LogFile log = logMap.get(fileName); + long len = log.end() - logEnd; + InputStream in = log.input(); + in.skip(logEnd-8); + call = new RpcCall(in,len,"append",name,logInfo,fileName); + } else + throw new RuntimeException("status "+status); + } + } catch(RpcException e) { + logger.warn("",e); + } + rpc.close(); + } + } + + static RpcClient rpcClient(String backupDomain) throws IOException { + Socket socket; + if( BackupServer.cipherSuites == null ) { + socket = new Socket(backupDomain,BackupServer.port); + } else { + socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port); + ((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites); + } + return new RpcClient(socket); + } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/goodjava/lucene/backup/BackupServer.java Sat May 09 23:14:13 2020 -0600 @@ -0,0 +1,91 @@ +package goodjava.lucene.backup; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.net.Socket; +import java.net.ServerSocket; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; +import javax.net.ssl.SSLServerSocketFactory; +import javax.net.ssl.SSLServerSocket; +import goodjava.util.SoftCacheMap; +import goodjava.rpc.RpcServer; +import goodjava.rpc.RpcCall; +import goodjava.logging.Logger; +import goodjava.logging.LoggerFactory; + + +public class BackupServer { + private static final Logger logger = LoggerFactory.getLogger(BackupServer.class); + + public static int port = 9101; + public static String[] cipherSuites = new String[] { + "TLS_DH_anon_WITH_AES_128_GCM_SHA256", + "TLS_DH_anon_WITH_AES_128_CBC_SHA256", + "TLS_ECDH_anon_WITH_AES_128_CBC_SHA", + "TLS_DH_anon_WITH_AES_128_CBC_SHA", + "TLS_ECDH_anon_WITH_3DES_EDE_CBC_SHA", + "SSL_DH_anon_WITH_3DES_EDE_CBC_SHA", + "TLS_ECDH_anon_WITH_RC4_128_SHA", + "SSL_DH_anon_WITH_RC4_128_MD5", + "SSL_DH_anon_WITH_DES_CBC_SHA", + "SSL_DH_anon_EXPORT_WITH_DES40_CBC_SHA", + "SSL_DH_anon_EXPORT_WITH_RC4_40_MD5", + }; + static { + cipherSuites = null; // for now, until I figure out disgusting java security + } + + private final File backupDir; + private static final ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newCachedThreadPool(); + private static final Map<String,Backup> backups = new SoftCacheMap<String,Backup>(); + + public BackupServer(File backupDir) { + this.backupDir = backupDir; + backupDir.mkdirs(); + } + + public synchronized void start() throws IOException { + final ServerSocket ss; + if( cipherSuites == null ) { + ss = new ServerSocket(port); + } else { + ss = SSLServerSocketFactory.getDefault().createServerSocket(port); + ((SSLServerSocket)ss).setEnabledCipherSuites(cipherSuites); + } + threadPool.execute(new Runnable(){public void run() { + try { + while(!threadPool.isShutdown()) { + final Socket socket = ss.accept(); + threadPool.execute(new Runnable(){public void run() { + handle(socket); + }}); + } + } catch(IOException e) { + logger.error("",e); + } + }}); + logger.info("started server on port "+port); + } + + private void handle(Socket socket) { + RpcServer rpc = new RpcServer(socket); + while( !rpc.isClosed() ) { + RpcCall call = rpc.read(); + if( call == null ) + break; + String name = (String)call.args[0]; + Backup backup; + synchronized(backups) { + backup = backups.get(name); + if( backup == null ) { + backup = new Backup(new File(backupDir,name)); + backups.put(name,backup); + } + } + backup.handle(rpc,call); + } + } + +}
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java Fri May 08 18:07:14 2020 -0600 +++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java Sat May 09 23:14:13 2020 -0600 @@ -134,12 +134,16 @@ } private void deleteUnusedFiles() throws IOException { + deleteUnusedFiles(logs,index); + } + + private static void deleteUnusedFiles(List<LogFile> logs,File index) throws IOException { Set<String> used = new HashSet<String>(); used.add( index.getName() ); for( LogFile lf : logs ) { used.add( lf.file.getName() ); } - for( File f : logDir.listFiles() ) { + for( File f : index.getParentFile().listFiles() ) { if( !used.contains(f.getName()) ) { IoUtils.deleteRecursively(f); } @@ -147,6 +151,10 @@ } private void writeIndex() throws IOException { + writeIndex(logs,index); + } + + public static void writeIndex(List<LogFile> logs,File index) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); dos.writeInt(version); @@ -159,8 +167,8 @@ RandomAccessFile raf = new RandomAccessFile( index, "rwd" ); raf.write( baos.toByteArray() ); raf.close(); - deleteUnusedFiles(); - logger.info("writeIndex "+logs.toString()); + deleteUnusedFiles(logs,index); + //logger.info("writeIndex "+logs.toString()); } private void mergeLogs() throws IOException {
--- a/src/goodjava/rpc/RpcCon.java Fri May 08 18:07:14 2020 -0600 +++ b/src/goodjava/rpc/RpcCon.java Sat May 09 23:14:13 2020 -0600 @@ -60,10 +60,9 @@ if( in != null ) { CountingInputStream countIn = new CountingInputStream(in); IoUtils.copyAll(countIn,out); - countIn.close(); if( countIn.count() != lenIn ) { close(); - throw new RpcError("InputStream wrong length "+countIn.count()+" when should be "+lenIn); + throw new RpcError("InputStream wrong length "+countIn.count()+" when should be "+lenIn+" - list = "+list); } } out.flush();
--- a/src/luan/modules/IoLuan.java Fri May 08 18:07:14 2020 -0600 +++ b/src/luan/modules/IoLuan.java Sat May 09 23:14:13 2020 -0600 @@ -171,14 +171,12 @@ public String read_text() throws IOException, LuanException { Reader in = reader(); String s = Utils.readAll(in); - in.close(); return s; } public byte[] read_binary() throws IOException, LuanException { InputStream in = inputStream(); byte[] a = Utils.readAll(in); - in.close(); return a; } @@ -285,7 +283,6 @@ OutputStream out = outputStream(); IoUtils.copyAll(in,out); out.close(); - in.close(); return; } }