diff src/nabble/model/CursorNodeIterator.java @ 0:7ecd1a4ef557

add content
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 21 Mar 2019 19:15:52 -0600
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/nabble/model/CursorNodeIterator.java	Thu Mar 21 19:15:52 2019 -0600
@@ -0,0 +1,225 @@
+package nabble.model;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+final class CursorNodeIterator extends NodeIterator<NodeImpl> {
+	private static final Logger logger = LoggerFactory.getLogger(CursorNodeIterator.class);
+
+	private static final int fetchSize = Init.get("fetchSize",100);
+
+	private final SiteKey siteKey;
+	private final Connection con;
+	private final PreparedStatement stmt;
+	private boolean isClosed = false;
+	private ResultSet rs = null;
+	private int offset = 0;
+	private boolean hasCheckedNext = false;
+	private NodeImpl next = null;
+	private final boolean oldAutoCommit;
+	private static final AtomicInteger counter = new AtomicInteger();
+	private final String cursor = "crs" + counter.incrementAndGet();
+//	private final Exception initException = new Exception("init");
+
+	CursorNodeIterator(SiteKey siteKey,String sql,DbParamSetter paramSetter) {
+		try {
+			this.siteKey = siteKey;
+			this.con = siteKey.getDb().getConnection();
+			PreparedStatement pstmt = con.prepareStatement(
+				"declare " + cursor + " cursor with hold for " +sql
+			);
+			Connection pgCon = pstmt.getConnection();
+			oldAutoCommit = pgCon.getAutoCommit();
+			if( oldAutoCommit )
+				pgCon.setAutoCommit(false);
+			paramSetter.setParams(pstmt);
+			pstmt.execute();
+			pstmt.close();
+			this.stmt = con.prepareStatement(
+				"fetch " + fetchSize + " " + cursor
+			);
+		} catch(SQLException e) {
+			close();
+			throw new RuntimeException(sql,e);
+		} catch(RuntimeException e) {
+			close();
+			throw e;
+		}
+	}
+
+	private void openRs()
+		throws SQLException
+	{
+		rs = stmt.executeQuery();
+	}
+
+	private void closeRs()
+		throws SQLException
+	{
+		rs.close();
+		rs = null;
+	}
+
+	@Override public boolean hasNext() {
+		if( isClosed )
+			return false;
+		if( !hasCheckedNext ) {
+			hasCheckedNext = true;
+			try {
+				if( rs == null )
+					openRs();
+				if( rs.next() ) {
+					next = NodeImpl.getNode(siteKey,rs);
+					if( ++offset % fetchSize == 0 )
+						closeRs();
+				} else {
+					next = null;
+					close();
+				}
+			} catch(SQLException e) {
+				throw new RuntimeException(e);
+			}
+		}
+		return next != null;
+	}
+
+	@Override public NodeImpl next() {
+		if( !hasNext() )
+			throw new NoSuchElementException();
+		hasCheckedNext = false;
+		return next;
+	}
+
+	@Override public void skip(int n) {
+		try {
+			if( n==0 )
+				return;
+			if( hasCheckedNext ) {
+				hasCheckedNext = false;
+				if( --n == 0 )
+					return;
+			}
+			if( rs != null ) {
+				int left = fetchSize - (offset % fetchSize);
+				if( n < left ) {
+					rs.relative(n);
+					return;
+				}
+				closeRs();
+				offset += left;
+				n -= left;
+			}
+			if( n >= fetchSize ) {
+				int fwd = n / fetchSize * fetchSize;
+				n -= fwd;
+				offset += fwd;
+				Statement stmt = con.createStatement();
+				stmt.execute(
+					"move " + fwd + " " + cursor
+				);
+				stmt.close();
+			}
+			if( n == 0 )
+				return;
+			openRs();
+			rs.relative(n);
+			offset += n;
+		} catch(SQLException e) {
+			throw new RuntimeException("n = "+n,e);
+		}
+	}
+
+	@Override public List<Node> get(int i,int n) {
+		try {
+			i += offset;
+			Statement stmt = con.createStatement();
+			stmt.execute(
+				"move absolute " + i + " " + cursor
+			);
+			ResultSet rs = stmt.executeQuery(
+				"fetch " + n + " " + cursor
+			);
+			List<Node> list = new ArrayList<Node>();
+			while( rs.next() ) {
+				list.add( NodeImpl.getNode(siteKey,rs) );
+			}
+			rs.close();
+			stmt.close();
+			close();
+			return list;
+		} catch(SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override public List<NodeImpl> asList() {
+		try {
+			List<NodeImpl> list = new ArrayList<NodeImpl>();
+			if( rs != null ) {
+				while( rs.next() ) {
+					list.add( NodeImpl.getNode(siteKey,rs) );
+				}
+				closeRs();
+			}
+			Statement stmt = con.createStatement();
+			ResultSet rs = stmt.executeQuery(
+				"fetch forward all " + cursor
+			);
+			while( rs.next() ) {
+				list.add( NodeImpl.getNode(siteKey,rs) );
+			}
+			rs.close();
+			stmt.close();
+			close();
+			return list;
+		} catch(SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override public void close() {
+		if( isClosed )
+			return;
+		isClosed = true;
+		try {
+			if( rs != null )
+				rs.close();
+			if( stmt != null ) {
+				Connection pgCon = stmt.getConnection();
+				stmt.close();
+				{
+					Statement stmt = con.createStatement();
+					stmt.execute(
+						"close " + cursor
+					);
+					stmt.close();
+				}
+				if( oldAutoCommit )
+					pgCon.setAutoCommit(true);
+			}
+			if( con != null )
+				con.close();
+		} catch(SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override protected void finalize() throws Throwable {
+		if( !isClosed ) {
+			logger.error("didn't close NodeIterator"/*,initException*/);
+			close();
+		}
+		super.finalize();
+	}
+
+}