Mercurial Hosting > luan
changeset 1548:736ec76bbf42
lucene log work
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 27 Sep 2020 22:07:18 -0600 |
parents | f24a9ba7551e |
children | 41c32da4cbd1 |
files | src/goodjava/io/IoUtils.java src/goodjava/lucene/api/GoodIndexWriter.java src/goodjava/lucene/api/GoodWriter.java src/goodjava/lucene/backup/Backup.java src/goodjava/lucene/backup/BackupIndexWriter.java src/goodjava/lucene/logging/FilterGoodWriter.java src/goodjava/lucene/logging/LoggingIndexWriter.java src/luan/modules/lucene/Lucene.luan src/luan/modules/lucene/LuceneIndex.java |
diffstat | 9 files changed, 169 insertions(+), 82 deletions(-) [+] |
line wrap: on
line diff
--- a/src/goodjava/io/IoUtils.java Thu Sep 24 15:33:56 2020 -0600 +++ b/src/goodjava/io/IoUtils.java Sun Sep 27 22:07:18 2020 -0600 @@ -9,6 +9,7 @@ import java.io.StringWriter; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.attribute.FileTime; import java.security.Security; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.SSLServerSocketFactory; @@ -54,6 +55,10 @@ Files.createSymbolicLink( link.toPath(), existing.toPath() ); } + public static long getCreationTime(File f) throws IOException { + return ((FileTime)Files.getAttribute(f.toPath(),"creationTime")).toMillis(); + } + public static void copyAll(InputStream in,OutputStream out) throws IOException {
--- a/src/goodjava/lucene/api/GoodIndexWriter.java Thu Sep 24 15:33:56 2020 -0600 +++ b/src/goodjava/lucene/api/GoodIndexWriter.java Sun Sep 27 22:07:18 2020 -0600 @@ -1,20 +1,13 @@ package goodjava.lucene.api; import java.io.IOException; -import java.util.Map; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.Query; -public interface GoodIndexWriter { +public interface GoodIndexWriter extends GoodWriter { public void close() throws IOException; - public void commit() throws IOException; public void rollback() throws IOException; - public void deleteAll() throws IOException; - public void deleteDocuments(Query query) throws IOException; - public void addDocument(Map<String,Object> storedFields) throws IOException; - public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException; public void reindexDocuments(String keyFieldName,Query query) throws IOException; - public void tag(String tag) throws IOException; public IndexWriter getLuceneIndexWriter(); }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/goodjava/lucene/api/GoodWriter.java Sun Sep 27 22:07:18 2020 -0600 @@ -0,0 +1,17 @@ +package goodjava.lucene.api; + +import java.io.IOException; +import java.util.Map; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.Query; + + +public interface GoodWriter { + public IndexReader openReader() throws IOException; + public void commit() throws IOException; + public void deleteAll() throws IOException; + public void deleteDocuments(Query query) throws IOException; + public void addDocument(Map<String,Object> storedFields) throws IOException; + public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException; + public void tag(String tag) throws IOException; +}
--- a/src/goodjava/lucene/backup/Backup.java Thu Sep 24 15:33:56 2020 -0600 +++ b/src/goodjava/lucene/backup/Backup.java Sun Sep 27 22:07:18 2020 -0600 @@ -105,16 +105,16 @@ } if( call.cmd.equals("add") ) { boolean complete = true; - List<LogFile> logs = new ArrayList<LogFile>(); - for( Object obj : logInfo ) { - Map fileInfo = (Map)obj; + final LogFile[] logs = new LogFile[logInfo.size()]; + for( int i=0; i<logs.length; i++ ) { + Map fileInfo = (Map)logInfo.get(i); String name = (String)fileInfo.get("name"); File f = new File(dir,name); if( !f.exists() ) { complete = false; break; } - logs.add( new LogFile(f) ); + logs[i] = new LogFile(f); } if( complete ) { LoggingIndexWriter.writeIndex(logs,index);
--- a/src/goodjava/lucene/backup/BackupIndexWriter.java Thu Sep 24 15:33:56 2020 -0600 +++ b/src/goodjava/lucene/backup/BackupIndexWriter.java Sun Sep 27 22:07:18 2020 -0600 @@ -33,8 +33,10 @@ private boolean isSyncPending = false; private final ExecutorService exec = Executors.newSingleThreadExecutor(); - public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,String name,String password) throws IOException { - super(indexWriter,logDir); + public BackupIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime,String name,String password) + throws IOException + { + super(indexWriter,logDir,logTime); if( backupDomains == null ) throw new RuntimeException("must set backupDomains"); this.name = name;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/goodjava/lucene/logging/FilterGoodWriter.java Sun Sep 27 22:07:18 2020 -0600 @@ -0,0 +1,54 @@ +package goodjava.lucene.logging; + +import java.io.IOException; +import java.util.Map; +import org.apache.lucene.search.Query; +import org.apache.lucene.index.IndexReader; +import goodjava.lucene.api.GoodWriter; + + +public class FilterGoodWriter implements GoodWriter { + protected final GoodWriter writer; + protected boolean isActive = true; + + protected FilterGoodWriter(GoodWriter writer) { + this.writer = writer; + } + + public IndexReader openReader() throws IOException { + return writer.openReader(); + } + + public void commit() throws IOException { + if( !isActive ) + return; + writer.commit(); + } + + public void deleteAll() throws IOException { + if( !isActive ) + return; + writer.deleteAll(); + } + + public void deleteDocuments(Query query) throws IOException { + if( !isActive ) + return; + writer.deleteDocuments(query); + } + + public void addDocument(Map<String,Object> storedFields) throws IOException { + if( !isActive ) + return; + writer.addDocument(storedFields); + } + + public void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException { + if( !isActive ) + return; + writer.updateDocument(keyFieldName,storedFields); + } + + public void tag(String tag) throws IOException {} + +}
--- a/src/goodjava/lucene/logging/LoggingIndexWriter.java Thu Sep 24 15:33:56 2020 -0600 +++ b/src/goodjava/lucene/logging/LoggingIndexWriter.java Sun Sep 27 22:07:18 2020 -0600 @@ -10,8 +10,6 @@ import java.util.Map; import java.util.Set; import java.util.HashSet; -import java.util.List; -import java.util.ArrayList; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.lucene.document.Document; @@ -29,6 +27,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import goodjava.io.IoUtils; +import goodjava.lucene.api.GoodWriter; import goodjava.lucene.api.GoodIndexWriter; import goodjava.lucene.api.LuceneIndexWriter; import goodjava.lucene.api.GoodCollector; @@ -39,7 +38,7 @@ public class LoggingIndexWriter implements GoodIndexWriter { private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class); - private static final int version = 1; + private static final int version = 2; private static final int OP_DELETE_ALL = 1; private static final int OP_DELETE_DOCUMENTS = 2; private static final int OP_ADD_DOCUMENT = 3; @@ -50,14 +49,18 @@ public final LuceneIndexWriter indexWriter; public boolean wasCreated; private final File logDir; - protected final List<LogFile> logs = new ArrayList<LogFile>(); + private final long logTime; + protected final LogFile[] logs = new LogFile[3]; private LogOutputStream log; private final File index; private final SemaphoreLock mergeLock = new SemaphoreLock(); - public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException { + public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir,long logTime) + throws IOException + { this.indexWriter = indexWriter; this.logDir = logDir; + this.logTime = logTime; IoUtils.mkdirs(logDir); if( !logDir.isDirectory() ) throw new RuntimeException(); @@ -66,10 +69,9 @@ DataInputStream dis = new DataInputStream(new FileInputStream(index)); try { if( dis.readInt() == version ) { - final int n = dis.readInt(); - for( int i=0; i<n; i++ ) { + for( int i=0; i<logs.length; i++ ) { File file = new File( logDir, dis.readUTF() ); - logs.add( new LogFile(file) ); + logs[i] = new LogFile(file); } deleteUnusedFiles(); setLog(); @@ -84,6 +86,10 @@ wasCreated = true; } + public IndexReader openReader() throws IOException { + return indexWriter.openReader(); + } + public IndexWriter getLuceneIndexWriter() { return indexWriter.getLuceneIndexWriter(); } @@ -91,7 +97,7 @@ private void setLog() throws IOException { if( log != null ) log.close(); - log = logs.get(logs.size()-1).output(); + log = logs[2].output(); } /* public synchronized boolean isMerging() { @@ -118,18 +124,29 @@ private void newLogs2() throws IOException { logger.info("building new logs"); - logs.clear(); - for( int i=0; i<2; i++ ) { - logs.add( newLogFile() ); + for( int i=0; i<logs.length; i++ ) { + logs[i] = newLogFile(); } - logLucene( System.currentTimeMillis(), logs.get(0), indexWriter ); + LogOutputStream log = logs[0].output(); + logLucene( System.currentTimeMillis(), log, indexWriter ); + log.close(); writeIndex(); setLog(); logger.info("done building new logs"); } - private static void logLucene(long time,LogFile logLucene,LuceneIndexWriter indexWriter) throws IOException { - LogOutputStream log = logLucene.output(); + public synchronized void logLucene() + throws IOException + { + //log.rollback(); ? + logLucene( System.currentTimeMillis(), log, indexWriter ); + } + + private static void logLucene(long time,LogOutputStream log,LuceneIndexWriter indexWriter) + throws IOException + { + log.writeLong(time); + log.writeByte(OP_DELETE_ALL); IndexReader reader = indexWriter.openReader(); final IndexSearcher searcher = new IndexSearcher(reader); Query query = new MatchAllDocsQuery(); @@ -144,7 +161,6 @@ }); reader.close(); log.commit(); - log.close(); } private LogFile newLogFile() throws IOException { @@ -159,7 +175,7 @@ deleteUnusedFiles(logs,index); } - private static void deleteUnusedFiles(List<LogFile> logs,File index) throws IOException { + private static void deleteUnusedFiles(LogFile[] logs,File index) throws IOException { Set<String> used = new HashSet<String>(); used.add( index.getName() ); for( LogFile lf : logs ) { @@ -176,11 +192,12 @@ writeIndex(logs,index); } - public static void writeIndex(List<LogFile> logs,File index) throws IOException { + public static void writeIndex(LogFile[] logs,File index) throws IOException { + if( logs.length != 3 ) + throw new RuntimeException(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); dos.writeInt(version); - dos.writeInt(logs.size()); for( LogFile lf : logs ) { String fileName = lf.file.getName(); dos.writeUTF(fileName); @@ -194,28 +211,34 @@ } private void mergeLogs() throws IOException { - //logger.info("merge"); - if( logs.size() <= 3 ) + logger.info("merge"); + if( !mergeLock.isLocked() ) { + logger.error("merge without lock"); return; - LogFile first = logs.get(0); - LogFile second = logs.get(1); + } + LogFile first = logs[0]; + LogFile second = logs[1]; long lastTime = second.file.lastModified(); File dirFile = new File(logDir,"merge"); if( dirFile.exists() ) throw new RuntimeException(); Directory dir = FSDirectory.open(dirFile); LuceneIndexWriter mergeWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); - playLog( first.input(), mergeWriter, null ); - playLog( second.input(), mergeWriter, null ); + playLog( first.input(), mergeWriter ); + playLog( second.input(), mergeWriter ); mergeWriter.commit(); LogFile merge = newLogFile(); - logLucene( lastTime, merge, mergeWriter ); + LogOutputStream log = merge.output(); + logLucene( lastTime, log, mergeWriter ); + log.close(); mergeWriter.close(); synchronized(this) { //check(); - logs.remove(0); - logs.set(0,merge); + logs[0] = merge; + logs[1] = logs[2]; + logs[2] = newLogFile(); writeIndex(); + setLog(); //check(null); } } @@ -265,7 +288,7 @@ protected boolean doCheck(SortField sortField) throws IOException { boolean ok = true; IndexReader indexReader; - List<LogInputStream> logReaders; + LogInputStream[] logReaders; synchronized(this) { indexReader = indexWriter.openReader(); logReaders = logReaders(logs); @@ -277,7 +300,7 @@ IoUtils.deleteRecursively(dirFile); Directory dir = FSDirectory.open(dirFile); LuceneIndexWriter checkWriter = new LuceneIndexWriter( dir, indexWriter.goodConfig ); - playLogs(logReaders,checkWriter,null); + playLogs(logReaders,checkWriter); //logger.info("check lucene"); IndexReader checkReader = checkWriter.openReader(); int nCheck = checkReader.numDocs(); @@ -375,12 +398,7 @@ log.commit(); if( mergeLock.isLocked() ) return; - if( log.logFile.end() > logs.get(0).end() ) { - logs.add( newLogFile() ); - writeIndex(); - setLog(); - } - if( logs.size() > 3 ) { + if( logs[1].file.lastModified() < System.currentTimeMillis() - logTime ) { getMergeLock(); new Thread(mergeLogs).start(); // mergeLogs.run(); @@ -431,86 +449,81 @@ } // return whether stopped at tag - public synchronized boolean playLogs(String upToTag) throws IOException { - return playLogs( logReaders(logs), indexWriter, upToTag ); + public synchronized void playLogs(GoodWriter writer) throws IOException { + if( writer == null ) + writer = indexWriter; + playLogs( logReaders(logs), writer ); } - private static List<LogInputStream> logReaders(List<LogFile> logs) throws IOException { - List<LogInputStream> logReaders = new ArrayList<LogInputStream>(); - for( LogFile log : logs ) { - logReaders.add( log.input() ); + private static LogInputStream[] logReaders(LogFile[] logs) throws IOException { + LogInputStream[] logReaders = new LogInputStream[logs.length]; + for( int i=0; i<logs.length; i++ ) { + logReaders[i] = logs[i].input(); } return logReaders; } - private static boolean playLogs(List<LogInputStream> logReaders,LuceneIndexWriter indexWriter,String upToTag) + private static void playLogs(LogInputStream[] logReaders,GoodWriter indexWriter) throws IOException { if( numDocs(indexWriter) != 0 ) throw new RuntimeException ("not empty"); - boolean rtn = false; for( LogInputStream reader : logReaders ) { - if( playLog(reader,indexWriter,upToTag) ) { - rtn = true; - break; - } + playLog(reader,indexWriter); } indexWriter.commit(); - return rtn; } - private static int numDocs(LuceneIndexWriter indexWriter) throws IOException { + private static int numDocs(GoodWriter indexWriter) throws IOException { IndexReader reader = indexWriter.openReader(); int n = reader.numDocs(); reader.close(); return n; } - private static boolean playLog(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) + private static void playLog(LogInputStream in,GoodWriter indexWriter) throws IOException { - boolean rtn = false; while( in.available() > 0 ) { - if( playOp(in,indexWriter,upToTag) ) { - rtn = true; - break; - } + playOp(in,indexWriter); } in.close(); - return rtn; } - private static boolean playOp(LogInputStream in,LuceneIndexWriter indexWriter,String upToTag) throws IOException { + private static void playOp(LogInputStream in,GoodWriter indexWriter) + throws IOException + { in.readLong(); // time int op = in.readByte(); switch(op) { case OP_DELETE_ALL: indexWriter.deleteAll(); - return false; + return; case OP_DELETE_DOCUMENTS: { Query query = in.readQuery(); //System.out.println("OP_DELETE_DOCUMENTS "+query); indexWriter.deleteDocuments(query); - return false; + return; } case OP_ADD_DOCUMENT: { Map storedFields = in.readMap(); indexWriter.addDocument(storedFields); - return false; + return; } case OP_UPDATE_DOCUMENT: { String keyFieldName = in.readUTF(); Map storedFields = in.readMap(); indexWriter.updateDocument(keyFieldName,storedFields); - return false; + return; } case OP_TAG: { String tag = in.readUTF(); - return tag.equals(upToTag); + indexWriter.tag(tag); + return; } default: throw new RuntimeException("invalid op "+op);
--- a/src/luan/modules/lucene/Lucene.luan Thu Sep 24 15:33:56 2020 -0600 +++ b/src/luan/modules/lucene/Lucene.luan Sun Sep 27 22:07:18 2020 -0600 @@ -9,6 +9,7 @@ local Html = require "luan:Html.luan" local Number = require "luan:Number.luan" local integer = Number.integer or error() +local Time = require "luan:Time.luan" local Io = require "luan:Io.luan" local uri = Io.uri or error() local String = require "luan:String.luan" @@ -46,6 +47,7 @@ index_dir = get_file(index_dir) options = options or {} options.log_dir = options.log_dir and get_file(options.log_dir) + options.log_time = options.log_time or Time.period{days=30} local java_index = LuceneIndex.getLuceneIndex(index_dir,options) index.java = java_index
--- a/src/luan/modules/lucene/LuceneIndex.java Thu Sep 24 15:33:56 2020 -0600 +++ b/src/luan/modules/lucene/LuceneIndex.java Sun Sep 27 22:07:18 2020 -0600 @@ -144,6 +144,7 @@ private final PostgresBackup postgresBackup; private boolean wasCreated; private final File logDir; + private final long logTime; private LuceneIndex(Luan luan,File indexDir,LuanTable options) throws LuanException, IOException, ClassNotFoundException, SQLException @@ -156,6 +157,7 @@ LuanTable postgresSpec = Utils.removeTable(options,"postgres_spec"); LuanFunction supplementer = Utils.removeFunction(options,"supplementer"); logDir = (File)options.remove("log_dir"); + logTime = (Long)options.remove("log_time"); Utils.checkEmpty(options); mfp = defaultFieldParser==null ? new MultiFieldParser() : new MultiFieldParser(defaultFieldParser,defaultFields); @@ -191,7 +193,7 @@ boolean wasCreated = !fsDir.getDirectory().exists(); writer = new LuceneIndexWriter(fsDir,config); if( logDir != null ) - writer = new LoggingIndexWriter((LuceneIndexWriter)writer,logDir); + writer = new LoggingIndexWriter((LuceneIndexWriter)writer,logDir,logTime); reader = DirectoryReader.open(fsDir); searcher = new IndexSearcher(reader); initId(); @@ -741,11 +743,11 @@ writeLock.lock(); boolean ok = false; try { - IndexWriter iw = writer.getLuceneIndexWriter(); - iw.deleteAll(); + writer.tag("restore_from_postgres"); + writer.deleteAll(); postgresBackup.restoreLucene(this); ok = true; - iw.commit(); + writer.commit(); wrote(); ensure_open(); // refresh searcher initId(); @@ -791,7 +793,6 @@ iw.deleteAll(); loggingWriter.playLogs(null); ok = true; - iw.commit(); wrote(); ensure_open(); // refresh searcher initId();