Mercurial Hosting > luan
view src/goodjava/lucene/backup/BackupIndexWriter.java @ 1708:8d257d56dc42
minor
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Mon, 04 Jul 2022 14:06:59 -0600 |
parents | 973d3039c421 |
children | 1578324d2aac |
line wrap: on
line source
package goodjava.lucene.backup; import java.io.File; import java.io.InputStream; import java.io.OutputStream; import java.io.BufferedOutputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.Socket; import java.net.ConnectException; import java.util.List; import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.Arrays; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutionException; import org.apache.lucene.search.SortField; import goodjava.io.IoUtils; import goodjava.rpc.RpcClient; import goodjava.rpc.RpcCall; import goodjava.rpc.RpcResult; import goodjava.rpc.RpcException; import goodjava.lucene.api.LuceneIndexWriter; import goodjava.lucene.logging.LoggingIndexWriter; import goodjava.lucene.logging.LogFile; import goodjava.logging.Logger; import goodjava.logging.LoggerFactory; public final class BackupIndexWriter extends LoggingIndexWriter { private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class); public static String[] backupDomains; private final String domain; private final String name; private final File dir; private boolean isSyncPending = false; private final ExecutorService exec = Executors.newSingleThreadExecutor(); public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime,String domain,String name) throws IOException { super(indexWriter,logDir,logTime); if( backupDomains == null ) throw new RuntimeException("must set backupDomains"); this.domain = domain; this.name = name; File f = new File(System.getProperty("java.io.tmpdir")); dir = new File(f,"goodjava.lucene/"+domain+"~"+name); IoUtils.mkdirs(dir); } @Override public synchronized void close() throws IOException { super.close(); exec.shutdown(); } @Override public synchronized void commit() throws IOException { super.commit(); //sync(); if( !isSyncPending ) { exec.execute(sync); isSyncPending = true; } } @Override protected boolean doCheck(SortField sortField) throws IOException { boolean ok = super.doCheck(sortField); if( ok ) runSyncWithChecksum(); return ok; } public void runSync() { exec(sync); } public void runSyncWithChecksum() { exec(syncWithChecksum); } private void exec(Runnable r) { try { exec.submit(r).get(); } catch(InterruptedException e) { throw new RuntimeException(e); } catch(ExecutionException e) { Throwable cause = e.getCause(); if( cause instanceof RuntimeException ) throw (RuntimeException)cause; throw new RuntimeException(e); } } private final Runnable sync = new Runnable() { public void run() { try { sync(false); } catch(ConnectException e) { logger.error("sync failed: "+e.getMessage()); } catch(IOException e) { throw new RuntimeException(e); } } }; private final Runnable syncWithChecksum = new Runnable() { public void run() { try { sync(true); } catch(ConnectException e) { logger.error("syncWithChecksum failed: "+e.getMessage()); } catch(IOException e) { throw new RuntimeException(e); } } }; private void sync(boolean withChecksum) throws IOException { List<LogFile> logs = new ArrayList<LogFile>(); synchronized(this) { isSyncPending = false; clearDir(); for( LogFile log : this.logs ) { File f = new File(dir,log.file.getName()); IoUtils.link(log.file,f); logs.add( new LogFile(f) ); } } List logInfo = new ArrayList(); Map<String,LogFile> logMap = new HashMap<String,LogFile>(); for( LogFile log : logs ) { Map fileInfo = new HashMap(); fileInfo.put("name",log.file.getName()); fileInfo.put("end",log.end()); if( withChecksum ) fileInfo.put("checksum",log.checksum()); logInfo.add(fileInfo); logMap.put(log.file.getName(),log); } for( String backupDomain : backupDomains ) { RpcClient rpc = BackupServer.rpcClient(backupDomain); try { RpcCall call = new RpcCall("login",domain,name); rpc.write(call); rpc.read(); call = new RpcCall("check",logInfo); while(true) { rpc.write(call); RpcResult result = rpc.read(); //logger.info(Arrays.asList(result.returnValues).toString()); String status = (String)result.returnValues[0]; if( status.equals("ok") ) { break; } else if( status.equals("missing") || status.equals("bad_checksum") ) { String fileName = (String)result.returnValues[1]; if( status.equals("bad_checksum") ) logger.error("bad_checksum "+fileName); LogFile log = logMap.get(fileName); long len = log.end() - 8; InputStream in = log.input(); call = new RpcCall(in,len,"add",logInfo,fileName); logger.info("add "+fileName); } else if( status.equals("incomplete") ) { String fileName = (String)result.returnValues[1]; long logEnd = (Long)result.returnValues[2]; LogFile log = logMap.get(fileName); long len = log.end() - logEnd; InputStream in = log.input(); in.skip(logEnd-8); call = new RpcCall(in,len,"append",logInfo,fileName); // logger.info("append "+fileName); } else throw new RuntimeException("status "+status); } } catch(RpcException e) { logger.error("",e); } rpc.close(); } clearDir(); } private void clearDir() throws IOException { for( File f : dir.listFiles() ) { IoUtils.delete(f); } } public static BackupIndexWriter newWithRestore(LuceneIndexWriter indexWriter,File logDir,long logTime,String domain,String name) throws IOException { if( !logDir.exists() ) { RpcClient rpc = BackupServer.rpcClient(backupDomains[0]); try { RpcCall call; RpcResult result; call = new RpcCall("exists",domain,name); rpc.write(call); result = rpc.read(); boolean exists = (Boolean)result.returnValues[0]; if( exists ) { logger.error("restoring "+logDir+" from backup"); File zip = File.createTempFile("luan_",".zip"); IoUtils.delete(zip); call = new RpcCall("login",domain,name); rpc.write(call); rpc.read(); call = new RpcCall("zip"); rpc.write(call); result = rpc.read(); OutputStream out = new BufferedOutputStream(new FileOutputStream(zip)); IoUtils.copyAll(result.in,out); out.close(); IoUtils.mkdirs(logDir); String cmd = "unzip " + zip; Process proc = Runtime.getRuntime().exec(cmd,null,logDir); IoUtils.waitFor(proc); IoUtils.delete(zip); } } catch(RpcException e) { throw new RuntimeException(e); } rpc.close(); } return new BackupIndexWriter(indexWriter,logDir,logTime,domain,name); } }