Mercurial Hosting > nabble
view src/nabble/model/SubscriptionImpl.java @ 0:7ecd1a4ef557
add content
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 21 Mar 2019 19:15:52 -0600 |
parents | |
children |
line wrap: on
line source
package nabble.model; import fschmidt.db.DbDatabase; import fschmidt.db.DbUtils; import fschmidt.db.Listener; import fschmidt.util.java.DateUtils; import fschmidt.util.mail.MailHome; import nabble.model.lucene.LuceneSearcher; import nabble.naml.compiler.Template; import nabble.naml.compiler.TemplatePrintWriter; import nabble.naml.namespaces.BasicNamespace; import nabble.view.web.template.NodeNamespace; import nabble.view.web.template.NabbleNamespace; import nabble.view.web.template.NodeList; import nabble.view.web.template.SubscriptionNamespace; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; final class SubscriptionImpl implements Subscription { private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class); private final UserImpl subscriber; private final NodeImpl node; private To to; private Type type; private SubscriptionImpl(UserImpl subscriber,NodeImpl node,To to,Type type) { this.subscriber = subscriber; this.node = node; this.to = to; this.type = type; } private DbDatabase db() { return node.siteKey.getDb(); } public User getSubscriber() { return subscriber; } public Node getNode() { return node; } public To getTo() { return to; } public void setTo(To to) { if( this.to == to ) return; this.to = to; set( "direct_only", to==To.CHILDREN ); } public Type getType() { return type; } public void setType(Type type) { if( this.type == type ) return; this.type = type; set( "daily_digest", type==Type.DAILY_DIGEST ); } private void set(String field,boolean b) { try { Connection con = db().getConnection(); PreparedStatement stmt = con.prepareStatement( "update subscription set "+field+"= ? where user_id = ? and node_id = ?" ); stmt.setBoolean( 1, b ); stmt.setLong( 2, subscriber.getId() ); stmt.setLong( 3, node.getId() ); stmt.executeUpdate(); stmt.close(); con.close(); } catch(SQLException e) { throw new RuntimeException(e); } } public void delete() { try { Connection con = db().getConnection(); PreparedStatement stmt = con.prepareStatement( "delete from subscription where user_id = ? and node_id = ?" ); stmt.setLong( 1, subscriber.getId() ); stmt.setLong( 2, node.getId() ); stmt.executeUpdate(); stmt.close(); con.close(); } catch(SQLException e) { throw new RuntimeException(e); } } static SubscriptionImpl insert(UserImpl subscriber,NodeImpl node,To to,Type type) { SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type); subscription.insert(); return subscription; } private void insert() { try { Connection con = db().getConnection(); PreparedStatement stmt = con.prepareStatement( "insert into subscription (user_id,node_id,direct_only,daily_digest) values (?,?,?,?)" ); stmt.setLong( 1, subscriber.getId() ); stmt.setLong( 2, node.getId() ); stmt.setBoolean( 3, to==To.CHILDREN ); stmt.setBoolean( 4, type==Type.DAILY_DIGEST ); stmt.executeUpdate(); stmt.close(); con.close(); } catch(SQLException e) { throw new RuntimeException(e); } } private boolean autoUnsubscribe() { if( subscriber.isAutoUnsubscribe() ) { logger.warn("auto-unsubscribing "+subscriber); delete(); return true; } else { return false; } } static SubscriptionImpl getSubscription(SiteKey siteKey,ResultSet rs) throws SQLException { UserImpl subscriber = UserImpl.getUser( siteKey, rs.getLong("user_id") ); NodeImpl node = NodeImpl.getNode( siteKey, rs.getLong("node_id") ); To to = rs.getBoolean("direct_only") ? To.CHILDREN : To.DESCENDANTS; Type type = rs.getBoolean("daily_digest") ? Type.DAILY_DIGEST : Type.INSTANT; SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type); return subscription.autoUnsubscribe() ? null : subscription; } static SubscriptionImpl getSubscription(UserImpl subscriber,NodeImpl node) { if( !node.getDbRecord().isInDb() ) return null; try { Connection con = node.siteKey.getDb().getConnection(); PreparedStatement stmt = con.prepareStatement( "select * from subscription where user_id = ? and node_id = ?" ); stmt.setLong( 1, subscriber.getId() ); stmt.setLong( 2, node.getId() ); ResultSet rs = stmt.executeQuery(); try { return rs.next() ? SubscriptionImpl.getSubscription(node.siteKey,rs) : null; } finally { stmt.close(); con.close(); } } catch(SQLException e) { throw new RuntimeException(e); } } static boolean isSubscribed(UserImpl subscriber,NodeImpl node) { try { Connection con = node.siteKey.getDb().getConnection(); PreparedStatement stmt = con.prepareStatement( "select 1 from subscription where user_id = ? and node_id = ?" ); stmt.setLong( 1, subscriber.getId() ); stmt.setLong( 2, node.getId() ); try { return stmt.executeQuery().next(); } finally { stmt.close(); con.close(); } } catch(SQLException e) { throw new RuntimeException(e); } } static { if( Init.hasDaemons && MailHome.isSendingMail() ) { NodeImpl.addPostInsertListener( new Listener<NodeImpl>() { public void event(final NodeImpl node) { if( ModelHome.insideImportProcedure.get() ) return; DbDatabase db = node.siteKey.getDb(); Executors.executeAfterCommit( db, new Runnable(){public void run(){ sendInstantMailsFor( DbUtils.getGoodCopy(node) ); }}); } } ); long secondsPerDay = 24*60*60; long runTime = 60*60; // 1 am Date now = new Date(); long time = (now.getTime() - DateUtils.roundToDay(now).getTime())/1000; long initialDelay = time < runTime ? runTime - time : secondsPerDay - (time - runTime) ; Executors.scheduleWithFixedDelay(new Runnable(){public void run(){ Date yesterday = DateUtils.addDays( new Date(), -1 ); sendDailyDigestsFor(yesterday); ModelHome.lastDigestRun = System.currentTimeMillis(); }}, initialDelay, secondsPerDay, TimeUnit.SECONDS ); } } private static final String DAILY_DIGEST_TASK = "daily_digest"; static Map<User,Subscription> getSubscribersToNotify(NodeImpl node) { Map<User,Subscription> map = new HashMap<User,Subscription>(); NodeImpl subscribed = node.getParentImpl(); if( subscribed==null ) return map; Set<User> subscribers = new HashSet<User>(); boolean addedTask = false; for( Subscription subscription : subscribed.getSubscriptions( "select * from subscription" +" where node_id = ?" ) ) { if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) { node.getSiteImpl().addTask(DAILY_DIGEST_TASK); addedTask = true; } User subscriber = subscription.getSubscriber(); if( subscribers.add(subscriber) && subscription.getType()==Type.INSTANT ) map.put( subscriber, subscription ); } while( (subscribed = subscribed.getParentImpl()) != null ) { for( Subscription subscription : subscribed.getSubscriptions( "select * from subscription" +" where node_id = ?" +" and not direct_only" ) ) { if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) { node.getSiteImpl().addTask(DAILY_DIGEST_TASK); addedTask = true; } User subscriber = subscription.getSubscriber(); if( subscribers.add(subscriber ) && subscription.getType()==Type.INSTANT ) map.put( subscriber, subscription ); } } return map; } private static void sendInstantMailsFor(NodeImpl node) { if( node==null ) return; Template template = node.getSite().getTemplate( "notify_subscribers", BasicNamespace.class, NabbleNamespace.class, NodeNamespace.class ); template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(), new BasicNamespace(template), new NabbleNamespace(node.getSite()), new NodeNamespace(node) ); } private static void sendDailyDigestsFor(Date date) { logger.info("starting daily digests"); try { Query day = Lucene.day(date); for( SiteKey siteKey : SiteKey.getSiteKeys(DAILY_DIGEST_TASK) ) { logger.info("daily digests for site = " + siteKey.getId()); Connection con = siteKey.getDb().getConnection(); Statement stmt = con.createStatement(); ResultSet rs = stmt.executeQuery( "select *" +" from subscription" +" where daily_digest" ); while( rs.next() ) { SubscriptionImpl subscription = getSubscription(siteKey,rs); if( subscription==null ) continue; NodeImpl subscribed = subscription.node; SiteImpl site = subscribed.getSiteImpl(); User subscriber = subscription.getSubscriber(); List<Node> nodes = new ArrayList<Node>(); BooleanQuery query = new BooleanQuery(); query.add( day, BooleanClause.Occur.MUST ); if( subscription.getTo() == To.CHILDREN ) { query.add( Lucene.children(subscribed), BooleanClause.Occur.MUST ); } else { query.add( Lucene.descendants(subscribed), BooleanClause.Occur.MUST ); query.add( Lucene.node(subscribed), BooleanClause.Occur.MUST_NOT ); } LuceneSearcher searcher = Lucene.newSearcher(subscribed.getSite()); try { for( ScoreDoc sdoc : searcher.search(query,1000).scoreDocs ) { Node node = Lucene.getNode( site, searcher, sdoc.doc ); if( node != null ) nodes.add(node); } } finally { searcher.close(); } if( !nodes.isEmpty() ) { Collections.sort(nodes, Node.dateComparator); sendDigestEmail(subscribed, subscriber, nodes); } } stmt.close(); con.close(); } } catch(SQLException e) { logger.error("SQLException", e); throw new RuntimeException(e); } catch(IOException e) { logger.error("IOException", e); throw new RuntimeException(e); } logger.info("finished daily digests"); } static void nop() {} private static void sendDigestEmail(Node subscriptionNode, User subscriber, List<Node> nodes) { Template template = subscriptionNode.getSite().getTemplate( "digest email", BasicNamespace.class, NabbleNamespace.class, NodeList.class, SubscriptionNamespace.class ); template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(), new BasicNamespace(template), new NabbleNamespace(subscriptionNode.getSite()), new NodeList(nodes,null,false), new SubscriptionNamespace(subscriptionNode, subscriber) ); } // luan shell public static void testDigest() { sendDailyDigestsFor(new Date()); } }