Mercurial Hosting > nabble
view src/nabble/model/CursorNodeIterator.java @ 0:7ecd1a4ef557
add content
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 21 Mar 2019 19:15:52 -0600 |
parents | |
children |
line wrap: on
line source
package nabble.model; import java.sql.Connection; import java.sql.Statement; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; import java.util.ArrayList; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; final class CursorNodeIterator extends NodeIterator<NodeImpl> { private static final Logger logger = LoggerFactory.getLogger(CursorNodeIterator.class); private static final int fetchSize = Init.get("fetchSize",100); private final SiteKey siteKey; private final Connection con; private final PreparedStatement stmt; private boolean isClosed = false; private ResultSet rs = null; private int offset = 0; private boolean hasCheckedNext = false; private NodeImpl next = null; private final boolean oldAutoCommit; private static final AtomicInteger counter = new AtomicInteger(); private final String cursor = "crs" + counter.incrementAndGet(); // private final Exception initException = new Exception("init"); CursorNodeIterator(SiteKey siteKey,String sql,DbParamSetter paramSetter) { try { this.siteKey = siteKey; this.con = siteKey.getDb().getConnection(); PreparedStatement pstmt = con.prepareStatement( "declare " + cursor + " cursor with hold for " +sql ); Connection pgCon = pstmt.getConnection(); oldAutoCommit = pgCon.getAutoCommit(); if( oldAutoCommit ) pgCon.setAutoCommit(false); paramSetter.setParams(pstmt); pstmt.execute(); pstmt.close(); this.stmt = con.prepareStatement( "fetch " + fetchSize + " " + cursor ); } catch(SQLException e) { close(); throw new RuntimeException(sql,e); } catch(RuntimeException e) { close(); throw e; } } private void openRs() throws SQLException { rs = stmt.executeQuery(); } private void closeRs() throws SQLException { rs.close(); rs = null; } @Override public boolean hasNext() { if( isClosed ) return false; if( !hasCheckedNext ) { hasCheckedNext = true; try { if( rs == null ) openRs(); if( rs.next() ) { next = NodeImpl.getNode(siteKey,rs); if( ++offset % fetchSize == 0 ) closeRs(); } else { next = null; close(); } } catch(SQLException e) { throw new RuntimeException(e); } } return next != null; } @Override public NodeImpl next() { if( !hasNext() ) throw new NoSuchElementException(); hasCheckedNext = false; return next; } @Override public void skip(int n) { try { if( n==0 ) return; if( hasCheckedNext ) { hasCheckedNext = false; if( --n == 0 ) return; } if( rs != null ) { int left = fetchSize - (offset % fetchSize); if( n < left ) { rs.relative(n); return; } closeRs(); offset += left; n -= left; } if( n >= fetchSize ) { int fwd = n / fetchSize * fetchSize; n -= fwd; offset += fwd; Statement stmt = con.createStatement(); stmt.execute( "move " + fwd + " " + cursor ); stmt.close(); } if( n == 0 ) return; openRs(); rs.relative(n); offset += n; } catch(SQLException e) { throw new RuntimeException("n = "+n,e); } } @Override public List<Node> get(int i,int n) { try { i += offset; Statement stmt = con.createStatement(); stmt.execute( "move absolute " + i + " " + cursor ); ResultSet rs = stmt.executeQuery( "fetch " + n + " " + cursor ); List<Node> list = new ArrayList<Node>(); while( rs.next() ) { list.add( NodeImpl.getNode(siteKey,rs) ); } rs.close(); stmt.close(); close(); return list; } catch(SQLException e) { throw new RuntimeException(e); } } @Override public List<NodeImpl> asList() { try { List<NodeImpl> list = new ArrayList<NodeImpl>(); if( rs != null ) { while( rs.next() ) { list.add( NodeImpl.getNode(siteKey,rs) ); } closeRs(); } Statement stmt = con.createStatement(); ResultSet rs = stmt.executeQuery( "fetch forward all " + cursor ); while( rs.next() ) { list.add( NodeImpl.getNode(siteKey,rs) ); } rs.close(); stmt.close(); close(); return list; } catch(SQLException e) { throw new RuntimeException(e); } } @Override public void close() { if( isClosed ) return; isClosed = true; try { if( rs != null ) rs.close(); if( stmt != null ) { Connection pgCon = stmt.getConnection(); stmt.close(); { Statement stmt = con.createStatement(); stmt.execute( "close " + cursor ); stmt.close(); } if( oldAutoCommit ) pgCon.setAutoCommit(true); } if( con != null ) con.close(); } catch(SQLException e) { throw new RuntimeException(e); } } @Override protected void finalize() throws Throwable { if( !isClosed ) { logger.error("didn't close NodeIterator"/*,initException*/); close(); } super.finalize(); } }