view src/goodjava/lucene/backup/BackupIndexWriter.java @ 1502:8a7b6b32c691

minor threads
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 10 May 2020 22:48:15 -0600
parents e66e3d50b289
children f443542d8650
line wrap: on
line source

package goodjava.lucene.backup;

import java.io.File;
import java.io.InputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Arrays;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.SSLSocket;
import goodjava.io.IoUtils;
import goodjava.rpc.RpcClient;
import goodjava.rpc.RpcCall;
import goodjava.rpc.RpcResult;
import goodjava.rpc.RpcException;
import goodjava.lucene.api.LuceneIndexWriter;
import goodjava.logging.Logger;
import goodjava.logging.LoggerFactory;
import goodjava.lucene.logging.LoggingIndexWriter;
import goodjava.lucene.logging.LogFile;


/**
 * A {@link LoggingIndexWriter} that, after each commit, schedules a sync of its
 * log files to every host listed in {@link #backupDomains}.
 *
 * <p>A sync links the current log files into a per-index directory under
 * {@code java.io.tmpdir} (via {@code IoUtils.link}) and then, for each backup
 * domain, repeats an RPC exchange ("check", then "add"/"append" as requested by
 * the server) until the server answers "ok".
 */
public class BackupIndexWriter extends LoggingIndexWriter {
	private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class);

	/** Hosts to back up to.  Must be set before any instance is constructed. */
	public static String[] backupDomains;

	/** Index name sent to the backup server with every RPC call. */
	private final String name;

	/** Scratch directory that holds the links to the log files during a sync. */
	private final File dir;

	/** True while a sync has been queued but has not started yet; guarded by {@code this}. */
	private boolean isSyncPending = false;

	/**
	 * @param indexWriter  passed through to the superclass constructor
	 * @param logDir  passed through to the superclass constructor
	 * @param name  index name; used for the scratch directory and in RPC calls
	 * @throws IOException  if the superclass constructor or creating the scratch directory fails
	 * @throws RuntimeException  if {@link #backupDomains} has not been set
	 */
	public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException {
		super(indexWriter,logDir);
		if( backupDomains == null )
			throw new RuntimeException("must set backupDomains");
		this.name = name;
		File f = new File(System.getProperty("java.io.tmpdir"));
		dir = new File(f,"goodjava.lucene/"+name);
		IoUtils.mkdirs(dir);
	}

	/**
	 * Commits through the superclass and then queues a sync on {@code threadPool}
	 * (not declared in this class; presumably inherited) unless one is already pending.
	 */
	public synchronized void commit() throws IOException {
		super.commit();
		//sync();
		if( !isSyncPending ) {
			threadPool.execute(sync);
			isSyncPending = true;
		}
	}

	/**
	 * Runs a sync on the calling thread.  An {@link IOException} from the sync
	 * is rethrown wrapped in a {@link RuntimeException}.
	 */
	public void runSync() {
		sync.run();
	}

	// run() is synchronized on this Runnable, so calls through it never overlap.
	private final Runnable sync = new Runnable() {
		@Override
		public synchronized void run() {
			try {
				sync();
			} catch(IOException e) {
				throw new RuntimeException(e);
			}
		}
	};

	/**
	 * Links the current log files into {@link #dir} and pushes them to every
	 * backup domain.  An {@link RpcException} for one domain is logged and the
	 * remaining domains are still tried; any other failure aborts the sync.
	 */
	private void sync() throws IOException {
		List<LogFile> linked = new ArrayList<LogFile>();
		synchronized(this) {
			// Cleared under the same lock commit() holds, so a commit that happens
			// from here on queues a fresh sync.
			isSyncPending = false;
			clearDir();
			for( LogFile log : this.logs ) {
				File f = new File(dir,log.file.getName());
				IoUtils.link(log.file,f);
				linked.add( new LogFile(f) );
			}
		}
		List<Map<String,Object>> logInfo = new ArrayList<Map<String,Object>>();
		Map<String,LogFile> logMap = new HashMap<String,LogFile>();
		for( LogFile log : linked ) {
			Map<String,Object> fileInfo = new HashMap<String,Object>();
			fileInfo.put("name",log.file.getName());
			fileInfo.put("end",log.end());
			logInfo.add(fileInfo);
			logMap.put(log.file.getName(),log);
		}
		for( String backupDomain : backupDomains ) {
			RpcClient rpc = rpcClient(backupDomain);
			try {
				syncTo(rpc,logInfo,logMap);
			} finally {
				// Always release the socket, including when syncTo() throws.
				rpc.close();
			}
		}
		clearDir();
	}

	/**
	 * Runs the check/add/append exchange on one connection until the server
	 * reports "ok".  An {@link RpcException} is logged and ends the exchange.
	 */
	private void syncTo(RpcClient rpc,List<Map<String,Object>> logInfo,Map<String,LogFile> logMap)
		throws IOException
	{
		RpcCall call = new RpcCall("check",name,logInfo);
		try {
			while(true) {
				rpc.write(call);
				RpcResult result = rpc.read();
				logger.info(Arrays.asList(result.returnValues).toString());
				String status = (String)result.returnValues[0];
				if( status.equals("ok") ) {
					break;
				} else if( status.equals("missing") ) {
					// Server lacks the file: send everything after the first 8 bytes.
					// NOTE(review): the 8 presumably is a log-file header that
					// log.input() already skips -- confirm against LogFile.
					String fileName = (String)result.returnValues[1];
					LogFile log = requireLog(logMap,fileName);
					long len = log.end() - 8;
					InputStream in = log.input();
					call = new RpcCall(in,len,"add",name,logInfo,fileName);
				} else if( status.equals("incomplete") ) {
					// Server has the file up to logEnd: send only the remainder.
					String fileName = (String)result.returnValues[1];
					long logEnd = (Long)result.returnValues[2];
					LogFile log = requireLog(logMap,fileName);
					long len = log.end() - logEnd;
					InputStream in = log.input();
					try {
						skipFully(in,logEnd-8);
					} catch(IOException e) {
						in.close();
						throw e;
					}
					call = new RpcCall(in,len,"append",name,logInfo,fileName);
				} else
					throw new RuntimeException("status "+status);
			}
		} catch(RpcException e) {
			logger.warn("",e);
		}
	}

	/** Looks up a log by the name the server sent, failing clearly if it is unknown. */
	private static LogFile requireLog(Map<String,LogFile> logMap,String fileName) {
		LogFile log = logMap.get(fileName);
		if( log == null )
			throw new RuntimeException("server asked for unknown log file "+fileName);
		return log;
	}

	/**
	 * Skips exactly {@code n} bytes of {@code in}; does nothing when {@code n <= 0}.
	 * A single {@link InputStream#skip} call may skip fewer bytes than requested,
	 * so this repeats until done and falls back to {@code read()} when skip makes
	 * no progress.
	 *
	 * @throws IOException  if the stream ends before {@code n} bytes were skipped
	 */
	private static void skipFully(InputStream in,long n) throws IOException {
		while( n > 0 ) {
			long skipped = in.skip(n);
			if( skipped > 0 ) {
				n -= skipped;
			} else if( in.read() != -1 ) {
				n--;
			} else {
				throw new IOException("unexpected end of log file, "+n+" bytes left to skip");
			}
		}
	}

	/**
	 * Deletes every entry of {@link #dir}.
	 *
	 * @throws IOException  if the directory cannot be listed or a delete fails
	 */
	private void clearDir() throws IOException {
		File[] files = dir.listFiles();
		// listFiles() returns null when dir is not a directory or cannot be read.
		if( files == null )
			throw new IOException("cannot list directory "+dir);
		for( File f : files ) {
			IoUtils.delete(f);
		}
	}

	/**
	 * Opens an RPC connection to {@code backupDomain} on {@code BackupServer.port}:
	 * a plain socket when {@code BackupServer.cipherSuites} is null, otherwise an
	 * SSL socket restricted to those cipher suites.
	 */
	static RpcClient rpcClient(String backupDomain) throws IOException {
		Socket socket;
		if( BackupServer.cipherSuites == null ) {
			socket = new Socket(backupDomain,BackupServer.port);
		} else {
			socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port);
			((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites);
		}
		return new RpcClient(socket);
	}
}