Mercurial Hosting > nabble
diff src/nabble/model/SubscriptionImpl.java @ 0:7ecd1a4ef557
add content
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 21 Mar 2019 19:15:52 -0600 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/nabble/model/SubscriptionImpl.java Thu Mar 21 19:15:52 2019 -0600 @@ -0,0 +1,367 @@ +package nabble.model; + +import fschmidt.db.DbDatabase; +import fschmidt.db.DbUtils; +import fschmidt.db.Listener; +import fschmidt.util.java.DateUtils; +import fschmidt.util.mail.MailHome; +import nabble.model.lucene.LuceneSearcher; +import nabble.naml.compiler.Template; +import nabble.naml.compiler.TemplatePrintWriter; +import nabble.naml.namespaces.BasicNamespace; +import nabble.view.web.template.NodeNamespace; +import nabble.view.web.template.NabbleNamespace; +import nabble.view.web.template.NodeList; +import nabble.view.web.template.SubscriptionNamespace; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +final class SubscriptionImpl implements Subscription { + private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class); + + private final UserImpl subscriber; + private final NodeImpl node; + private To to; + private Type type; + + private SubscriptionImpl(UserImpl subscriber,NodeImpl node,To to,Type type) { + this.subscriber = subscriber; + this.node = node; + this.to = to; + this.type = type; + } + + private DbDatabase db() { + return node.siteKey.getDb(); + } + + public User getSubscriber() { + return subscriber; + } + + public Node getNode() { + return node; + } + + public To getTo() { + return to; + } + + public void setTo(To to) { + if( this.to == to ) + return; + this.to = to; + set( "direct_only", to==To.CHILDREN ); + } + + public Type getType() { + return type; + } + + public void setType(Type type) { + if( this.type == type ) + return; + this.type = type; + set( "daily_digest", type==Type.DAILY_DIGEST ); + } + + private void set(String field,boolean b) { + try { + Connection con = db().getConnection(); + PreparedStatement stmt = con.prepareStatement( + "update subscription set "+field+"= ? where user_id = ? and node_id = ?" + ); + stmt.setBoolean( 1, b ); + stmt.setLong( 2, subscriber.getId() ); + stmt.setLong( 3, node.getId() ); + stmt.executeUpdate(); + stmt.close(); + con.close(); + } catch(SQLException e) { + throw new RuntimeException(e); + } + } + + public void delete() { + try { + Connection con = db().getConnection(); + PreparedStatement stmt = con.prepareStatement( + "delete from subscription where user_id = ? and node_id = ?" + ); + stmt.setLong( 1, subscriber.getId() ); + stmt.setLong( 2, node.getId() ); + stmt.executeUpdate(); + stmt.close(); + con.close(); + } catch(SQLException e) { + throw new RuntimeException(e); + } + } + + static SubscriptionImpl insert(UserImpl subscriber,NodeImpl node,To to,Type type) { + SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type); + subscription.insert(); + return subscription; + } + + private void insert() { + try { + Connection con = db().getConnection(); + PreparedStatement stmt = con.prepareStatement( + "insert into subscription (user_id,node_id,direct_only,daily_digest) values (?,?,?,?)" + ); + stmt.setLong( 1, subscriber.getId() ); + stmt.setLong( 2, node.getId() ); + stmt.setBoolean( 3, to==To.CHILDREN ); + stmt.setBoolean( 4, type==Type.DAILY_DIGEST ); + stmt.executeUpdate(); + stmt.close(); + con.close(); + } catch(SQLException e) { + throw new RuntimeException(e); + } + } + + private boolean autoUnsubscribe() { + if( subscriber.isAutoUnsubscribe() ) { + logger.warn("auto-unsubscribing "+subscriber); + delete(); + return true; + } else { + return false; + } + } + + static SubscriptionImpl getSubscription(SiteKey siteKey,ResultSet rs) throws SQLException { + UserImpl subscriber = UserImpl.getUser( siteKey, rs.getLong("user_id") ); + NodeImpl node = NodeImpl.getNode( siteKey, rs.getLong("node_id") ); + To to = rs.getBoolean("direct_only") ? To.CHILDREN : To.DESCENDANTS; + Type type = rs.getBoolean("daily_digest") ? Type.DAILY_DIGEST : Type.INSTANT; + SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type); + return subscription.autoUnsubscribe() ? null : subscription; + } + + static SubscriptionImpl getSubscription(UserImpl subscriber,NodeImpl node) { + if( !node.getDbRecord().isInDb() ) + return null; + try { + Connection con = node.siteKey.getDb().getConnection(); + PreparedStatement stmt = con.prepareStatement( + "select * from subscription where user_id = ? and node_id = ?" + ); + stmt.setLong( 1, subscriber.getId() ); + stmt.setLong( 2, node.getId() ); + ResultSet rs = stmt.executeQuery(); + try { + return rs.next() ? SubscriptionImpl.getSubscription(node.siteKey,rs) : null; + } finally { + stmt.close(); + con.close(); + } + } catch(SQLException e) { + throw new RuntimeException(e); + } + } + + static boolean isSubscribed(UserImpl subscriber,NodeImpl node) { + try { + Connection con = node.siteKey.getDb().getConnection(); + PreparedStatement stmt = con.prepareStatement( + "select 1 from subscription where user_id = ? and node_id = ?" + ); + stmt.setLong( 1, subscriber.getId() ); + stmt.setLong( 2, node.getId() ); + try { + return stmt.executeQuery().next(); + } finally { + stmt.close(); + con.close(); + } + } catch(SQLException e) { + throw new RuntimeException(e); + } + } + + + static { + if( Init.hasDaemons && MailHome.isSendingMail() ) { + + NodeImpl.addPostInsertListener( new Listener<NodeImpl>() { + public void event(final NodeImpl node) { + if( ModelHome.insideImportProcedure.get() ) + return; + DbDatabase db = node.siteKey.getDb(); + Executors.executeAfterCommit( db, new Runnable(){public void run(){ + sendInstantMailsFor( DbUtils.getGoodCopy(node) ); + }}); + } + } ); + + long secondsPerDay = 24*60*60; + long runTime = 60*60; // 1 am + Date now = new Date(); + long time = (now.getTime() - DateUtils.roundToDay(now).getTime())/1000; + long initialDelay = time < runTime + ? runTime - time + : secondsPerDay - (time - runTime) + ; + Executors.scheduleWithFixedDelay(new Runnable(){public void run(){ + Date yesterday = DateUtils.addDays( new Date(), -1 ); + sendDailyDigestsFor(yesterday); + ModelHome.lastDigestRun = System.currentTimeMillis(); + }}, initialDelay, secondsPerDay, TimeUnit.SECONDS ); + + } + } + + private static final String DAILY_DIGEST_TASK = "daily_digest"; + + static Map<User,Subscription> getSubscribersToNotify(NodeImpl node) { + Map<User,Subscription> map = new HashMap<User,Subscription>(); + NodeImpl subscribed = node.getParentImpl(); + if( subscribed==null ) + return map; + Set<User> subscribers = new HashSet<User>(); + boolean addedTask = false; + for( Subscription subscription : subscribed.getSubscriptions( + "select * from subscription" + +" where node_id = ?" + ) ) { + if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) { + node.getSiteImpl().addTask(DAILY_DIGEST_TASK); + addedTask = true; + } + User subscriber = subscription.getSubscriber(); + if( subscribers.add(subscriber) && subscription.getType()==Type.INSTANT ) + map.put( subscriber, subscription ); + } + while( (subscribed = subscribed.getParentImpl()) != null ) { + for( Subscription subscription : subscribed.getSubscriptions( + "select * from subscription" + +" where node_id = ?" + +" and not direct_only" + ) ) { + if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) { + node.getSiteImpl().addTask(DAILY_DIGEST_TASK); + addedTask = true; + } + User subscriber = subscription.getSubscriber(); + if( subscribers.add(subscriber ) && subscription.getType()==Type.INSTANT ) + map.put( subscriber, subscription ); + } + } + return map; + } + + private static void sendInstantMailsFor(NodeImpl node) { + if( node==null ) + return; + Template template = node.getSite().getTemplate( "notify_subscribers", + BasicNamespace.class, NabbleNamespace.class, NodeNamespace.class + ); + template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(), + new BasicNamespace(template), + new NabbleNamespace(node.getSite()), + new NodeNamespace(node) + ); + } + + private static void sendDailyDigestsFor(Date date) { + logger.info("starting daily digests"); + try { + Query day = Lucene.day(date); + for( SiteKey siteKey : SiteKey.getSiteKeys(DAILY_DIGEST_TASK) ) { + logger.info("daily digests for site = " + siteKey.getId()); + Connection con = siteKey.getDb().getConnection(); + Statement stmt = con.createStatement(); + ResultSet rs = stmt.executeQuery( + "select *" + +" from subscription" + +" where daily_digest" + ); + while( rs.next() ) { + SubscriptionImpl subscription = getSubscription(siteKey,rs); + if( subscription==null ) + continue; + NodeImpl subscribed = subscription.node; + SiteImpl site = subscribed.getSiteImpl(); + User subscriber = subscription.getSubscriber(); + List<Node> nodes = new ArrayList<Node>(); + BooleanQuery query = new BooleanQuery(); + query.add( day, BooleanClause.Occur.MUST ); + if( subscription.getTo() == To.CHILDREN ) { + query.add( Lucene.children(subscribed), BooleanClause.Occur.MUST ); + } else { + query.add( Lucene.descendants(subscribed), BooleanClause.Occur.MUST ); + query.add( Lucene.node(subscribed), BooleanClause.Occur.MUST_NOT ); + } + LuceneSearcher searcher = Lucene.newSearcher(subscribed.getSite()); + try { + for( ScoreDoc sdoc : searcher.search(query,1000).scoreDocs ) { + Node node = Lucene.getNode( site, searcher, sdoc.doc ); + if( node != null ) + nodes.add(node); + } + } finally { + searcher.close(); + } + if( !nodes.isEmpty() ) { + Collections.sort(nodes, Node.dateComparator); + sendDigestEmail(subscribed, subscriber, nodes); + } + } + stmt.close(); + con.close(); + } + } catch(SQLException e) { + logger.error("SQLException", e); + throw new RuntimeException(e); + } catch(IOException e) { + logger.error("IOException", e); + throw new RuntimeException(e); + } + logger.info("finished daily digests"); + } + + static void nop() {} + + + private static void sendDigestEmail(Node subscriptionNode, User subscriber, List<Node> nodes) { + Template template = subscriptionNode.getSite().getTemplate( "digest email", + BasicNamespace.class, NabbleNamespace.class, NodeList.class, SubscriptionNamespace.class + ); + template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(), + new BasicNamespace(template), + new NabbleNamespace(subscriptionNode.getSite()), + new NodeList(nodes,null,false), + new SubscriptionNamespace(subscriptionNode, subscriber) + ); + } + + // luan shell + public static void testDigest() { + sendDailyDigestsFor(new Date()); + } + +}