comparison src/nabble/model/SubscriptionImpl.java @ 0:7ecd1a4ef557

add content
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 21 Mar 2019 19:15:52 -0600
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:7ecd1a4ef557
1 package nabble.model;
2
3 import fschmidt.db.DbDatabase;
4 import fschmidt.db.DbUtils;
5 import fschmidt.db.Listener;
6 import fschmidt.util.java.DateUtils;
7 import fschmidt.util.mail.MailHome;
8 import nabble.model.lucene.LuceneSearcher;
9 import nabble.naml.compiler.Template;
10 import nabble.naml.compiler.TemplatePrintWriter;
11 import nabble.naml.namespaces.BasicNamespace;
12 import nabble.view.web.template.NodeNamespace;
13 import nabble.view.web.template.NabbleNamespace;
14 import nabble.view.web.template.NodeList;
15 import nabble.view.web.template.SubscriptionNamespace;
16 import org.apache.lucene.search.BooleanClause;
17 import org.apache.lucene.search.BooleanQuery;
18 import org.apache.lucene.search.Query;
19 import org.apache.lucene.search.ScoreDoc;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import java.io.IOException;
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.ResultSet;
27 import java.sql.SQLException;
28 import java.sql.Statement;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.Date;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.TimeUnit;
38
39
40 final class SubscriptionImpl implements Subscription {
41 private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);
42
43 private final UserImpl subscriber;
44 private final NodeImpl node;
45 private To to;
46 private Type type;
47
/**
 * Builds an in-memory subscription; nothing is written to the database here.
 * Instances are created only through {@link #insert(UserImpl,NodeImpl,To,Type)}
 * and the {@code getSubscription} factories.
 */
private SubscriptionImpl(UserImpl subscriber,NodeImpl node,To to,Type type) {
    this.type = type;
    this.to = to;
    this.node = node;
    this.subscriber = subscriber;
}
54
55 private DbDatabase db() {
56 return node.siteKey.getDb();
57 }
58
59 public User getSubscriber() {
60 return subscriber;
61 }
62
63 public Node getNode() {
64 return node;
65 }
66
67 public To getTo() {
68 return to;
69 }
70
71 public void setTo(To to) {
72 if( this.to == to )
73 return;
74 this.to = to;
75 set( "direct_only", to==To.CHILDREN );
76 }
77
78 public Type getType() {
79 return type;
80 }
81
82 public void setType(Type type) {
83 if( this.type == type )
84 return;
85 this.type = type;
86 set( "daily_digest", type==Type.DAILY_DIGEST );
87 }
88
89 private void set(String field,boolean b) {
90 try {
91 Connection con = db().getConnection();
92 PreparedStatement stmt = con.prepareStatement(
93 "update subscription set "+field+"= ? where user_id = ? and node_id = ?"
94 );
95 stmt.setBoolean( 1, b );
96 stmt.setLong( 2, subscriber.getId() );
97 stmt.setLong( 3, node.getId() );
98 stmt.executeUpdate();
99 stmt.close();
100 con.close();
101 } catch(SQLException e) {
102 throw new RuntimeException(e);
103 }
104 }
105
106 public void delete() {
107 try {
108 Connection con = db().getConnection();
109 PreparedStatement stmt = con.prepareStatement(
110 "delete from subscription where user_id = ? and node_id = ?"
111 );
112 stmt.setLong( 1, subscriber.getId() );
113 stmt.setLong( 2, node.getId() );
114 stmt.executeUpdate();
115 stmt.close();
116 con.close();
117 } catch(SQLException e) {
118 throw new RuntimeException(e);
119 }
120 }
121
122 static SubscriptionImpl insert(UserImpl subscriber,NodeImpl node,To to,Type type) {
123 SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type);
124 subscription.insert();
125 return subscription;
126 }
127
128 private void insert() {
129 try {
130 Connection con = db().getConnection();
131 PreparedStatement stmt = con.prepareStatement(
132 "insert into subscription (user_id,node_id,direct_only,daily_digest) values (?,?,?,?)"
133 );
134 stmt.setLong( 1, subscriber.getId() );
135 stmt.setLong( 2, node.getId() );
136 stmt.setBoolean( 3, to==To.CHILDREN );
137 stmt.setBoolean( 4, type==Type.DAILY_DIGEST );
138 stmt.executeUpdate();
139 stmt.close();
140 con.close();
141 } catch(SQLException e) {
142 throw new RuntimeException(e);
143 }
144 }
145
146 private boolean autoUnsubscribe() {
147 if( subscriber.isAutoUnsubscribe() ) {
148 logger.warn("auto-unsubscribing "+subscriber);
149 delete();
150 return true;
151 } else {
152 return false;
153 }
154 }
155
156 static SubscriptionImpl getSubscription(SiteKey siteKey,ResultSet rs) throws SQLException {
157 UserImpl subscriber = UserImpl.getUser( siteKey, rs.getLong("user_id") );
158 NodeImpl node = NodeImpl.getNode( siteKey, rs.getLong("node_id") );
159 To to = rs.getBoolean("direct_only") ? To.CHILDREN : To.DESCENDANTS;
160 Type type = rs.getBoolean("daily_digest") ? Type.DAILY_DIGEST : Type.INSTANT;
161 SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type);
162 return subscription.autoUnsubscribe() ? null : subscription;
163 }
164
165 static SubscriptionImpl getSubscription(UserImpl subscriber,NodeImpl node) {
166 if( !node.getDbRecord().isInDb() )
167 return null;
168 try {
169 Connection con = node.siteKey.getDb().getConnection();
170 PreparedStatement stmt = con.prepareStatement(
171 "select * from subscription where user_id = ? and node_id = ?"
172 );
173 stmt.setLong( 1, subscriber.getId() );
174 stmt.setLong( 2, node.getId() );
175 ResultSet rs = stmt.executeQuery();
176 try {
177 return rs.next() ? SubscriptionImpl.getSubscription(node.siteKey,rs) : null;
178 } finally {
179 stmt.close();
180 con.close();
181 }
182 } catch(SQLException e) {
183 throw new RuntimeException(e);
184 }
185 }
186
187 static boolean isSubscribed(UserImpl subscriber,NodeImpl node) {
188 try {
189 Connection con = node.siteKey.getDb().getConnection();
190 PreparedStatement stmt = con.prepareStatement(
191 "select 1 from subscription where user_id = ? and node_id = ?"
192 );
193 stmt.setLong( 1, subscriber.getId() );
194 stmt.setLong( 2, node.getId() );
195 try {
196 return stmt.executeQuery().next();
197 } finally {
198 stmt.close();
199 con.close();
200 }
201 } catch(SQLException e) {
202 throw new RuntimeException(e);
203 }
204 }
205
206
// Wires up subscription mail delivery, but only when daemons are enabled and
// mail sending is turned on:
//  * every inserted node triggers instant notifications after the commit,
//    except while an import procedure is running;
//  * a daily digest job is scheduled, first run at 1 am (measured from
//    DateUtils.roundToDay(now), presumably local midnight - confirm), then
//    repeated with a fixed delay of one day.
// NOTE(review): with a fixed *delay* the start time may drift later by the
// duration of each run, and depending on how the project Executors wrapper
// treats exceptions a failing run might cancel later runs - confirm.
static {
    if( Init.hasDaemons && MailHome.isSendingMail() ) {

        Listener<NodeImpl> notifyOnInsert = new Listener<NodeImpl>() {
            public void event(final NodeImpl node) {
                if( !ModelHome.insideImportProcedure.get() ) {
                    Runnable sendMails = new Runnable() {
                        public void run() {
                            sendInstantMailsFor( DbUtils.getGoodCopy(node) );
                        }
                    };
                    Executors.executeAfterCommit( node.siteKey.getDb(), sendMails );
                }
            }
        };
        NodeImpl.addPostInsertListener( notifyOnInsert );

        long dayInSeconds = TimeUnit.DAYS.toSeconds(1);
        long runAtSecondOfDay = TimeUnit.HOURS.toSeconds(1);  // 1 am
        Date startup = new Date();
        long secondOfDay = (startup.getTime() - DateUtils.roundToDay(startup).getTime())/1000;
        long firstRunIn;
        if( secondOfDay < runAtSecondOfDay ) {
            // still before today's run time
            firstRunIn = runAtSecondOfDay - secondOfDay;
        } else {
            // today's run time has passed, wait for tomorrow's
            firstRunIn = dayInSeconds - (secondOfDay - runAtSecondOfDay);
        }

        Runnable digestJob = new Runnable() {
            public void run() {
                Date yesterday = DateUtils.addDays( new Date(), -1 );
                sendDailyDigestsFor(yesterday);
                ModelHome.lastDigestRun = System.currentTimeMillis();
            }
        };
        Executors.scheduleWithFixedDelay( digestJob, firstRunIn, dayInSeconds, TimeUnit.SECONDS );

    }
}
237
238 private static final String DAILY_DIGEST_TASK = "daily_digest";
239
240 static Map<User,Subscription> getSubscribersToNotify(NodeImpl node) {
241 Map<User,Subscription> map = new HashMap<User,Subscription>();
242 NodeImpl subscribed = node.getParentImpl();
243 if( subscribed==null )
244 return map;
245 Set<User> subscribers = new HashSet<User>();
246 boolean addedTask = false;
247 for( Subscription subscription : subscribed.getSubscriptions(
248 "select * from subscription"
249 +" where node_id = ?"
250 ) ) {
251 if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) {
252 node.getSiteImpl().addTask(DAILY_DIGEST_TASK);
253 addedTask = true;
254 }
255 User subscriber = subscription.getSubscriber();
256 if( subscribers.add(subscriber) && subscription.getType()==Type.INSTANT )
257 map.put( subscriber, subscription );
258 }
259 while( (subscribed = subscribed.getParentImpl()) != null ) {
260 for( Subscription subscription : subscribed.getSubscriptions(
261 "select * from subscription"
262 +" where node_id = ?"
263 +" and not direct_only"
264 ) ) {
265 if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) {
266 node.getSiteImpl().addTask(DAILY_DIGEST_TASK);
267 addedTask = true;
268 }
269 User subscriber = subscription.getSubscriber();
270 if( subscribers.add(subscriber ) && subscription.getType()==Type.INSTANT )
271 map.put( subscriber, subscription );
272 }
273 }
274 return map;
275 }
276
277 private static void sendInstantMailsFor(NodeImpl node) {
278 if( node==null )
279 return;
280 Template template = node.getSite().getTemplate( "notify_subscribers",
281 BasicNamespace.class, NabbleNamespace.class, NodeNamespace.class
282 );
283 template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(),
284 new BasicNamespace(template),
285 new NabbleNamespace(node.getSite()),
286 new NodeNamespace(node)
287 );
288 }
289
290 private static void sendDailyDigestsFor(Date date) {
291 logger.info("starting daily digests");
292 try {
293 Query day = Lucene.day(date);
294 for( SiteKey siteKey : SiteKey.getSiteKeys(DAILY_DIGEST_TASK) ) {
295 logger.info("daily digests for site = " + siteKey.getId());
296 Connection con = siteKey.getDb().getConnection();
297 Statement stmt = con.createStatement();
298 ResultSet rs = stmt.executeQuery(
299 "select *"
300 +" from subscription"
301 +" where daily_digest"
302 );
303 while( rs.next() ) {
304 SubscriptionImpl subscription = getSubscription(siteKey,rs);
305 if( subscription==null )
306 continue;
307 NodeImpl subscribed = subscription.node;
308 SiteImpl site = subscribed.getSiteImpl();
309 User subscriber = subscription.getSubscriber();
310 List<Node> nodes = new ArrayList<Node>();
311 BooleanQuery query = new BooleanQuery();
312 query.add( day, BooleanClause.Occur.MUST );
313 if( subscription.getTo() == To.CHILDREN ) {
314 query.add( Lucene.children(subscribed), BooleanClause.Occur.MUST );
315 } else {
316 query.add( Lucene.descendants(subscribed), BooleanClause.Occur.MUST );
317 query.add( Lucene.node(subscribed), BooleanClause.Occur.MUST_NOT );
318 }
319 LuceneSearcher searcher = Lucene.newSearcher(subscribed.getSite());
320 try {
321 for( ScoreDoc sdoc : searcher.search(query,1000).scoreDocs ) {
322 Node node = Lucene.getNode( site, searcher, sdoc.doc );
323 if( node != null )
324 nodes.add(node);
325 }
326 } finally {
327 searcher.close();
328 }
329 if( !nodes.isEmpty() ) {
330 Collections.sort(nodes, Node.dateComparator);
331 sendDigestEmail(subscribed, subscriber, nodes);
332 }
333 }
334 stmt.close();
335 con.close();
336 }
337 } catch(SQLException e) {
338 logger.error("SQLException", e);
339 throw new RuntimeException(e);
340 } catch(IOException e) {
341 logger.error("IOException", e);
342 throw new RuntimeException(e);
343 }
344 logger.info("finished daily digests");
345 }
346
347 static void nop() {}
348
349
350 private static void sendDigestEmail(Node subscriptionNode, User subscriber, List<Node> nodes) {
351 Template template = subscriptionNode.getSite().getTemplate( "digest email",
352 BasicNamespace.class, NabbleNamespace.class, NodeList.class, SubscriptionNamespace.class
353 );
354 template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(),
355 new BasicNamespace(template),
356 new NabbleNamespace(subscriptionNode.getSite()),
357 new NodeList(nodes,null,false),
358 new SubscriptionNamespace(subscriptionNode, subscriber)
359 );
360 }
361
362 // luan shell
363 public static void testDigest() {
364 sendDailyDigestsFor(new Date());
365 }
366
367 }