0
|
1 package nabble.model;
|
|
2
|
|
3 import java.sql.Connection;
|
|
4 import java.sql.Statement;
|
|
5 import java.sql.PreparedStatement;
|
|
6 import java.sql.ResultSet;
|
|
7 import java.sql.SQLException;
|
|
8 import java.util.List;
|
|
9 import java.util.ArrayList;
|
|
10 import java.util.NoSuchElementException;
|
|
11 import java.util.concurrent.atomic.AtomicInteger;
|
|
12 import org.slf4j.Logger;
|
|
13 import org.slf4j.LoggerFactory;
|
|
14
|
|
15
|
|
16 final class CursorNodeIterator extends NodeIterator<NodeImpl> {
|
|
17 private static final Logger logger = LoggerFactory.getLogger(CursorNodeIterator.class);
|
|
18
|
|
19 private static final int fetchSize = Init.get("fetchSize",100);
|
|
20
|
|
21 private final SiteKey siteKey;
|
|
22 private final Connection con;
|
|
23 private final PreparedStatement stmt;
|
|
24 private boolean isClosed = false;
|
|
25 private ResultSet rs = null;
|
|
26 private int offset = 0;
|
|
27 private boolean hasCheckedNext = false;
|
|
28 private NodeImpl next = null;
|
|
29 private final boolean oldAutoCommit;
|
|
30 private static final AtomicInteger counter = new AtomicInteger();
|
|
31 private final String cursor = "crs" + counter.incrementAndGet();
|
|
32 // private final Exception initException = new Exception("init");
|
|
33
|
|
34 CursorNodeIterator(SiteKey siteKey,String sql,DbParamSetter paramSetter) {
|
|
35 try {
|
|
36 this.siteKey = siteKey;
|
|
37 this.con = siteKey.getDb().getConnection();
|
|
38 PreparedStatement pstmt = con.prepareStatement(
|
|
39 "declare " + cursor + " cursor with hold for " +sql
|
|
40 );
|
|
41 Connection pgCon = pstmt.getConnection();
|
|
42 oldAutoCommit = pgCon.getAutoCommit();
|
|
43 if( oldAutoCommit )
|
|
44 pgCon.setAutoCommit(false);
|
|
45 paramSetter.setParams(pstmt);
|
|
46 pstmt.execute();
|
|
47 pstmt.close();
|
|
48 this.stmt = con.prepareStatement(
|
|
49 "fetch " + fetchSize + " " + cursor
|
|
50 );
|
|
51 } catch(SQLException e) {
|
|
52 close();
|
|
53 throw new RuntimeException(sql,e);
|
|
54 } catch(RuntimeException e) {
|
|
55 close();
|
|
56 throw e;
|
|
57 }
|
|
58 }
|
|
59
|
|
60 private void openRs()
|
|
61 throws SQLException
|
|
62 {
|
|
63 rs = stmt.executeQuery();
|
|
64 }
|
|
65
|
|
66 private void closeRs()
|
|
67 throws SQLException
|
|
68 {
|
|
69 rs.close();
|
|
70 rs = null;
|
|
71 }
|
|
72
|
|
73 @Override public boolean hasNext() {
|
|
74 if( isClosed )
|
|
75 return false;
|
|
76 if( !hasCheckedNext ) {
|
|
77 hasCheckedNext = true;
|
|
78 try {
|
|
79 if( rs == null )
|
|
80 openRs();
|
|
81 if( rs.next() ) {
|
|
82 next = NodeImpl.getNode(siteKey,rs);
|
|
83 if( ++offset % fetchSize == 0 )
|
|
84 closeRs();
|
|
85 } else {
|
|
86 next = null;
|
|
87 close();
|
|
88 }
|
|
89 } catch(SQLException e) {
|
|
90 throw new RuntimeException(e);
|
|
91 }
|
|
92 }
|
|
93 return next != null;
|
|
94 }
|
|
95
|
|
96 @Override public NodeImpl next() {
|
|
97 if( !hasNext() )
|
|
98 throw new NoSuchElementException();
|
|
99 hasCheckedNext = false;
|
|
100 return next;
|
|
101 }
|
|
102
|
|
103 @Override public void skip(int n) {
|
|
104 try {
|
|
105 if( n==0 )
|
|
106 return;
|
|
107 if( hasCheckedNext ) {
|
|
108 hasCheckedNext = false;
|
|
109 if( --n == 0 )
|
|
110 return;
|
|
111 }
|
|
112 if( rs != null ) {
|
|
113 int left = fetchSize - (offset % fetchSize);
|
|
114 if( n < left ) {
|
|
115 rs.relative(n);
|
|
116 return;
|
|
117 }
|
|
118 closeRs();
|
|
119 offset += left;
|
|
120 n -= left;
|
|
121 }
|
|
122 if( n >= fetchSize ) {
|
|
123 int fwd = n / fetchSize * fetchSize;
|
|
124 n -= fwd;
|
|
125 offset += fwd;
|
|
126 Statement stmt = con.createStatement();
|
|
127 stmt.execute(
|
|
128 "move " + fwd + " " + cursor
|
|
129 );
|
|
130 stmt.close();
|
|
131 }
|
|
132 if( n == 0 )
|
|
133 return;
|
|
134 openRs();
|
|
135 rs.relative(n);
|
|
136 offset += n;
|
|
137 } catch(SQLException e) {
|
|
138 throw new RuntimeException("n = "+n,e);
|
|
139 }
|
|
140 }
|
|
141
|
|
142 @Override public List<Node> get(int i,int n) {
|
|
143 try {
|
|
144 i += offset;
|
|
145 Statement stmt = con.createStatement();
|
|
146 stmt.execute(
|
|
147 "move absolute " + i + " " + cursor
|
|
148 );
|
|
149 ResultSet rs = stmt.executeQuery(
|
|
150 "fetch " + n + " " + cursor
|
|
151 );
|
|
152 List<Node> list = new ArrayList<Node>();
|
|
153 while( rs.next() ) {
|
|
154 list.add( NodeImpl.getNode(siteKey,rs) );
|
|
155 }
|
|
156 rs.close();
|
|
157 stmt.close();
|
|
158 close();
|
|
159 return list;
|
|
160 } catch(SQLException e) {
|
|
161 throw new RuntimeException(e);
|
|
162 }
|
|
163 }
|
|
164
|
|
165 @Override public List<NodeImpl> asList() {
|
|
166 try {
|
|
167 List<NodeImpl> list = new ArrayList<NodeImpl>();
|
|
168 if( rs != null ) {
|
|
169 while( rs.next() ) {
|
|
170 list.add( NodeImpl.getNode(siteKey,rs) );
|
|
171 }
|
|
172 closeRs();
|
|
173 }
|
|
174 Statement stmt = con.createStatement();
|
|
175 ResultSet rs = stmt.executeQuery(
|
|
176 "fetch forward all " + cursor
|
|
177 );
|
|
178 while( rs.next() ) {
|
|
179 list.add( NodeImpl.getNode(siteKey,rs) );
|
|
180 }
|
|
181 rs.close();
|
|
182 stmt.close();
|
|
183 close();
|
|
184 return list;
|
|
185 } catch(SQLException e) {
|
|
186 throw new RuntimeException(e);
|
|
187 }
|
|
188 }
|
|
189
|
|
190 @Override public void close() {
|
|
191 if( isClosed )
|
|
192 return;
|
|
193 isClosed = true;
|
|
194 try {
|
|
195 if( rs != null )
|
|
196 rs.close();
|
|
197 if( stmt != null ) {
|
|
198 Connection pgCon = stmt.getConnection();
|
|
199 stmt.close();
|
|
200 {
|
|
201 Statement stmt = con.createStatement();
|
|
202 stmt.execute(
|
|
203 "close " + cursor
|
|
204 );
|
|
205 stmt.close();
|
|
206 }
|
|
207 if( oldAutoCommit )
|
|
208 pgCon.setAutoCommit(true);
|
|
209 }
|
|
210 if( con != null )
|
|
211 con.close();
|
|
212 } catch(SQLException e) {
|
|
213 throw new RuntimeException(e);
|
|
214 }
|
|
215 }
|
|
216
|
|
217 @Override protected void finalize() throws Throwable {
|
|
218 if( !isClosed ) {
|
|
219 logger.error("didn't close NodeIterator"/*,initException*/);
|
|
220 close();
|
|
221 }
|
|
222 super.finalize();
|
|
223 }
|
|
224
|
|
225 }
|