view src/goodjava/lucene/logging/LoggingIndexWriter.java @ 1473:6c6ce14db6a8

add goodjava.io
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 17 Apr 2020 13:56:57 -0600
parents 5e3870618377
children c7b86342857f

package goodjava.lucene.logging;

import java.io.File;
import java.io.RandomAccessFile;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
import java.util.Random;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import goodjava.io.IoUtils;
import goodjava.lucene.api.GoodIndexWriter;
import goodjava.lucene.api.LuceneIndexWriter;
import goodjava.lucene.api.GoodCollector;
import goodjava.lucene.api.LuceneUtils;
import goodjava.logging.Logger;
import goodjava.logging.LoggerFactory;


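/**
 * An operation-logging wrapper around {@link LuceneIndexWriter}. Every write is
 * applied to the wrapped writer and also appended to a set of log files in
 * {@code logDir}, so the Lucene index can be rebuilt at any time by replaying the
 * logs (see {@link #playLog()}). When enough logs accumulate they are merged back
 * into a single snapshot log (see {@link #commit()}).
 *
 * A minimal usage sketch (hypothetical setup; the wrapped {@code LuceneIndexWriter}
 * and the stored-fields map are assumed to be built elsewhere):
 * <pre>{@code
 * LuceneIndexWriter writer = ...;  // configured elsewhere
 * LoggingIndexWriter logging = new LoggingIndexWriter( writer, new File("logs") );
 * logging.addDocument( storedFields );  // Map<String,Object> of stored fields
 * logging.commit();
 * }</pre>
 */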
public final class LoggingIndexWriter implements GoodIndexWriter {
	private static final Logger logger = LoggerFactory.getLogger(LoggingIndexWriter.class);
	private static final int version = 1;
	private static final int OP_DELETE_ALL = 1;
	private static final int OP_DELETE_DOCUMENTS = 2;
	private static final int OP_ADD_DOCUMENT = 3;
	private static final int OP_UPDATE_DOCUMENT = 4;
	private static final Random rnd = new Random();

	public final LuceneIndexWriter indexWriter;
	private final File logDir;
	private final List<LogFile> logs = new ArrayList<LogFile>();
	private final File index;
	private boolean isMerging = false;

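	/**
	 * Opens the log directory, creating it if necessary. If a valid "index" file is
	 * found, the log files it lists are reopened and the newest log is positioned at
	 * its end. Otherwise two fresh logs are created and a background thread writes a
	 * snapshot of the wrapped index into the first of them.
	 */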
	public LoggingIndexWriter(LuceneIndexWriter indexWriter,File logDir) throws IOException {
		this.indexWriter = indexWriter;
		this.logDir = logDir;
		logDir.mkdirs();
		if( !logDir.isDirectory() )
			throw new RuntimeException("could not create log directory: "+logDir);
		index = new File(logDir,"index");
		if( index.exists() ) {
			DataInputStream dis = new DataInputStream(new FileInputStream(index));
			try {
				if( dis.readInt() == version ) {
					final int n = dis.readInt();
					for( int i=0; i<n; i++ ) {
						File file = new File( logDir, dis.readUTF() );
						logs.add( new LogFile(file,"rwd") );
					}
					deleteUnusedFiles();
					log().gotoEnd();
					return;
				}
			} finally {
				dis.close();
			}
		}
		for( int i=0; i<2; i++ ) {
			logs.add( newLogFile() );
		}
		isMerging = true;
		new Thread(new Runnable(){public void run(){
			try {
				logLucene( System.currentTimeMillis(), logs.get(0), indexWriter );
				synchronized(LoggingIndexWriter.this) {
					writeIndex();
				}
			} catch(IOException e) {
				throw new RuntimeException(e);
			} finally {
				synchronized(LoggingIndexWriter.this) {
					isMerging = false;
				}
			}
		}}).start();
	}

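	/** Writes every document currently in the index to the given log as an OP_ADD_DOCUMENT entry, producing a snapshot of the index in log form. */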
	private static void logLucene(long time,LogFile log,LuceneIndexWriter indexWriter) throws IOException {
		IndexReader reader = indexWriter.openReader();
		final IndexSearcher searcher = new IndexSearcher(reader);
		Query query = new MatchAllDocsQuery();
		searcher.search( query, new GoodCollector(){
			public void collectDoc(int iDoc) throws IOException {
				Document doc = searcher.doc(iDoc);
				Map<String,Object> storedFields = LuceneUtils.toMap(doc);
				log.writeLong(time);
				log.writeByte(OP_ADD_DOCUMENT);
				log.writeMap(storedFields);
			}
		});
		reader.close();
		log.commit();
	}

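	/** Creates a log file with a random, currently unused name in the log directory. */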
	private LogFile newLogFile() throws IOException {
		File file;
		do {
			file = new File(logDir,"_"+rnd.nextInt(100)+".log");
		} while( file.exists() );
		return new LogFile(file,"rwd");
	}

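	/** Deletes everything in the log directory that is neither the "index" file nor one of the active logs. */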
	private void deleteUnusedFiles() throws IOException {
		Set<String> used = new HashSet<String>();
		used.add( index.getName() );
		for( LogFile lf : logs ) {
			used.add( lf.file.getName() );
		}
		for( File f : logDir.listFiles() ) {
			if( !used.contains(f.getName()) ) {
				deleteFile(f);
			}
		}
	}

	private static void deleteFile(File file) throws IOException {
		if( file.isDirectory() ) {
			for( File f : file.listFiles() ) {
				deleteFile(f);
			}
		}
		IoUtils.delete(file);
	}

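	/** Rewrites the "index" file, which lists the active log file names in order, then prunes files that are no longer referenced. */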
	private void writeIndex() throws IOException {
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		DataOutputStream dos = new DataOutputStream(baos);
		dos.writeInt(version);
		dos.writeInt(logs.size());
		for( LogFile lf : logs ) {
			String fileName = lf.file.getName();
			dos.writeUTF(fileName);
		}
		dos.close();
		RandomAccessFile raf = new RandomAccessFile( index, "rwd" );
		raf.write( baos.toByteArray() );
		raf.close();
		deleteUnusedFiles();
		logger.info("writeIndex "+logs.toString());
	}

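	/**
	 * Replays the two oldest logs into a temporary Lucene index under "merge" in the
	 * log directory, writes that index back out as a single snapshot log, and replaces
	 * the two old logs with it.
	 */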
	private void mergeLogs() throws IOException {
		logger.info("merge");
		LogFile first = logs.get(0);
		LogFile second = logs.get(1);
		second.gotoEnd();
		long lastTime = second.readLong();
		File dirFile = new File(logDir,"merge");
		if( dirFile.exists() )
			throw new RuntimeException("merge directory already exists: "+dirFile);
		Directory dir = FSDirectory.open(dirFile);
		LuceneIndexWriter mergeWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
		playLog(first,mergeWriter);
		playLog(second,mergeWriter);
		mergeWriter.commit();
		LogFile merge = newLogFile();
		logLucene( lastTime, merge, mergeWriter );
		mergeWriter.close();
		synchronized(this) {
			check();
			logs.remove(0);
			logs.set(0,merge);
			writeIndex();
			check();
		}
	}
	private final Runnable mergeLogs = new Runnable() { public void run() {
		try {
			mergeLogs();
/*
		} catch(IOException e) {
			throw new RuntimeException(e);
*/
		} catch(Exception e) {
			e.printStackTrace();
			System.exit(-1);
		} finally {
			synchronized(LoggingIndexWriter.this) {
				isMerging = false;
			}
		}
	} };

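	/**
	 * Consistency check: replays all logs into a temporary index under "check" in the
	 * log directory and compares its document count with the wrapped index. On a
	 * mismatch both indexes are dumped and the process exits.
	 */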
	private void check() throws IOException {
		File dirFile = new File(logDir,"check");
		if( dirFile.exists() )
			throw new RuntimeException("check directory already exists: "+dirFile);
		Directory dir = FSDirectory.open(dirFile);
		LuceneIndexWriter checkWriter = new LuceneIndexWriter( indexWriter.luceneVersion, dir, indexWriter.goodConfig );
		playLog(checkWriter);
		int nCheck = numDocs(checkWriter);
		int nOrig = numDocs(indexWriter);
		if( nCheck != nOrig ) {
			logger.error("nCheck = "+nCheck);
			logger.error("nOrig = "+nOrig);
			//new Exception().printStackTrace();
			Thread.dumpStack();
			System.out.println();
			System.out.println("indexWriter");
			dump(indexWriter);
			System.out.println("checkWriter");
			dump(checkWriter);
			System.exit(-1);
		}
		checkWriter.close();
		deleteFile(dirFile);
	}

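	/** The newest log file, which is the one currently being written to. */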
	private LogFile log() {
		return logs.get(logs.size()-1);
	}

	public synchronized void close() throws IOException {
		indexWriter.close();
		LogFile log = log();
		log.commit();
	}

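	/**
	 * Commits the wrapped writer and the current log. Unless a merge is already in
	 * progress, a new log is started once the current one grows larger than the oldest
	 * log, and a merge is triggered once more than three logs exist.
	 */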
	public synchronized void commit() throws IOException {
		indexWriter.commit();
		LogFile log = log();
		log.commit();
		if( isMerging )
			return;
		if( log.length() > logs.get(0).length() ) {
			log.writeLong( System.currentTimeMillis() );
			logs.add( newLogFile() );
			writeIndex();
		}
		if( logs.size() > 3 ) {
			isMerging = true;
//			new Thread(mergeLogs).start();
			mergeLogs.run();
		}
	}

	public synchronized void rollback() throws IOException {
		indexWriter.rollback();
		LogFile log = log();
		log.gotoEnd();
	}

	public synchronized void deleteAll() throws IOException {
		indexWriter.deleteAll();
		LogFile log = log();
		writeOp(log,OP_DELETE_ALL);
	}

	public synchronized void deleteDocuments(Query query) throws IOException {
		indexWriter.deleteDocuments(query);
		LogFile log = log();
		writeOp(log,OP_DELETE_DOCUMENTS);
		log.writeQuery(query);
	}

	public synchronized void addDocument(Map<String,Object> storedFields) throws IOException {
		indexWriter.addDocument(storedFields);
		LogFile log = log();
		writeOp(log,OP_ADD_DOCUMENT);
		log.writeMap(storedFields);
	}

	public synchronized void updateDocument(String keyFieldName,Map<String,Object> storedFields) throws IOException {
		indexWriter.updateDocument(keyFieldName,storedFields);
		LogFile log = log();
		writeOp(log,OP_UPDATE_DOCUMENT);
		log.writeUTF(keyFieldName);
		log.writeMap(storedFields);
	}

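	/** Note: unlike the other write operations, this call is only delegated to the wrapped writer and is not recorded in the log. */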
	public synchronized void reindexDocuments(String keyFieldName,Query query) throws IOException {
		indexWriter.reindexDocuments(keyFieldName,query);
	}

	private void writeOp(LogFile log,int op) throws IOException {
		log.writeLong(System.currentTimeMillis());
		log.writeByte(op);
	}

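	/** Rebuilds the wrapped index from the logs. The index must be empty; every log is replayed in order and the result is committed. */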
	public synchronized void playLog() throws IOException {
		playLog(indexWriter);
	}

	private void playLog(LuceneIndexWriter indexWriter) throws IOException {
		if( numDocs(indexWriter) != 0 )
			throw new RuntimeException("not empty");
		for( LogFile log : logs ) {
			playLog(log,indexWriter);
		}
		indexWriter.commit();
	}

	private static int numDocs(LuceneIndexWriter indexWriter) throws IOException {
		IndexReader reader = indexWriter.openReader();
		int n = reader.numDocs();
		reader.close();
		return n;
	}

	private static void playLog(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
		log.gotoStart();
		while( log.hasMore() ) {
			playOp(log,indexWriter);
		}
	}

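	/** Reads one timestamped operation from the log and applies it to the given writer. */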
	private static void playOp(LogFile log,LuceneIndexWriter indexWriter) throws IOException {
		log.readLong();  // time
		int op = log.readByte();
		switch(op) {
		case OP_DELETE_ALL:
			indexWriter.deleteAll();
			return;
		case OP_DELETE_DOCUMENTS:
			indexWriter.deleteDocuments( log.readQuery() );
			return;
		case OP_ADD_DOCUMENT:
			{
				Map storedFields = log.readMap();
				indexWriter.addDocument(storedFields);
				return;
			}
		case OP_UPDATE_DOCUMENT:
			{
				String keyFieldName = log.readUTF();
				Map storedFields = log.readMap();
				indexWriter.updateDocument(keyFieldName,storedFields);
				return;
			}
		default:
			throw new RuntimeException("invalid op "+op);
		}
	}

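	/** Debugging aid: prints up to 100 documents from the given index to standard output. */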
	private static void dump(LuceneIndexWriter indexWriter) throws IOException {
		IndexReader reader = indexWriter.openReader();
		IndexSearcher searcher = new IndexSearcher(reader);
		Query query = new MatchAllDocsQuery();
		TopDocs td = searcher.search(query,100);
		System.out.println("totalHits = "+td.totalHits);
		for( int i=0; i<td.scoreDocs.length; i++ ) {
			Document doc = searcher.doc(td.scoreDocs[i].doc);
			System.out.println(LuceneUtils.toMap(doc));
		}
		System.out.println();
		reader.close();
	}
 
}