diff src/nabble/model/SubscriptionImpl.java @ 0:7ecd1a4ef557

add content
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 21 Mar 2019 19:15:52 -0600
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/nabble/model/SubscriptionImpl.java	Thu Mar 21 19:15:52 2019 -0600
@@ -0,0 +1,367 @@
+package nabble.model;
+
+import fschmidt.db.DbDatabase;
+import fschmidt.db.DbUtils;
+import fschmidt.db.Listener;
+import fschmidt.util.java.DateUtils;
+import fschmidt.util.mail.MailHome;
+import nabble.model.lucene.LuceneSearcher;
+import nabble.naml.compiler.Template;
+import nabble.naml.compiler.TemplatePrintWriter;
+import nabble.naml.namespaces.BasicNamespace;
+import nabble.view.web.template.NodeNamespace;
+import nabble.view.web.template.NabbleNamespace;
+import nabble.view.web.template.NodeList;
+import nabble.view.web.template.SubscriptionNamespace;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+
+final class SubscriptionImpl implements Subscription {
+	private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);
+
+	private final UserImpl subscriber;
+	private final NodeImpl node;
+	private To to;
+	private Type type;
+
+	private SubscriptionImpl(UserImpl subscriber,NodeImpl node,To to,Type type) {
+		this.subscriber = subscriber;
+		this.node = node;
+		this.to = to;
+		this.type = type;
+	}
+
+	private DbDatabase db() {
+		return node.siteKey.getDb();
+	}
+
+	public User getSubscriber() {
+		return subscriber;
+	}
+
+	public Node getNode() {
+		return node;
+	}
+
+	public To getTo() {
+		return to;
+	}
+
+	public void setTo(To to) {
+		if( this.to == to )
+			return;
+		this.to = to;
+		set( "direct_only", to==To.CHILDREN );
+	}
+
+	public Type getType() {
+		return type;
+	}
+
+	public void setType(Type type) {
+		if( this.type == type )
+			return;
+		this.type = type;
+		set( "daily_digest", type==Type.DAILY_DIGEST );
+	}
+
+	private void set(String field,boolean b) {
+		try {
+			Connection con = db().getConnection();
+			PreparedStatement stmt = con.prepareStatement(
+				"update subscription set "+field+"= ? where user_id = ? and node_id = ?"
+			);
+			stmt.setBoolean( 1, b );
+			stmt.setLong( 2, subscriber.getId() );
+			stmt.setLong( 3, node.getId() );
+			stmt.executeUpdate();
+			stmt.close();
+			con.close();
+		} catch(SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	public void delete() {
+		try {
+			Connection con = db().getConnection();
+			PreparedStatement stmt = con.prepareStatement(
+				"delete from subscription where user_id = ? and node_id = ?"
+			);
+			stmt.setLong( 1, subscriber.getId() );
+			stmt.setLong( 2, node.getId() );
+			stmt.executeUpdate();
+			stmt.close();
+			con.close();
+		} catch(SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	static SubscriptionImpl insert(UserImpl subscriber,NodeImpl node,To to,Type type) {
+		SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type);
+		subscription.insert();
+		return subscription;
+	}
+
+	private void insert() {
+		try {
+			Connection con = db().getConnection();
+			PreparedStatement stmt = con.prepareStatement(
+				"insert into subscription (user_id,node_id,direct_only,daily_digest) values (?,?,?,?)"
+			);
+			stmt.setLong( 1, subscriber.getId() );
+			stmt.setLong( 2, node.getId() );
+			stmt.setBoolean( 3, to==To.CHILDREN );
+			stmt.setBoolean( 4, type==Type.DAILY_DIGEST );
+			stmt.executeUpdate();
+			stmt.close();
+			con.close();
+		} catch(SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private boolean autoUnsubscribe() {
+		if( subscriber.isAutoUnsubscribe() ) {
+			logger.warn("auto-unsubscribing "+subscriber);
+			delete();
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	static SubscriptionImpl getSubscription(SiteKey siteKey,ResultSet rs) throws SQLException {
+		UserImpl subscriber = UserImpl.getUser( siteKey, rs.getLong("user_id") );
+		NodeImpl node = NodeImpl.getNode( siteKey, rs.getLong("node_id") );
+		To to = rs.getBoolean("direct_only") ? To.CHILDREN : To.DESCENDANTS;
+		Type type = rs.getBoolean("daily_digest") ? Type.DAILY_DIGEST : Type.INSTANT;
+		SubscriptionImpl subscription = new SubscriptionImpl(subscriber,node,to,type);
+		return subscription.autoUnsubscribe() ? null : subscription;
+	}
+
+	static SubscriptionImpl getSubscription(UserImpl subscriber,NodeImpl node) {
+		if( !node.getDbRecord().isInDb() )
+			return null;
+		try {
+			Connection con = node.siteKey.getDb().getConnection();
+			PreparedStatement stmt = con.prepareStatement(
+				"select * from subscription where user_id = ? and node_id = ?"
+			);
+			stmt.setLong( 1, subscriber.getId() );
+			stmt.setLong( 2, node.getId() );
+			ResultSet rs = stmt.executeQuery();
+			try {
+				return rs.next() ? SubscriptionImpl.getSubscription(node.siteKey,rs) : null;
+			} finally {
+				stmt.close();
+				con.close();
+			}
+		} catch(SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	static boolean isSubscribed(UserImpl subscriber,NodeImpl node) {
+		try {
+			Connection con = node.siteKey.getDb().getConnection();
+			PreparedStatement stmt = con.prepareStatement(
+				"select 1 from subscription where user_id = ? and node_id = ?"
+			);
+			stmt.setLong( 1, subscriber.getId() );
+			stmt.setLong( 2, node.getId() );
+			try {
+				return stmt.executeQuery().next();
+			} finally {
+				stmt.close();
+				con.close();
+			}
+		} catch(SQLException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+
+	static {
+		if( Init.hasDaemons && MailHome.isSendingMail() ) {
+
+			NodeImpl.addPostInsertListener( new Listener<NodeImpl>() {
+				public void event(final NodeImpl node) {
+					if( ModelHome.insideImportProcedure.get() )
+						return;
+					DbDatabase db = node.siteKey.getDb();
+					Executors.executeAfterCommit( db, new Runnable(){public void run(){
+						sendInstantMailsFor( DbUtils.getGoodCopy(node) );
+					}});
+				}
+			} );
+
+			long secondsPerDay = 24*60*60;
+			long runTime = 60*60;  // 1 am
+			Date now = new Date();
+			long time = (now.getTime() - DateUtils.roundToDay(now).getTime())/1000;
+			long initialDelay = time < runTime
+				? runTime - time
+				: secondsPerDay - (time - runTime)
+			;
+			Executors.scheduleWithFixedDelay(new Runnable(){public void run(){
+				Date yesterday = DateUtils.addDays( new Date(), -1 );
+				sendDailyDigestsFor(yesterday);
+				ModelHome.lastDigestRun = System.currentTimeMillis();
+			}}, initialDelay, secondsPerDay, TimeUnit.SECONDS );
+
+		}
+	}
+
+	private static final String DAILY_DIGEST_TASK = "daily_digest";
+
+	static Map<User,Subscription> getSubscribersToNotify(NodeImpl node) {
+		Map<User,Subscription> map = new HashMap<User,Subscription>();
+		NodeImpl subscribed = node.getParentImpl();
+		if( subscribed==null )
+			return map;
+		Set<User> subscribers = new HashSet<User>();
+		boolean addedTask = false;
+		for( Subscription subscription : subscribed.getSubscriptions(
+			"select * from subscription"
+			+" where node_id = ?"
+		) ) {
+			if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) {
+				node.getSiteImpl().addTask(DAILY_DIGEST_TASK);
+				addedTask = true;
+			}
+			User subscriber = subscription.getSubscriber();
+			if( subscribers.add(subscriber) && subscription.getType()==Type.INSTANT )
+				map.put( subscriber, subscription );
+		}
+		while( (subscribed = subscribed.getParentImpl()) != null ) {
+			for( Subscription subscription : subscribed.getSubscriptions(
+				"select * from subscription"
+				+" where node_id = ?"
+				+" and not direct_only"
+			) ) {
+				if( !addedTask && subscription.getType()==Type.DAILY_DIGEST ) {
+					node.getSiteImpl().addTask(DAILY_DIGEST_TASK);
+					addedTask = true;
+				}
+				User subscriber = subscription.getSubscriber();
+				if( subscribers.add(subscriber ) && subscription.getType()==Type.INSTANT )
+					map.put( subscriber, subscription );
+			}
+		}
+		return map;
+	}
+
+	private static void sendInstantMailsFor(NodeImpl node) {
+		if( node==null )
+			return;
+		Template template = node.getSite().getTemplate( "notify_subscribers",
+			BasicNamespace.class, NabbleNamespace.class, NodeNamespace.class
+		);
+		template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(),
+			new BasicNamespace(template),
+			new NabbleNamespace(node.getSite()),
+			new NodeNamespace(node)
+		);
+	}
+
+	private static void sendDailyDigestsFor(Date date) {
+		logger.info("starting daily digests");
+		try {
+			Query day = Lucene.day(date);
+			for( SiteKey siteKey : SiteKey.getSiteKeys(DAILY_DIGEST_TASK) ) {
+				logger.info("daily digests for site = " + siteKey.getId());
+				Connection con = siteKey.getDb().getConnection();
+				Statement stmt = con.createStatement();
+				ResultSet rs = stmt.executeQuery(
+					"select *"
+					+" from subscription"
+					+" where daily_digest"
+				);
+				while( rs.next() ) {
+					SubscriptionImpl subscription = getSubscription(siteKey,rs);
+					if( subscription==null )
+						continue;
+					NodeImpl subscribed = subscription.node;
+					SiteImpl site = subscribed.getSiteImpl();
+					User subscriber = subscription.getSubscriber();
+					List<Node> nodes = new ArrayList<Node>();
+					BooleanQuery query = new BooleanQuery();
+					query.add( day, BooleanClause.Occur.MUST );
+					if( subscription.getTo() == To.CHILDREN ) {
+						query.add( Lucene.children(subscribed), BooleanClause.Occur.MUST );
+					} else {
+						query.add( Lucene.descendants(subscribed), BooleanClause.Occur.MUST );
+						query.add( Lucene.node(subscribed), BooleanClause.Occur.MUST_NOT );
+					}
+					LuceneSearcher searcher = Lucene.newSearcher(subscribed.getSite());
+					try {
+						for( ScoreDoc sdoc : searcher.search(query,1000).scoreDocs ) {
+							Node node = Lucene.getNode( site, searcher, sdoc.doc );
+							if( node != null )
+								nodes.add(node);
+						}
+					} finally {
+						searcher.close();
+					}
+					if( !nodes.isEmpty() ) {
+						Collections.sort(nodes, Node.dateComparator);
+						sendDigestEmail(subscribed, subscriber, nodes);
+					}
+				}
+				stmt.close();
+				con.close();
+			}
+		} catch(SQLException e) {
+			logger.error("SQLException", e);
+			throw new RuntimeException(e);
+		} catch(IOException e) {
+			logger.error("IOException", e);
+			throw new RuntimeException(e);
+		}
+		logger.info("finished daily digests");
+	}
+
+	static void nop() {}
+
+
+	private static void sendDigestEmail(Node subscriptionNode, User subscriber, List<Node> nodes) {
+		Template template = subscriptionNode.getSite().getTemplate( "digest email",
+			BasicNamespace.class, NabbleNamespace.class, NodeList.class, SubscriptionNamespace.class
+		);
+		template.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(),
+			new BasicNamespace(template),
+			new NabbleNamespace(subscriptionNode.getSite()),
+			new NodeList(nodes,null,false),
+			new SubscriptionNamespace(subscriptionNode, subscriber)
+		);
+	}
+
+	// luan shell
+	public static void testDigest() {
+		sendDailyDigestsFor(new Date());
+	}
+
+}