view src/goodjava/lucene/backup/BackupIndexWriter.java @ 1804:b4328322d2be

minor
author Franklin Schmidt <fschmidt@gmail.com>
date Tue, 07 May 2024 22:08:37 -0600
parents 1578324d2aac
children
line wrap: on
line source

package goodjava.lucene.backup;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.ConnectException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import org.apache.lucene.search.SortField;
import goodjava.io.IoUtils;
import goodjava.rpc.RpcClient;
import goodjava.rpc.RpcCall;
import goodjava.rpc.RpcResult;
import goodjava.rpc.RpcException;
import goodjava.rpc.RpcError;
import goodjava.lucene.api.LuceneIndexWriter;
import goodjava.lucene.logging.LoggingIndexWriter;
import goodjava.lucene.logging.LogFile;
import goodjava.logging.Logger;
import goodjava.logging.LoggerFactory;


/**
 * A {@link LoggingIndexWriter} that also pushes its log files to the backup
 * servers listed in {@link #backupDomains}.
 *
 * <p>Each {@link #commit()} schedules an asynchronous sync on a dedicated
 * single-thread executor (at most one is queued at a time).
 * {@link #runSync()} and {@link #runSyncWithChecksum()} submit a sync to the
 * same executor and block until it has finished.
 */
public final class BackupIndexWriter extends LoggingIndexWriter {
	private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class);

	/** Backup server domains; must be set before any instance is constructed. */
	public static String[] backupDomains;

	private final String domain;
	private final String name;
	/** Scratch directory under java.io.tmpdir holding links to the log files being synced. */
	private final File dir;
	/** True while a sync scheduled by commit() has not started yet; guarded by {@code this}. */
	private boolean isSyncPending = false;
	private final ExecutorService exec = Executors.newSingleThreadExecutor();

	/**
	 * Creates the writer and its scratch directory
	 * {@code <java.io.tmpdir>/goodjava.lucene/<domain>~<name>}.
	 *
	 * @throws RuntimeException if {@link #backupDomains} has not been set
	 * @throws IOException if the superclass constructor or directory creation fails
	 */
	public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime,String domain,String name)
		throws IOException
	{
		super(indexWriter,logDir,logTime);
		if( backupDomains == null )
			throw new RuntimeException("must set backupDomains");
		this.domain = domain;
		this.name = name;
		File f = new File(System.getProperty("java.io.tmpdir"));
		dir = new File(f,"goodjava.lucene/"+domain+"~"+name);
		IoUtils.mkdirs(dir);
	}

	/**
	 * Closes the underlying writer and shuts down the sync executor.
	 * The executor is shut down even when the superclass close fails, so its
	 * worker thread does not outlive the writer.
	 */
	@Override public synchronized void close() throws IOException {
		try {
			super.close();
		} finally {
			exec.shutdown();
		}
	}

	/**
	 * Commits through the superclass, then schedules a background sync unless
	 * one is already waiting to run.
	 */
	@Override public synchronized void commit() throws IOException {
		super.commit();
		if( !isSyncPending ) {
			exec.execute(syncR);
			isSyncPending = true;
		}
	}

	/**
	 * Runs the superclass check and, when it passes, performs a blocking sync
	 * with checksums.
	 *
	 * @return the result of the superclass check
	 */
	@Override protected boolean doCheck(SortField sortField) throws IOException {
		boolean ok = super.doCheck(sortField);
		if( ok )
			runSyncWithChecksum();
		return ok;
	}

	/** Runs a sync without checksums on the executor and waits for it to finish. */
	public void runSync() {
		execR(syncR);
	}

	/** Runs a sync with checksums on the executor and waits for it to finish. */
	public void runSyncWithChecksum() {
		execR(syncWithChecksum);
	}

	/**
	 * Submits {@code r} to the executor and blocks until it completes.
	 * A RuntimeException thrown by the task is rethrown as is; any other
	 * failure is wrapped in a RuntimeException.
	 */
	private void execR(Runnable r) {
		try {
			exec.submit(r).get();
		} catch(InterruptedException e) {
			// restore the interrupt flag so callers further up can still see it
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		} catch(ExecutionException e) {
			Throwable cause = e.getCause();
			if( cause instanceof RuntimeException )
				throw (RuntimeException)cause;
			throw new RuntimeException(e);
		}
	}

	/** Sync without checksums; a refused connection is only logged. */
	private final Runnable syncR = new Runnable() {
		public void run() {
			try {
				sync(false);
			} catch(ConnectException e) {
				logger.error("sync failed: "+e.getMessage());
			} catch(IOException e) {
				throw new RuntimeException(e);
			} catch(RpcError e) {
				logger.error("",e);
				throw e;
			}
		}
	};

	/** Sync with checksums; a refused connection is only logged. */
	private final Runnable syncWithChecksum = new Runnable() {
		public void run() {
			try {
				sync(true);
			} catch(ConnectException e) {
				logger.error("syncWithChecksum failed: "+e.getMessage());
			} catch(IOException e) {
				throw new RuntimeException(e);
			}
		}
	};

	/**
	 * Links the current log files into the scratch directory, then brings
	 * every backup domain up to date with them.
	 *
	 * @param withChecksum whether to include a checksum for each log file in
	 *        the information sent to the server
	 * @throws IOException on local file or connection failure
	 */
	private void sync(boolean withChecksum) throws IOException {
		List<LogFile> snapshot = new ArrayList<LogFile>();
		synchronized(this) {
			// from here on a new commit() must schedule another sync
			isSyncPending = false;
			clearDir();
			for( LogFile log : this.logs ) {
				File f = new File(dir,log.file.getName());
				IoUtils.link(log.file,f);
				snapshot.add( new LogFile(f) );
			}
		}
		List<Map<String,Object>> logInfo = new ArrayList<Map<String,Object>>();
		Map<String,LogFile> logMap = new HashMap<String,LogFile>();
		for( LogFile log : snapshot ) {
			Map<String,Object> fileInfo = new HashMap<String,Object>();
			fileInfo.put("name",log.file.getName());
			fileInfo.put("end",log.end());
			if( withChecksum )
				fileInfo.put("checksum",log.checksum());
			logInfo.add(fileInfo);
			logMap.put(log.file.getName(),log);
		}
		for( String backupDomain : backupDomains ) {
			RpcClient rpc = BackupServer.rpcClient(backupDomain);
			try {
				syncDomain(rpc,logInfo,logMap);
			} catch(RpcException e) {
				logger.error("",e);
			} finally {
				// always release the connection, also when the exchange fails
				rpc.close();
			}
		}
		clearDir();
	}

	/**
	 * Logs in on {@code rpc} and repeats the "check" call, sending whatever
	 * the server reports as missing, corrupt or incomplete, until it answers
	 * "ok".
	 *
	 * @throws RuntimeException if the server returns an unknown status or
	 *         names a log file that is not part of this sync
	 */
	private void syncDomain(RpcClient rpc,List<Map<String,Object>> logInfo,Map<String,LogFile> logMap)
		throws IOException, RpcException
	{
		RpcCall call = new RpcCall("login",domain,name);
		rpc.write(call);
		rpc.read();
		call = new RpcCall("check",logInfo);
		while(true) {
			rpc.write(call);
			RpcResult result = rpc.read();
			//logger.info(Arrays.asList(result.returnValues).toString());
			String status = (String)result.returnValues[0];
			if( status.equals("ok") ) {
				break;
			} else if( status.equals("missing") || status.equals("bad_checksum") ) {
				String fileName = (String)result.returnValues[1];
				if( status.equals("bad_checksum") )
					logger.error("bad_checksum "+fileName);
				LogFile log = requireLog(logMap,fileName);
				// NOTE(review): 8 is presumably the size of the log file header - confirm against LogFile
				long len = log.end() - 8;
				InputStream in = log.input();
				call = new RpcCall(in,len,"add",logInfo,fileName);
				logger.info("add "+fileName);
			} else if( status.equals("incomplete") ) {
				String fileName = (String)result.returnValues[1];
				long logEnd = (Long)result.returnValues[2];
				LogFile log = requireLog(logMap,fileName);
				long len = log.end() - logEnd;
				InputStream in = log.input();
				// InputStream.skip may skip fewer bytes than requested, so loop until done
				skipFully(in,logEnd-8);
				call = new RpcCall(in,len,"append",logInfo,fileName);
				// logger.info("append "+fileName);
			} else
				throw new RuntimeException("status "+status);
		}
	}

	/** Returns the log registered under {@code fileName}, failing with a clear message when there is none. */
	private static LogFile requireLog(Map<String,LogFile> logMap,String fileName) {
		LogFile log = logMap.get(fileName);
		if( log == null )
			throw new RuntimeException("server reported unknown log file "+fileName);
		return log;
	}

	/**
	 * Skips exactly {@code n} bytes of {@code in}; does nothing when {@code n}
	 * is not positive.
	 *
	 * @throws IOException if the stream ends before {@code n} bytes were skipped
	 */
	private static void skipFully(InputStream in,long n) throws IOException {
		while( n > 0 ) {
			long skipped = in.skip(n);
			if( skipped > 0 ) {
				n -= skipped;
			} else if( in.read() == -1 ) {
				// skip() returning 0 is ambiguous; a failed read() confirms end of stream
				throw new IOException("unexpected end of stream, "+n+" bytes left to skip");
			} else {
				n--;
			}
		}
	}

	/**
	 * Deletes every entry of the scratch directory.
	 *
	 * @throws IOException if the directory cannot be listed or an entry cannot be deleted
	 */
	private void clearDir() throws IOException {
		File[] files = dir.listFiles();
		if( files == null )
			throw new IOException("cannot list directory "+dir);
		for( File f : files ) {
			IoUtils.delete(f);
		}
	}

	/**
	 * Creates a BackupIndexWriter, first restoring {@code logDir} from the
	 * first backup domain when {@code logDir} does not exist and that server
	 * reports a backup for {@code domain}/{@code name}.
	 *
	 * <p>The restore downloads a zip to a temp file and runs the external
	 * {@code unzip} command with {@code logDir} as its working directory.
	 *
	 * @throws RuntimeException if {@link #backupDomains} has not been set, or
	 *         wrapping an RpcException raised during the restore
	 * @throws IOException on file or connection failure
	 */
	public static BackupIndexWriter newWithRestore(LuceneIndexWriter indexWriter,File logDir,long logTime,String domain,String name)
		throws IOException
	{
		if( backupDomains == null )
			throw new RuntimeException("must set backupDomains");
		if( !logDir.exists() ) {
			RpcClient rpc = BackupServer.rpcClient(backupDomains[0]);
			try {
				RpcCall call = new RpcCall("exists",domain,name);
				rpc.write(call);
				RpcResult result = rpc.read();
				boolean exists = (Boolean)result.returnValues[0];
				if( exists ) {
					logger.error("restoring "+logDir+" from backup");
					File zip = File.createTempFile("luan_",".zip");
					try {
						IoUtils.delete(zip);
						call = new RpcCall("login",domain,name);
						rpc.write(call);
						rpc.read();
						call = new RpcCall("zip");
						rpc.write(call);
						result = rpc.read();
						OutputStream out = new BufferedOutputStream(new FileOutputStream(zip));
						try {
							IoUtils.copyAll(result.in,out);
						} finally {
							out.close();
						}
						IoUtils.mkdirs(logDir);
						// argument array instead of a single command string, so a temp
						// path containing whitespace is passed to unzip as one argument
						String[] cmd = { "unzip", zip.getPath() };
						Process proc = Runtime.getRuntime().exec(cmd,null,logDir);
						IoUtils.waitFor(proc);
					} finally {
						// do not leave the downloaded archive behind, also on failure
						if( zip.exists() )
							IoUtils.delete(zip);
					}
				}
			} catch(RpcException e) {
				throw new RuntimeException(e);
			} finally {
				rpc.close();
			}
		}
		return new BackupIndexWriter(indexWriter,logDir,logTime,domain,name);
	}

}