Mercurial Hosting > luan
comparison src/goodjava/lucene/backup/BackupIndexWriter.java @ 1499:22e15cf73040
lucene.backup
| author | Franklin Schmidt <fschmidt@gmail.com> |
|---|---|
| date | Sat, 09 May 2020 23:14:13 -0600 |
| parents | af55cfad6e12 |
| children | f01abd6d5858 |
comparison
equal
deleted
inserted
replaced
| 1498:1b809d2fdf03 | 1499:22e15cf73040 |
|---|---|
| 1 package goodjava.lucene.backup; | 1 package goodjava.lucene.backup; |
| 2 | 2 |
| 3 import java.io.File; | 3 import java.io.File; |
| 4 import java.io.InputStream; | |
| 4 import java.io.IOException; | 5 import java.io.IOException; |
| 6 import java.net.Socket; | |
| 5 import java.util.List; | 7 import java.util.List; |
| 6 import java.util.ArrayList; | 8 import java.util.ArrayList; |
| 9 import java.util.Map; | |
| 10 import java.util.HashMap; | |
| 11 import java.util.Arrays; | |
| 12 import javax.net.ssl.SSLSocketFactory; | |
| 13 import javax.net.ssl.SSLSocket; | |
| 7 import goodjava.io.IoUtils; | 14 import goodjava.io.IoUtils; |
| 15 import goodjava.rpc.RpcClient; | |
| 16 import goodjava.rpc.RpcCall; | |
| 17 import goodjava.rpc.RpcResult; | |
| 18 import goodjava.rpc.RpcException; | |
| 8 import goodjava.lucene.api.LuceneIndexWriter; | 19 import goodjava.lucene.api.LuceneIndexWriter; |
| 20 import goodjava.logging.Logger; | |
| 21 import goodjava.logging.LoggerFactory; | |
| 9 import goodjava.lucene.logging.LoggingIndexWriter; | 22 import goodjava.lucene.logging.LoggingIndexWriter; |
| 10 import goodjava.lucene.logging.LogFile; | 23 import goodjava.lucene.logging.LogFile; |
| 11 | 24 |
| 12 | 25 |
| 13 public class BackupIndexWriter extends LoggingIndexWriter { | 26 public class BackupIndexWriter extends LoggingIndexWriter { |
| 27 private static final Logger logger = LoggerFactory.getLogger(BackupIndexWriter.class); | |
| 28 public static String[] backupDomains; | |
| 14 private final String name; | 29 private final String name; |
| 15 private final File dir; | 30 private final File dir; |
| 31 private boolean isSyncPending = false; | |
| 16 | 32 |
| 17 public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException { | 33 public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name) throws IOException { |
| 18 super(indexWriter,logDir); | 34 super(indexWriter,logDir); |
| 35 if( backupDomains == null ) | |
| 36 throw new RuntimeException("must set backupDomains"); | |
| 19 this.name = name; | 37 this.name = name; |
| 20 File f = new File(System.getProperty("java.io.tmpdir")); | 38 File f = new File(System.getProperty("java.io.tmpdir")); |
| 21 dir = new File(f,"goodjava.lucene/"+name); | 39 dir = new File(f,"goodjava.lucene/"+name); |
| 22 dir.mkdirs(); | 40 dir.mkdirs(); |
| 23 } | 41 } |
| 24 | 42 |
| 25 public synchronized void commit() throws IOException { | 43 public synchronized void commit() throws IOException { |
| 26 super.commit(); | 44 super.commit(); |
| 27 sync(); | 45 //sync(); |
| 28 } | 46 if( !isSyncPending ) { |
| 29 | 47 new Thread(sync).start(); |
| 30 private void sync() throws IOException { | 48 isSyncPending = true; |
| 31 for( File f : dir.listFiles() ) { | |
| 32 IoUtils.delete(f); | |
| 33 } | |
| 34 List<LogFile> logs = new ArrayList<LogFile>(); | |
| 35 for( LogFile log : this.logs ) { | |
| 36 File f = new File(dir,log.file.getName()); | |
| 37 IoUtils.link(log.file,f); | |
| 38 logs.add( new LogFile(f) ); | |
| 39 } | 49 } |
| 40 } | 50 } |
| 41 | 51 |
| 52 public void runSync() { | |
| 53 sync.run(); | |
| 54 } | |
| 55 | |
| 56 private final Runnable sync = new Runnable() { | |
| 57 public synchronized void run() { | |
| 58 try { | |
| 59 sync(); | |
| 60 } catch(IOException e) { | |
| 61 throw new RuntimeException(e); | |
| 62 } | |
| 63 } | |
| 64 }; | |
| 65 | |
| 66 private void sync() throws IOException { | |
| 67 List<LogFile> logs = new ArrayList<LogFile>(); | |
| 68 synchronized(this) { | |
| 69 isSyncPending = false; | |
| 70 for( File f : dir.listFiles() ) { | |
| 71 IoUtils.delete(f); | |
| 72 } | |
| 73 for( LogFile log : this.logs ) { | |
| 74 File f = new File(dir,log.file.getName()); | |
| 75 IoUtils.link(log.file,f); | |
| 76 logs.add( new LogFile(f) ); | |
| 77 } | |
| 78 } | |
| 79 List logInfo = new ArrayList(); | |
| 80 Map<String,LogFile> logMap = new HashMap<String,LogFile>(); | |
| 81 for( LogFile log : logs ) { | |
| 82 Map fileInfo = new HashMap(); | |
| 83 fileInfo.put("name",log.file.getName()); | |
| 84 fileInfo.put("end",log.end()); | |
| 85 logInfo.add(fileInfo); | |
| 86 logMap.put(log.file.getName(),log); | |
| 87 } | |
| 88 for( String backupDomain : backupDomains ) { | |
| 89 RpcClient rpc = rpcClient(backupDomain); | |
| 90 RpcCall call = new RpcCall("check",name,logInfo); | |
| 91 try { | |
| 92 while(true) { | |
| 93 rpc.write(call); | |
| 94 RpcResult result = rpc.read(); | |
| 95 logger.info(Arrays.asList(result.returnValues).toString()); | |
| 96 String status = (String)result.returnValues[0]; | |
| 97 if( status.equals("ok") ) { | |
| 98 break; | |
| 99 } else if( status.equals("missing") ) { | |
| 100 String fileName = (String)result.returnValues[1]; | |
| 101 LogFile log = logMap.get(fileName); | |
| 102 long len = log.end() - 8; | |
| 103 InputStream in = log.input(); | |
| 104 call = new RpcCall(in,len,"add",name,logInfo,fileName); | |
| 105 } else if( status.equals("incomplete") ) { | |
| 106 String fileName = (String)result.returnValues[1]; | |
| 107 long logEnd = (Long)result.returnValues[2]; | |
| 108 LogFile log = logMap.get(fileName); | |
| 109 long len = log.end() - logEnd; | |
| 110 InputStream in = log.input(); | |
| 111 in.skip(logEnd-8); | |
| 112 call = new RpcCall(in,len,"append",name,logInfo,fileName); | |
| 113 } else | |
| 114 throw new RuntimeException("status "+status); | |
| 115 } | |
| 116 } catch(RpcException e) { | |
| 117 logger.warn("",e); | |
| 118 } | |
| 119 rpc.close(); | |
| 120 } | |
| 121 } | |
| 122 | |
| 123 static RpcClient rpcClient(String backupDomain) throws IOException { | |
| 124 Socket socket; | |
| 125 if( BackupServer.cipherSuites == null ) { | |
| 126 socket = new Socket(backupDomain,BackupServer.port); | |
| 127 } else { | |
| 128 socket = SSLSocketFactory.getDefault().createSocket(backupDomain,BackupServer.port); | |
| 129 ((SSLSocket)socket).setEnabledCipherSuites(BackupServer.cipherSuites); | |
| 130 } | |
| 131 return new RpcClient(socket); | |
| 132 } | |
| 42 } | 133 } |
