Mercurial Hosting > nabble
comparison src/nabble/model/SubscriptionImpl.java @ 0:7ecd1a4ef557
add content
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 21 Mar 2019 19:15:52 -0600 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:7ecd1a4ef557 |
---|---|
1 package nabble.model; | |
2 | |
3 import fschmidt.db.DbDatabase; | |
4 import fschmidt.db.DbUtils; | |
5 import fschmidt.db.Listener; | |
6 import fschmidt.util.java.DateUtils; | |
7 import fschmidt.util.mail.MailHome; | |
8 import nabble.model.lucene.LuceneSearcher; | |
9 import nabble.naml.compiler.Template; | |
10 import nabble.naml.compiler.TemplatePrintWriter; | |
11 import nabble.naml.namespaces.BasicNamespace; | |
12 import nabble.view.web.template.NodeNamespace; | |
13 import nabble.view.web.template.NabbleNamespace; | |
14 import nabble.view.web.template.NodeList; | |
15 import nabble.view.web.template.SubscriptionNamespace; | |
16 import org.apache.lucene.search.BooleanClause; | |
17 import org.apache.lucene.search.BooleanQuery; | |
18 import org.apache.lucene.search.Query; | |
19 import org.apache.lucene.search.ScoreDoc; | |
20 import org.slf4j.Logger; | |
21 import org.slf4j.LoggerFactory; | |
22 | |
23 import java.io.IOException; | |
24 import java.sql.Connection; | |
25 import java.sql.PreparedStatement; | |
26 import java.sql.ResultSet; | |
27 import java.sql.SQLException; | |
28 import java.sql.Statement; | |
29 import java.util.ArrayList; | |
30 import java.util.Collections; | |
31 import java.util.Date; | |
32 import java.util.HashMap; | |
33 import java.util.HashSet; | |
34 import java.util.List; | |
35 import java.util.Map; | |
36 import java.util.Set; | |
37 import java.util.concurrent.TimeUnit; | |
38 | |
39 | |
40 final class SubscriptionImpl implements Subscription { | |
/** Logger shared by all subscription operations in this class. */
private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);

/** User who owns this subscription; never reassigned. */
private final UserImpl subscriber;

/** Node the subscription is attached to; never reassigned. */
private final NodeImpl node;

/** Scope of the subscription; stored in the "direct_only" column (true for {@code To.CHILDREN}). */
private To to;

/** Delivery mode; stored in the "daily_digest" column (true for {@code Type.DAILY_DIGEST}). */
private Type type;

/**
 * Builds an in-memory subscription. Nothing is written to the database here;
 * see {@code insert()} for persistence.
 */
private SubscriptionImpl(UserImpl user,NodeImpl target,To scope,Type delivery) {
    this.subscriber = user;
    this.node = target;
    this.to = scope;
    this.type = delivery;
}
54 | |
55 private DbDatabase db() { | |
56 return node.siteKey.getDb(); | |
57 } | |
58 | |
59 public User getSubscriber() { | |
60 return subscriber; | |
61 } | |
62 | |
63 public Node getNode() { | |
64 return node; | |
65 } | |
66 | |
67 public To getTo() { | |
68 return to; | |
69 } | |
70 | |
71 public void setTo(To to) { | |
72 if( this.to == to ) | |
73 return; | |
74 this.to = to; | |
75 set( "direct_only", to==To.CHILDREN ); | |
76 } | |
77 | |
78 public Type getType() { | |
79 return type; | |
80 } | |
81 | |
82 public void setType(Type type) { | |
83 if( this.type == type ) | |
84 return; | |
85 this.type = type; | |
86 set( "daily_digest", type==Type.DAILY_DIGEST ); | |
87 } | |
88 | |
89 private void set(String field,boolean b) { | |
90 try { | |
91 Connection con = db().getConnection(); | |
92 PreparedStatement stmt = con.prepareStatement( | |
93 "update subscription set "+field+"= ? where user_id = ? and node_id = ?" | |
94 ); | |
95 stmt.setBoolean( 1, b ); | |
96 stmt.setLong( 2, subscriber.getId() ); | |
97 stmt.setLong( 3, node.getId() ); | |
98 stmt.executeUpdate(); | |
99 stmt.close(); | |
100 con.close(); | |
101 } catch(SQLException e) { | |
102 throw new RuntimeException(e); | |
103 } | |
104 } | |
105 | |
106 public void delete() { | |
107 try { | |
108 Connection con = db().getConnection(); | |
109 PreparedStatement stmt = con.prepareStatement( | |
110 "delete from subscription where user_id = ? and node_id = ?" | |
111 ); | |
112 stmt.setLong( 1, subscriber.getId() ); | |
113 stmt.setLong( 2, node.getId() ); | |
114 stmt.executeUpdate(); | |
115 stmt.close(); | |
116 con.close(); | |
117 } catch(SQLException e) { | |
118 throw new RuntimeException(e); | |
119 } | |
120 } | |
121 | |
122 static SubscriptionImpl insert(UserImpl subscriber,NodeImpl node,To to,Type type) { | |
123 SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type); | |
124 subscription.insert(); | |
125 return subscription; | |
126 } | |
127 | |
128 private void insert() { | |
129 try { | |
130 Connection con = db().getConnection(); | |
131 PreparedStatement stmt = con.prepareStatement( | |
132 "insert into subscription (user_id,node_id,direct_only,daily_digest) values (?,?,?,?)" | |
133 ); | |
134 stmt.setLong( 1, subscriber.getId() ); | |
135 stmt.setLong( 2, node.getId() ); | |
136 stmt.setBoolean( 3, to==To.CHILDREN ); | |
137 stmt.setBoolean( 4, type==Type.DAILY_DIGEST ); | |
138 stmt.executeUpdate(); | |
139 stmt.close(); | |
140 con.close(); | |
141 } catch(SQLException e) { | |
142 throw new RuntimeException(e); | |
143 } | |
144 } | |
145 | |
146 private boolean autoUnsubscribe() { | |
147 if( subscriber.isAutoUnsubscribe() ) { | |
148 logger.warn("auto-unsubscribing "+subscriber); | |
149 delete(); | |
150 return true; | |
151 } else { | |
152 return false; | |
153 } | |
154 } | |
155 | |
156 static SubscriptionImpl getSubscription(SiteKey siteKey,ResultSet rs) throws SQLException { | |
157 UserImpl subscriber = UserImpl.getUser( siteKey, rs.getLong("user_id") ); | |
158 NodeImpl node = NodeImpl.getNode( siteKey, rs.getLong("node_id") ); | |
159 To to = rs.getBoolean("direct_only") ? To.CHILDREN : To.DESCENDANTS; | |
160 Type type = rs.getBoolean("daily_digest") ? Type.DAILY_DIGEST : Type.INSTANT; | |
161 SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type); | |
162 return subscription.autoUnsubscribe() ? null : subscription; | |
163 } | |
164 | |
165 static SubscriptionImpl getSubscription(UserImpl subscriber,NodeImpl node) { | |
166 if( !node.getDbRecord().isInDb() ) | |
167 return null; | |
168 try { | |
169 Connection con = node.siteKey.getDb().getConnection(); | |
170 PreparedStatement stmt = con.prepareStatement( | |
171 "select * from subscription where user_id = ? and node_id = ?" | |
172 ); | |
173 stmt.setLong( 1, subscriber.getId() ); | |
174 stmt.setLong( 2, node.getId() ); | |
175 ResultSet rs = stmt.executeQuery(); | |
176 try { | |
177 return rs.next() ? SubscriptionImpl.getSubscription(node.siteKey,rs) : null; | |
178 } finally { | |
179 stmt.close(); | |
180 con.close(); | |
181 } | |
182 } catch(SQLException e) { | |
183 throw new RuntimeException(e); | |
184 } | |
185 } | |
186 | |
187 static boolean isSubscribed(UserImpl subscriber,NodeImpl node) { | |
188 try { | |
189 Connection con = node.siteKey.getDb().getConnection(); | |
190 PreparedStatement stmt = con.prepareStatement( | |
191 "select 1 from subscription where user_id = ? and node_id = ?" | |
192 ); | |
193 stmt.setLong( 1, subscriber.getId() ); | |
194 stmt.setLong( 2, node.getId() ); | |
195 try { | |
196 return stmt.executeQuery().next(); | |
197 } finally { | |
198 stmt.close(); | |
199 con.close(); | |
200 } | |
201 } catch(SQLException e) { | |
202 throw new RuntimeException(e); | |
203 } | |
204 } | |
205 | |
206 | |
// Class initialization: when daemons are enabled and mail is being sent,
// register the instant-notification listener and schedule the daily digest job.
static {
    if( Init.hasDaemons && MailHome.isSendingMail() ) {

        // Instant mails: triggered after each node insert, but only once the
        // surrounding transaction has committed, and never during an import.
        Listener<NodeImpl> notifyOnInsert = new Listener<NodeImpl>() {
            public void event(final NodeImpl node) {
                if( ModelHome.insideImportProcedure.get() )
                    return;
                Executors.executeAfterCommit(
                    node.siteKey.getDb(),
                    new Runnable() {
                        public void run() {
                            sendInstantMailsFor( DbUtils.getGoodCopy(node) );
                        }
                    }
                );
            }
        };
        NodeImpl.addPostInsertListener( notifyOnInsert );

        // Daily digests: first run at the next occurrence of the run time,
        // then repeated with a one-day delay.
        // NOTE(review): with a fixed *delay* the start time may drift later by
        // the duration of each run, and if Executors follows
        // ScheduledExecutorService semantics an exception escaping the job
        // would stop later runs — confirm against nabble.model.Executors.
        final long dayLength = 24*60*60;
        final long runAt = 60*60;  // 1 am
        Date startedAt = new Date();
        long startedMillis = startedAt.getTime();
        // presumably roundToDay yields the start of the current day — confirm
        long dayStartMillis = DateUtils.roundToDay(startedAt).getTime();
        long secondsIntoDay = (startedMillis - dayStartMillis)/1000;
        long initialDelay;
        if( secondsIntoDay < runAt ) {
            initialDelay = runAt - secondsIntoDay;
        } else {
            initialDelay = dayLength - (secondsIntoDay - runAt);
        }
        Runnable digestJob = new Runnable() {
            public void run() {
                Date yesterday = DateUtils.addDays( new Date(), -1 );
                sendDailyDigestsFor(yesterday);
                ModelHome.lastDigestRun = System.currentTimeMillis();
            }
        };
        Executors.scheduleWithFixedDelay( digestJob, initialDelay, dayLength, TimeUnit.SECONDS );

    }
}
237 | |
238 private static final String DAILY_DIGEST_TASK = "daily_digest"; | |
239 | |
240 static Map<User,Subscription> getSubscribersToNotify(NodeImpl node) { | |
241 Map<User,Subscription> map = new HashMap<User,Subscription>(); | |
242 NodeImpl subscribed = node.getParentImpl(); | |
243 if( subscribed==null ) | |
244 return map; | |
245 Set<User> subscribers = new HashSet<User>(); | |
246 boolean addedTask = false; | |
247 for( Subscription subscription : subscribed.getSubscriptions( | |
248 "select * from subscription" | |
249 +" where node_id = ?" | |
250 ) ) { | |
251 if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) { | |
252 node.getSiteImpl().addTask(DAILY_DIGEST_TASK); | |
253 addedTask = true; | |
254 } | |
255 User subscriber = subscription.getSubscriber(); | |
256 if( subscribers.add(subscriber) && subscription.getType()==Type.INSTANT ) | |
257 map.put( subscriber, subscription ); | |
258 } | |
259 while( (subscribed = subscribed.getParentImpl()) != null ) { | |
260 for( Subscription subscription : subscribed.getSubscriptions( | |
261 "select * from subscription" | |
262 +" where node_id = ?" | |
263 +" and not direct_only" | |
264 ) ) { | |
265 if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) { | |
266 node.getSiteImpl().addTask(DAILY_DIGEST_TASK); | |
267 addedTask = true; | |
268 } | |
269 User subscriber = subscription.getSubscriber(); | |
270 if( subscribers.add(subscriber ) && subscription.getType()==Type.INSTANT ) | |
271 map.put( subscriber, subscription ); | |
272 } | |
273 } | |
274 return map; | |
275 } | |
276 | |
277 private static void sendInstantMailsFor(NodeImpl node) { | |
278 if( node==null ) | |
279 return; | |
280 Template template = node.getSite().getTemplate( "notify_subscribers", | |
281 BasicNamespace.class, NabbleNamespace.class, NodeNamespace.class | |
282 ); | |
283 template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(), | |
284 new BasicNamespace(template), | |
285 new NabbleNamespace(node.getSite()), | |
286 new NodeNamespace(node) | |
287 ); | |
288 } | |
289 | |
290 private static void sendDailyDigestsFor(Date date) { | |
291 logger.info("starting daily digests"); | |
292 try { | |
293 Query day = Lucene.day(date); | |
294 for( SiteKey siteKey : SiteKey.getSiteKeys(DAILY_DIGEST_TASK) ) { | |
295 logger.info("daily digests for site = " + siteKey.getId()); | |
296 Connection con = siteKey.getDb().getConnection(); | |
297 Statement stmt = con.createStatement(); | |
298 ResultSet rs = stmt.executeQuery( | |
299 "select *" | |
300 +" from subscription" | |
301 +" where daily_digest" | |
302 ); | |
303 while( rs.next() ) { | |
304 SubscriptionImpl subscription = getSubscription(siteKey,rs); | |
305 if( subscription==null ) | |
306 continue; | |
307 NodeImpl subscribed = subscription.node; | |
308 SiteImpl site = subscribed.getSiteImpl(); | |
309 User subscriber = subscription.getSubscriber(); | |
310 List<Node> nodes = new ArrayList<Node>(); | |
311 BooleanQuery query = new BooleanQuery(); | |
312 query.add( day, BooleanClause.Occur.MUST ); | |
313 if( subscription.getTo() == To.CHILDREN ) { | |
314 query.add( Lucene.children(subscribed), BooleanClause.Occur.MUST ); | |
315 } else { | |
316 query.add( Lucene.descendants(subscribed), BooleanClause.Occur.MUST ); | |
317 query.add( Lucene.node(subscribed), BooleanClause.Occur.MUST_NOT ); | |
318 } | |
319 LuceneSearcher searcher = Lucene.newSearcher(subscribed.getSite()); | |
320 try { | |
321 for( ScoreDoc sdoc : searcher.search(query,1000).scoreDocs ) { | |
322 Node node = Lucene.getNode( site, searcher, sdoc.doc ); | |
323 if( node != null ) | |
324 nodes.add(node); | |
325 } | |
326 } finally { | |
327 searcher.close(); | |
328 } | |
329 if( !nodes.isEmpty() ) { | |
330 Collections.sort(nodes, Node.dateComparator); | |
331 sendDigestEmail(subscribed, subscriber, nodes); | |
332 } | |
333 } | |
334 stmt.close(); | |
335 con.close(); | |
336 } | |
337 } catch(SQLException e) { | |
338 logger.error("SQLException", e); | |
339 throw new RuntimeException(e); | |
340 } catch(IOException e) { | |
341 logger.error("IOException", e); | |
342 throw new RuntimeException(e); | |
343 } | |
344 logger.info("finished daily digests"); | |
345 } | |
346 | |
347 static void nop() {} | |
348 | |
349 | |
350 private static void sendDigestEmail(Node subscriptionNode, User subscriber, List<Node> nodes) { | |
351 Template template = subscriptionNode.getSite().getTemplate( "digest email", | |
352 BasicNamespace.class, NabbleNamespace.class, NodeList.class, SubscriptionNamespace.class | |
353 ); | |
354 template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(), | |
355 new BasicNamespace(template), | |
356 new NabbleNamespace(subscriptionNode.getSite()), | |
357 new NodeList(nodes,null,false), | |
358 new SubscriptionNamespace(subscriptionNode, subscriber) | |
359 ); | |
360 } | |
361 | |
362 // luan shell | |
363 public static void testDigest() { | |
364 sendDailyDigestsFor(new Date()); | |
365 } | |
366 | |
367 } |