changeset 1499:22e15cf73040

lucene.backup
author Franklin Schmidt <fschmidt@gmail.com>
date Sat, 09 May 2020 23:14:13 -0600
parents 1b809d2fdf03
children f01abd6d5858
files src/goodjava/io/IoUtils.java src/goodjava/lucene/backup/Backup.java src/goodjava/lucene/backup/BackupIndexWriter.java src/goodjava/lucene/backup/BackupServer.java src/goodjava/lucene/logging/LoggingIndexWriter.java src/goodjava/rpc/RpcCon.java src/luan/modules/IoLuan.java
diffstat 7 files changed, 310 insertions(+), 20 deletions(-) [+]
line wrap: on
line diff
--- a/src/goodjava/io/IoUtils.java	Fri May 08 18:07:14 2020 -0600
+++ b/src/goodjava/io/IoUtils.java	Sat May 09 23:14:13 2020 -0600
@@ -47,6 +47,7 @@
 		while( (n=in.read(a)) != -1 ) {
 			out.write(a,0,n);
 		}
+		in.close();
 	}
 
 }
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/lucene/backup/Backup.java	Sat May 09 23:14:13 2020 -0600
@@ -0,0 +1,103 @@
+package goodjava.lucene.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Arrays;
+import goodjava.io.IoUtils;
+import goodjava.rpc.RpcServer;
+import goodjava.rpc.RpcCall;
+import goodjava.rpc.RpcResult;
+import goodjava.logging.Logger;
+import goodjava.logging.LoggerFactory;
+import goodjava.lucene.logging.LogFile;
+import goodjava.lucene.logging.LoggingIndexWriter;
+import goodjava.lucene.logging.LogOutputStream;
+
+
+class Backup {
+	private static final Logger logger = LoggerFactory.getLogger(Backup.class);
+
+	private final File dir;
+
+	Backup(File dir) {
+		this.dir = dir;
+	}
+
+	synchronized void handle(RpcServer rpc,RpcCall call) {
+		try {
+			handle2(rpc,call);
+		} catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	void handle2(RpcServer rpc,RpcCall call) throws IOException {
+		dir.mkdirs();
+		//logger.info(call.cmd+" "+Arrays.asList(call.args));
+		String fileName = null;
+		if( call.cmd.equals("check") ) {
+			// nothing
+		} else if( call.cmd.equals("add") || call.cmd.equals("append")  ) {
+			fileName = (String)call.args[2];
+			File f = new File(dir,fileName);
+			LogFile log = new LogFile(f);
+			LogOutputStream out = log.output();
+			IoUtils.copyAll(call.in,out);
+			out.commit();
+			out.close();
+			logger.info(call.cmd+" "+fileName+" "+call.lenIn);
+		} else
+			throw new RuntimeException("cmd "+call.cmd);
+		List logInfo = (List)call.args[1];
+		logger.info("check "+logInfo);
+		RpcResult result = new RpcResult("ok");
+		for( Object obj : logInfo ) {
+			Map fileInfo = (Map)obj;
+			String name = (String)fileInfo.get("name");
+			File f = new File(dir,name);
+			if( !f.exists() ) {
+				if( name.equals(fileName) )  logger.error("missing");
+				result = new RpcResult("missing",name);
+				break;
+			}
+			long end = (Long)fileInfo.get("end");
+			LogFile log = new LogFile(f);
+			long logEnd = log.end();
+			if( logEnd > end ) {
+				logger.error("logEnd > end - shouldn't happen, file="+name+" logEnd="+logEnd+" end="+end);
+				result = new RpcResult("missing",name);
+				break;
+			}
+			if( logEnd < end ) {
+				if( name.equals(fileName) )  logger.error("incomplete");
+				result = new RpcResult("incomplete",name,logEnd);
+				break;
+			}
+		}
+		if( call.cmd.equals("add") ) {
+			boolean complete = true;
+			List<LogFile> logs = new ArrayList<LogFile>();
+			for( Object obj : logInfo ) {
+				Map fileInfo = (Map)obj;
+				String name = (String)fileInfo.get("name");
+				File f = new File(dir,name);
+				if( !f.exists() ) {
+					complete = false;
+					break;
+				}
+				logs.add( new LogFile(f) );
+			}
+			if( complete ) {
+				File index = new File(dir,"index");
+				LoggingIndexWriter.writeIndex(logs,index);
+				logger.info("write index");
+			}
+		}
+		rpc.write(result);
+	}
+
+
+}
--- a/src/goodjava/lucene/backup/BackupIndexWriter.java	Fri May 08 18:07:14 2020 -0600
+++ b/src/goodjava/lucene/backup/BackupIndexWriter.java	Sat May 09 23:14:13 2020 -0600
@@ -1,21 +1,39 @@
 package goodjava.lucene.backup;
 
 import java.io.File;
+import java.io.InputStream;
 import java.io.IOException;
+import java.net.Socket;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Arrays;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.SSLSocket;
 import goodjava.io.IoUtils;
+import goodjava.rpc.RpcClient;
+import goodjava.rpc.RpcCall;
+import goodjava.rpc.RpcResult;
+import goodjava.rpc.RpcException;
 import goodjava.lucene.api.LuceneIndexWriter;
+import goodjava.logging.Logger;
+import goodjava.logging.LoggerFactory;
 import goodjava.lucene.logging.LoggingIndexWriter;
 import goodjava.lucene.logging.LogFile;
 
 
 public class BackupIndexWriter extends LoggingIndexWriter {
+	private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class);
+	public static String[] backupDomains;
 	private final String name;
 	private final File dir;
+	private boolean isSyncPending = false;
 
 	public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException {
 		super(indexWriter,logDir);
+		if( backupDomains == null )
+			throw new RuntimeException("must set backupDomains");
 		this.name = name;
 		File f = new File(System.getProperty("java.io.tmpdir"));
 		dir = new File(f,"goodjava.lucene/"+name);
@@ -24,19 +42,92 @@
 
 	public synchronized void commit() throws IOException {
 		super.commit();
-		sync();
-	}
-
-	private void sync() throws IOException {
-		for( File f : dir.listFiles() ) {
-			IoUtils.delete(f);
-		}
-		List<LogFile> logs = new ArrayList<LogFile>();
-		for( LogFile log : this.logs ) {
-			File f = new File(dir,log.file.getName());
-			IoUtils.link(log.file,f);
-			logs.add( new LogFile(f) );
+		//sync();
+		if( !isSyncPending ) {
+			new Thread(sync).start();
+			isSyncPending = true;
 		}
 	}
 
+	public void runSync() {
+		sync.run();
+	}
+
+	private final Runnable sync = new Runnable() {
+		public synchronized void run() {
+			try {
+				sync();
+			} catch(IOException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	};
+
+	private void sync() throws IOException {
+		List<LogFile> logs = new ArrayList<LogFile>();
+		synchronized(this) {
+			isSyncPending = false;
+			for( File f : dir.listFiles() ) {
+				IoUtils.delete(f);
+			}
+			for( LogFile log : this.logs ) {
+				File f = new File(dir,log.file.getName());
+				IoUtils.link(log.file,f);
+				logs.add( new LogFile(f) );
+			}
+		}
+		List logInfo = new ArrayList();
+		Map<String,LogFile> logMap = new HashMap<String,LogFile>();
+		for( LogFile log : logs ) {
+			Map fileInfo = new HashMap();
+			fileInfo.put("name",log.file.getName());
+			fileInfo.put("end",log.end());
+			logInfo.add(fileInfo);
+			logMap.put(log.file.getName(),log);
+		}
+		for( String backupDomain : backupDomains ) {
+			RpcClient rpc = rpcClient(backupDomain);
+			RpcCall call = new RpcCall("check",name,logInfo);
+			try {
+				while(true) {
+					rpc.write(call);
+					RpcResult result = rpc.read();
+					logger.info(Arrays.asList(result.returnValues).toString());
+					String status = (String)result.returnValues[0];
+					if( status.equals("ok") ) {
+						break;
+					} else if( status.equals("missing") ) {
+						String fileName = (String)result.returnValues[1];
+						LogFile log = logMap.get(fileName);
+						long len = log.end() - 8;
+						InputStream in = log.input();
+						call = new RpcCall(in,len,"add",name,logInfo,fileName);
+					} else if( status.equals("incomplete") ) {
+						String fileName = (String)result.returnValues[1];
+						long logEnd = (Long)result.returnValues[2];
+						LogFile log = logMap.get(fileName);
+						long len = log.end() - logEnd;
+						InputStream in = log.input();
+						in.skip(logEnd-8);
+						call = new RpcCall(in,len,"append",name,logInfo,fileName);
+					} else
+						throw new RuntimeException("status "+status);
+				}
+			} catch(RpcException e) {
+				logger.warn("",e);
+			}
+			rpc.close();
+		}
+	}
+
+	static RpcClient rpcClient(String backupDomain) throws IOException {
+		Socket socket;
+		if( BackupServer.cipherSuites == null ) {
+			socket = new Socket(backupDomain,BackupServer.port);
+		} else {
+			socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port);
+			((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites);
+		}
+		return new RpcClient(socket);
+	}
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/lucene/backup/BackupServer.java	Sat May 09 23:14:13 2020 -0600
@@ -0,0 +1,91 @@
+package goodjava.lucene.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.net.Socket;
+import java.net.ServerSocket;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLServerSocket;
+import goodjava.util.SoftCacheMap;
+import goodjava.rpc.RpcServer;
+import goodjava.rpc.RpcCall;
+import goodjava.logging.Logger;
+import goodjava.logging.LoggerFactory;
+
+
+public class BackupServer {
+	private static final Logger logger = LoggerFactory.getLogger(BackupServer.class);
+
+	public static int port = 9101;
+	public static String[] cipherSuites = new String[] {
+		"TLS_DH_anon_WITH_AES_128_GCM_SHA256",
+		"TLS_DH_anon_WITH_AES_128_CBC_SHA256",
+		"TLS_ECDH_anon_WITH_AES_128_CBC_SHA",
+		"TLS_DH_anon_WITH_AES_128_CBC_SHA",
+		"TLS_ECDH_anon_WITH_3DES_EDE_CBC_SHA",
+		"SSL_DH_anon_WITH_3DES_EDE_CBC_SHA",
+		"TLS_ECDH_anon_WITH_RC4_128_SHA",
+		"SSL_DH_anon_WITH_RC4_128_MD5",
+		"SSL_DH_anon_WITH_DES_CBC_SHA",
+		"SSL_DH_anon_EXPORT_WITH_DES40_CBC_SHA",
+		"SSL_DH_anon_EXPORT_WITH_RC4_40_MD5",
+	};
+	static {
+		cipherSuites = null;  // for now, until I figure out disgusting java security
+	}
+
+	private final File backupDir;
+	private static final ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newCachedThreadPool();
+	private static final Map<String,Backup> backups = new SoftCacheMap<String,Backup>();
+
+	public BackupServer(File backupDir) {
+		this.backupDir = backupDir;
+		backupDir.mkdirs();
+	}
+
+	public synchronized void start() throws IOException {
+		final ServerSocket ss;
+		if( cipherSuites == null ) {
+			ss = new ServerSocket(port);
+		} else {
+			ss = SSLServerSocketFactory.getDefault().createServerSocket(port);
+			((SSLServerSocket)ss).setEnabledCipherSuites(cipherSuites);
+		}
+		threadPool.execute(new Runnable(){public void run() {
+			try {
+				while(!threadPool.isShutdown()) {
+					final Socket socket = ss.accept();
+					threadPool.execute(new Runnable(){public void run() {
+						handle(socket);
+					}});
+				}
+			} catch(IOException e) {
+				logger.error("",e);
+			}
+		}});
+		logger.info("started server on port "+port);
+	}
+
+	private void handle(Socket socket) {
+		RpcServer rpc = new RpcServer(socket);
+		while( !rpc.isClosed() ) {
+			RpcCall call = rpc.read();
+			if( call == null )
+				break;
+			String name = (String)call.args[0];
+			Backup backup;
+			synchronized(backups) {
+				backup = backups.get(name);
+				if( backup == null ) {
+					backup = new Backup(new File(backupDir,name));
+					backups.put(name,backup);
+				}
+			}
+			backup.handle(rpc,call);
+		}
+	}
+
+}
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java	Fri May 08 18:07:14 2020 -0600
+++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java	Sat May 09 23:14:13 2020 -0600
@@ -134,12 +134,16 @@
 	}
 
 	private void deleteUnusedFiles() throws IOException {
+		deleteUnusedFiles(logs,index);
+	}
+
+	private static void deleteUnusedFiles(List<LogFile> logs,File index) throws IOException {
 		Set<String> used = new HashSet<String>();
 		used.add( index.getName() );
 		for( LogFile lf : logs ) {
 			used.add( lf.file.getName() );
 		}
-		for( File f : logDir.listFiles() ) {
+		for( File f : index.getParentFile().listFiles() ) {
 			if( !used.contains(f.getName()) ) {
 				IoUtils.deleteRecursively(f);
 			}
@@ -147,6 +151,10 @@
 	}
 
 	private void writeIndex() throws IOException {
+		writeIndex(logs,index);
+	}
+
+	public static void writeIndex(List<LogFile> logs,File index) throws IOException {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		DataOutputStream dos = new DataOutputStream(baos);
 		dos.writeInt(version);
@@ -159,8 +167,8 @@
 		RandomAccessFile raf = new RandomAccessFile( index, "rwd" );
 		raf.write( baos.toByteArray() );
 		raf.close();
-		deleteUnusedFiles();
-		logger.info("writeIndex "+logs.toString());
+		deleteUnusedFiles(logs,index);
+		//logger.info("writeIndex "+logs.toString());
 	}
 
 	private void mergeLogs() throws IOException {
--- a/src/goodjava/rpc/RpcCon.java	Fri May 08 18:07:14 2020 -0600
+++ b/src/goodjava/rpc/RpcCon.java	Sat May 09 23:14:13 2020 -0600
@@ -60,10 +60,9 @@
 			if( in != null ) {
 				CountingInputStream countIn = new CountingInputStream(in);
 				IoUtils.copyAll(countIn,out);
-				countIn.close();
 				if( countIn.count() != lenIn ) {
 					close();
-					throw new RpcError("InputStream wrong length "+countIn.count()+" when should be "+lenIn);
+					throw new RpcError("InputStream wrong length "+countIn.count()+" when should be "+lenIn+" - list = "+list);
 				}
 			}
 			out.flush();
--- a/src/luan/modules/IoLuan.java	Fri May 08 18:07:14 2020 -0600
+++ b/src/luan/modules/IoLuan.java	Sat May 09 23:14:13 2020 -0600
@@ -171,14 +171,12 @@
 		public String read_text() throws IOException, LuanException {
 			Reader in = reader();
 			String s = Utils.readAll(in);
-			in.close();
 			return s;
 		}
 
 		public byte[] read_binary() throws IOException, LuanException {
 			InputStream in = inputStream();
 			byte[] a = Utils.readAll(in);
-			in.close();
 			return a;
 		}
 
@@ -285,7 +283,6 @@
 					OutputStream out = outputStream();
 					IoUtils.copyAll(in,out);
 					out.close();
-					in.close();
 					return;
 				}
 			}