view src/nabble/model/CursorNodeIterator.java @ 0:7ecd1a4ef557

add content
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 21 Mar 2019 19:15:52 -0600
parents
children
line wrap: on
line source

package nabble.model;

import java.sql.Connection;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.ArrayList;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


final class CursorNodeIterator extends NodeIterator<NodeImpl> {
	private static final Logger logger = LoggerFactory.getLogger(CursorNodeIterator.class);

	private static final int fetchSize = Init.get("fetchSize",100);

	private final SiteKey siteKey;
	private final Connection con;
	private final PreparedStatement stmt;
	private boolean isClosed = false;
	private ResultSet rs = null;
	private int offset = 0;
	private boolean hasCheckedNext = false;
	private NodeImpl next = null;
	private final boolean oldAutoCommit;
	private static final AtomicInteger counter = new AtomicInteger();
	private final String cursor = "crs" + counter.incrementAndGet();
//	private final Exception initException = new Exception("init");

	CursorNodeIterator(SiteKey siteKey,String sql,DbParamSetter paramSetter) {
		try {
			this.siteKey = siteKey;
			this.con = siteKey.getDb().getConnection();
			PreparedStatement pstmt = con.prepareStatement(
				"declare " + cursor + " cursor with hold for " +sql
			);
			Connection pgCon = pstmt.getConnection();
			oldAutoCommit = pgCon.getAutoCommit();
			if( oldAutoCommit )
				pgCon.setAutoCommit(false);
			paramSetter.setParams(pstmt);
			pstmt.execute();
			pstmt.close();
			this.stmt = con.prepareStatement(
				"fetch " + fetchSize + " " + cursor
			);
		} catch(SQLException e) {
			close();
			throw new RuntimeException(sql,e);
		} catch(RuntimeException e) {
			close();
			throw e;
		}
	}

	private void openRs()
		throws SQLException
	{
		rs = stmt.executeQuery();
	}

	private void closeRs()
		throws SQLException
	{
		rs.close();
		rs = null;
	}

	@Override public boolean hasNext() {
		if( isClosed )
			return false;
		if( !hasCheckedNext ) {
			hasCheckedNext = true;
			try {
				if( rs == null )
					openRs();
				if( rs.next() ) {
					next = NodeImpl.getNode(siteKey,rs);
					if( ++offset % fetchSize == 0 )
						closeRs();
				} else {
					next = null;
					close();
				}
			} catch(SQLException e) {
				throw new RuntimeException(e);
			}
		}
		return next != null;
	}

	@Override public NodeImpl next() {
		if( !hasNext() )
			throw new NoSuchElementException();
		hasCheckedNext = false;
		return next;
	}

	@Override public void skip(int n) {
		try {
			if( n==0 )
				return;
			if( hasCheckedNext ) {
				hasCheckedNext = false;
				if( --n == 0 )
					return;
			}
			if( rs != null ) {
				int left = fetchSize - (offset % fetchSize);
				if( n < left ) {
					rs.relative(n);
					return;
				}
				closeRs();
				offset += left;
				n -= left;
			}
			if( n >= fetchSize ) {
				int fwd = n / fetchSize * fetchSize;
				n -= fwd;
				offset += fwd;
				Statement stmt = con.createStatement();
				stmt.execute(
					"move " + fwd + " " + cursor
				);
				stmt.close();
			}
			if( n == 0 )
				return;
			openRs();
			rs.relative(n);
			offset += n;
		} catch(SQLException e) {
			throw new RuntimeException("n = "+n,e);
		}
	}

	@Override public List<Node> get(int i,int n) {
		try {
			i += offset;
			Statement stmt = con.createStatement();
			stmt.execute(
				"move absolute " + i + " " + cursor
			);
			ResultSet rs = stmt.executeQuery(
				"fetch " + n + " " + cursor
			);
			List<Node> list = new ArrayList<Node>();
			while( rs.next() ) {
				list.add( NodeImpl.getNode(siteKey,rs) );
			}
			rs.close();
			stmt.close();
			close();
			return list;
		} catch(SQLException e) {
			throw new RuntimeException(e);
		}
	}

	@Override public List<NodeImpl> asList() {
		try {
			List<NodeImpl> list = new ArrayList<NodeImpl>();
			if( rs != null ) {
				while( rs.next() ) {
					list.add( NodeImpl.getNode(siteKey,rs) );
				}
				closeRs();
			}
			Statement stmt = con.createStatement();
			ResultSet rs = stmt.executeQuery(
				"fetch forward all " + cursor
			);
			while( rs.next() ) {
				list.add( NodeImpl.getNode(siteKey,rs) );
			}
			rs.close();
			stmt.close();
			close();
			return list;
		} catch(SQLException e) {
			throw new RuntimeException(e);
		}
	}

	@Override public void close() {
		if( isClosed )
			return;
		isClosed = true;
		try {
			if( rs != null )
				rs.close();
			if( stmt != null ) {
				Connection pgCon = stmt.getConnection();
				stmt.close();
				{
					Statement stmt = con.createStatement();
					stmt.execute(
						"close " + cursor
					);
					stmt.close();
				}
				if( oldAutoCommit )
					pgCon.setAutoCommit(true);
			}
			if( con != null )
				con.close();
		} catch(SQLException e) {
			throw new RuntimeException(e);
		}
	}

	@Override protected void finalize() throws Throwable {
		if( !isClosed ) {
			logger.error("didn't close NodeIterator"/*,initException*/);
			close();
		}
		super.finalize();
	}

}