0
|
1 package nabble.model;
|
|
2
|
|
3 import fschmidt.db.DbDatabase;
|
|
4 import fschmidt.db.DbUtils;
|
|
5 import fschmidt.db.Listener;
|
|
6 import fschmidt.util.java.DateUtils;
|
|
7 import fschmidt.util.mail.MailHome;
|
|
8 import nabble.model.lucene.LuceneSearcher;
|
|
9 import nabble.naml.compiler.Template;
|
|
10 import nabble.naml.compiler.TemplatePrintWriter;
|
|
11 import nabble.naml.namespaces.BasicNamespace;
|
|
12 import nabble.view.web.template.NodeNamespace;
|
|
13 import nabble.view.web.template.NabbleNamespace;
|
|
14 import nabble.view.web.template.NodeList;
|
|
15 import nabble.view.web.template.SubscriptionNamespace;
|
|
16 import org.apache.lucene.search.BooleanClause;
|
|
17 import org.apache.lucene.search.BooleanQuery;
|
|
18 import org.apache.lucene.search.Query;
|
|
19 import org.apache.lucene.search.ScoreDoc;
|
|
20 import org.slf4j.Logger;
|
|
21 import org.slf4j.LoggerFactory;
|
|
22
|
|
23 import java.io.IOException;
|
|
24 import java.sql.Connection;
|
|
25 import java.sql.PreparedStatement;
|
|
26 import java.sql.ResultSet;
|
|
27 import java.sql.SQLException;
|
|
28 import java.sql.Statement;
|
|
29 import java.util.ArrayList;
|
|
30 import java.util.Collections;
|
|
31 import java.util.Date;
|
|
32 import java.util.HashMap;
|
|
33 import java.util.HashSet;
|
|
34 import java.util.List;
|
|
35 import java.util.Map;
|
|
36 import java.util.Set;
|
|
37 import java.util.concurrent.TimeUnit;
|
|
38
|
|
39
|
|
40 final class SubscriptionImpl implements Subscription {
|
|
41 private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);
|
|
42
|
|
43 private final UserImpl subscriber;
|
|
44 private final NodeImpl node;
|
|
45 private To to;
|
|
46 private Type type;
|
|
47
|
|
48 private SubscriptionImpl(UserImpl subscriber,NodeImpl node,To to,Type type) {
|
|
49 this.subscriber = subscriber;
|
|
50 this.node = node;
|
|
51 this.to = to;
|
|
52 this.type = type;
|
|
53 }
|
|
54
|
|
55 private DbDatabase db() {
|
|
56 return node.siteKey.getDb();
|
|
57 }
|
|
58
|
|
59 public User getSubscriber() {
|
|
60 return subscriber;
|
|
61 }
|
|
62
|
|
63 public Node getNode() {
|
|
64 return node;
|
|
65 }
|
|
66
|
|
67 public To getTo() {
|
|
68 return to;
|
|
69 }
|
|
70
|
|
71 public void setTo(To to) {
|
|
72 if( this.to == to )
|
|
73 return;
|
|
74 this.to = to;
|
|
75 set( "direct_only", to==To.CHILDREN );
|
|
76 }
|
|
77
|
|
78 public Type getType() {
|
|
79 return type;
|
|
80 }
|
|
81
|
|
82 public void setType(Type type) {
|
|
83 if( this.type == type )
|
|
84 return;
|
|
85 this.type = type;
|
|
86 set( "daily_digest", type==Type.DAILY_DIGEST );
|
|
87 }
|
|
88
|
|
89 private void set(String field,boolean b) {
|
|
90 try {
|
|
91 Connection con = db().getConnection();
|
|
92 PreparedStatement stmt = con.prepareStatement(
|
|
93 "update subscription set "+field+"= ? where user_id = ? and node_id = ?"
|
|
94 );
|
|
95 stmt.setBoolean( 1, b );
|
|
96 stmt.setLong( 2, subscriber.getId() );
|
|
97 stmt.setLong( 3, node.getId() );
|
|
98 stmt.executeUpdate();
|
|
99 stmt.close();
|
|
100 con.close();
|
|
101 } catch(SQLException e) {
|
|
102 throw new RuntimeException(e);
|
|
103 }
|
|
104 }
|
|
105
|
|
106 public void delete() {
|
|
107 try {
|
|
108 Connection con = db().getConnection();
|
|
109 PreparedStatement stmt = con.prepareStatement(
|
|
110 "delete from subscription where user_id = ? and node_id = ?"
|
|
111 );
|
|
112 stmt.setLong( 1, subscriber.getId() );
|
|
113 stmt.setLong( 2, node.getId() );
|
|
114 stmt.executeUpdate();
|
|
115 stmt.close();
|
|
116 con.close();
|
|
117 } catch(SQLException e) {
|
|
118 throw new RuntimeException(e);
|
|
119 }
|
|
120 }
|
|
121
|
|
122 static SubscriptionImpl insert(UserImpl subscriber,NodeImpl node,To to,Type type) {
|
|
123 SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type);
|
|
124 subscription.insert();
|
|
125 return subscription;
|
|
126 }
|
|
127
|
|
128 private void insert() {
|
|
129 try {
|
|
130 Connection con = db().getConnection();
|
|
131 PreparedStatement stmt = con.prepareStatement(
|
|
132 "insert into subscription (user_id,node_id,direct_only,daily_digest) values (?,?,?,?)"
|
|
133 );
|
|
134 stmt.setLong( 1, subscriber.getId() );
|
|
135 stmt.setLong( 2, node.getId() );
|
|
136 stmt.setBoolean( 3, to==To.CHILDREN );
|
|
137 stmt.setBoolean( 4, type==Type.DAILY_DIGEST );
|
|
138 stmt.executeUpdate();
|
|
139 stmt.close();
|
|
140 con.close();
|
|
141 } catch(SQLException e) {
|
|
142 throw new RuntimeException(e);
|
|
143 }
|
|
144 }
|
|
145
|
|
146 private boolean autoUnsubscribe() {
|
|
147 if( subscriber.isAutoUnsubscribe() ) {
|
|
148 logger.warn("auto-unsubscribing "+subscriber);
|
|
149 delete();
|
|
150 return true;
|
|
151 } else {
|
|
152 return false;
|
|
153 }
|
|
154 }
|
|
155
|
|
156 static SubscriptionImpl getSubscription(SiteKey siteKey,ResultSet rs) throws SQLException {
|
|
157 UserImpl subscriber = UserImpl.getUser( siteKey, rs.getLong("user_id") );
|
|
158 NodeImpl node = NodeImpl.getNode( siteKey, rs.getLong("node_id") );
|
|
159 To to = rs.getBoolean("direct_only") ? To.CHILDREN : To.DESCENDANTS;
|
|
160 Type type = rs.getBoolean("daily_digest") ? Type.DAILY_DIGEST : Type.INSTANT;
|
|
161 SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type);
|
|
162 return subscription.autoUnsubscribe() ? null : subscription;
|
|
163 }
|
|
164
|
|
165 static SubscriptionImpl getSubscription(UserImpl subscriber,NodeImpl node) {
|
|
166 if( !node.getDbRecord().isInDb() )
|
|
167 return null;
|
|
168 try {
|
|
169 Connection con = node.siteKey.getDb().getConnection();
|
|
170 PreparedStatement stmt = con.prepareStatement(
|
|
171 "select * from subscription where user_id = ? and node_id = ?"
|
|
172 );
|
|
173 stmt.setLong( 1, subscriber.getId() );
|
|
174 stmt.setLong( 2, node.getId() );
|
|
175 ResultSet rs = stmt.executeQuery();
|
|
176 try {
|
|
177 return rs.next() ? SubscriptionImpl.getSubscription(node.siteKey,rs) : null;
|
|
178 } finally {
|
|
179 stmt.close();
|
|
180 con.close();
|
|
181 }
|
|
182 } catch(SQLException e) {
|
|
183 throw new RuntimeException(e);
|
|
184 }
|
|
185 }
|
|
186
|
|
187 static boolean isSubscribed(UserImpl subscriber,NodeImpl node) {
|
|
188 try {
|
|
189 Connection con = node.siteKey.getDb().getConnection();
|
|
190 PreparedStatement stmt = con.prepareStatement(
|
|
191 "select 1 from subscription where user_id = ? and node_id = ?"
|
|
192 );
|
|
193 stmt.setLong( 1, subscriber.getId() );
|
|
194 stmt.setLong( 2, node.getId() );
|
|
195 try {
|
|
196 return stmt.executeQuery().next();
|
|
197 } finally {
|
|
198 stmt.close();
|
|
199 con.close();
|
|
200 }
|
|
201 } catch(SQLException e) {
|
|
202 throw new RuntimeException(e);
|
|
203 }
|
|
204 }
|
|
205
|
|
206
|
|
207 static {
|
|
208 if( Init.hasDaemons && MailHome.isSendingMail() ) {
|
|
209
|
|
210 NodeImpl.addPostInsertListener( new Listener<NodeImpl>() {
|
|
211 public void event(final NodeImpl node) {
|
|
212 if( ModelHome.insideImportProcedure.get() )
|
|
213 return;
|
|
214 DbDatabase db = node.siteKey.getDb();
|
|
215 Executors.executeAfterCommit( db, new Runnable(){public void run(){
|
|
216 sendInstantMailsFor( DbUtils.getGoodCopy(node) );
|
|
217 }});
|
|
218 }
|
|
219 } );
|
|
220
|
|
221 long secondsPerDay = 24*60*60;
|
|
222 long runTime = 60*60; // 1 am
|
|
223 Date now = new Date();
|
|
224 long time = (now.getTime() - DateUtils.roundToDay(now).getTime())/1000;
|
|
225 long initialDelay = time < runTime
|
|
226 ? runTime - time
|
|
227 : secondsPerDay - (time - runTime)
|
|
228 ;
|
|
229 Executors.scheduleWithFixedDelay(new Runnable(){public void run(){
|
|
230 Date yesterday = DateUtils.addDays( new Date(), -1 );
|
|
231 sendDailyDigestsFor(yesterday);
|
|
232 ModelHome.lastDigestRun = System.currentTimeMillis();
|
|
233 }}, initialDelay, secondsPerDay, TimeUnit.SECONDS );
|
|
234
|
|
235 }
|
|
236 }
|
|
237
|
|
238 private static final String DAILY_DIGEST_TASK = "daily_digest";
|
|
239
|
|
240 static Map<User,Subscription> getSubscribersToNotify(NodeImpl node) {
|
|
241 Map<User,Subscription> map = new HashMap<User,Subscription>();
|
|
242 NodeImpl subscribed = node.getParentImpl();
|
|
243 if( subscribed==null )
|
|
244 return map;
|
|
245 Set<User> subscribers = new HashSet<User>();
|
|
246 boolean addedTask = false;
|
|
247 for( Subscription subscription : subscribed.getSubscriptions(
|
|
248 "select * from subscription"
|
|
249 +" where node_id = ?"
|
|
250 ) ) {
|
|
251 if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) {
|
|
252 node.getSiteImpl().addTask(DAILY_DIGEST_TASK);
|
|
253 addedTask = true;
|
|
254 }
|
|
255 User subscriber = subscription.getSubscriber();
|
|
256 if( subscribers.add(subscriber) && subscription.getType()==Type.INSTANT )
|
|
257 map.put( subscriber, subscription );
|
|
258 }
|
|
259 while( (subscribed = subscribed.getParentImpl()) != null ) {
|
|
260 for( Subscription subscription : subscribed.getSubscriptions(
|
|
261 "select * from subscription"
|
|
262 +" where node_id = ?"
|
|
263 +" and not direct_only"
|
|
264 ) ) {
|
|
265 if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) {
|
|
266 node.getSiteImpl().addTask(DAILY_DIGEST_TASK);
|
|
267 addedTask = true;
|
|
268 }
|
|
269 User subscriber = subscription.getSubscriber();
|
|
270 if( subscribers.add(subscriber ) && subscription.getType()==Type.INSTANT )
|
|
271 map.put( subscriber, subscription );
|
|
272 }
|
|
273 }
|
|
274 return map;
|
|
275 }
|
|
276
|
|
277 private static void sendInstantMailsFor(NodeImpl node) {
|
|
278 if( node==null )
|
|
279 return;
|
|
280 Template template = node.getSite().getTemplate( "notify_subscribers",
|
|
281 BasicNamespace.class, NabbleNamespace.class, NodeNamespace.class
|
|
282 );
|
|
283 template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(),
|
|
284 new BasicNamespace(template),
|
|
285 new NabbleNamespace(node.getSite()),
|
|
286 new NodeNamespace(node)
|
|
287 );
|
|
288 }
|
|
289
|
|
290 private static void sendDailyDigestsFor(Date date) {
|
|
291 logger.info("starting daily digests");
|
|
292 try {
|
|
293 Query day = Lucene.day(date);
|
|
294 for( SiteKey siteKey : SiteKey.getSiteKeys(DAILY_DIGEST_TASK) ) {
|
|
295 logger.info("daily digests for site = " + siteKey.getId());
|
|
296 Connection con = siteKey.getDb().getConnection();
|
|
297 Statement stmt = con.createStatement();
|
|
298 ResultSet rs = stmt.executeQuery(
|
|
299 "select *"
|
|
300 +" from subscription"
|
|
301 +" where daily_digest"
|
|
302 );
|
|
303 while( rs.next() ) {
|
|
304 SubscriptionImpl subscription = getSubscription(siteKey,rs);
|
|
305 if( subscription==null )
|
|
306 continue;
|
|
307 NodeImpl subscribed = subscription.node;
|
|
308 SiteImpl site = subscribed.getSiteImpl();
|
|
309 User subscriber = subscription.getSubscriber();
|
|
310 List<Node> nodes = new ArrayList<Node>();
|
|
311 BooleanQuery query = new BooleanQuery();
|
|
312 query.add( day, BooleanClause.Occur.MUST );
|
|
313 if( subscription.getTo() == To.CHILDREN ) {
|
|
314 query.add( Lucene.children(subscribed), BooleanClause.Occur.MUST );
|
|
315 } else {
|
|
316 query.add( Lucene.descendants(subscribed), BooleanClause.Occur.MUST );
|
|
317 query.add( Lucene.node(subscribed), BooleanClause.Occur.MUST_NOT );
|
|
318 }
|
|
319 LuceneSearcher searcher = Lucene.newSearcher(subscribed.getSite());
|
|
320 try {
|
|
321 for( ScoreDoc sdoc : searcher.search(query,1000).scoreDocs ) {
|
|
322 Node node = Lucene.getNode( site, searcher, sdoc.doc );
|
|
323 if( node != null )
|
|
324 nodes.add(node);
|
|
325 }
|
|
326 } finally {
|
|
327 searcher.close();
|
|
328 }
|
|
329 if( !nodes.isEmpty() ) {
|
|
330 Collections.sort(nodes, Node.dateComparator);
|
|
331 sendDigestEmail(subscribed, subscriber, nodes);
|
|
332 }
|
|
333 }
|
|
334 stmt.close();
|
|
335 con.close();
|
|
336 }
|
|
337 } catch(SQLException e) {
|
|
338 logger.error("SQLException", e);
|
|
339 throw new RuntimeException(e);
|
|
340 } catch(IOException e) {
|
|
341 logger.error("IOException", e);
|
|
342 throw new RuntimeException(e);
|
|
343 }
|
|
344 logger.info("finished daily digests");
|
|
345 }
|
|
346
|
|
347 static void nop() {}
|
|
348
|
|
349
|
|
350 private static void sendDigestEmail(Node subscriptionNode, User subscriber, List<Node> nodes) {
|
|
351 Template template = subscriptionNode.getSite().getTemplate( "digest email",
|
|
352 BasicNamespace.class, NabbleNamespace.class, NodeList.class, SubscriptionNamespace.class
|
|
353 );
|
|
354 template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(),
|
|
355 new BasicNamespace(template),
|
|
356 new NabbleNamespace(subscriptionNode.getSite()),
|
|
357 new NodeList(nodes,null,false),
|
|
358 new SubscriptionNamespace(subscriptionNode, subscriber)
|
|
359 );
|
|
360 }
|
|
361
|
|
362 // luan shell
|
|
363 public static void testDigest() {
|
|
364 sendDailyDigestsFor(new Date());
|
|
365 }
|
|
366
|
|
367 }
|