/*
 * Source: src/nabble/model/SubscriptionImpl.java @ 0:7ecd1a4ef557
 *
 * Commit message: add content
 * Author: Franklin Schmidt <fschmidt@gmail.com>
 * Date: Thu, 21 Mar 2019 19:15:52 -0600
 * Parents: (none listed)
 * Children: (none listed)
 */

package nabble.model;

import fschmidt.db.DbDatabase;
import fschmidt.db.DbUtils;
import fschmidt.db.Listener;
import fschmidt.util.java.DateUtils;
import fschmidt.util.mail.MailHome;
import nabble.model.lucene.LuceneSearcher;
import nabble.naml.compiler.Template;
import nabble.naml.compiler.TemplatePrintWriter;
import nabble.naml.namespaces.BasicNamespace;
import nabble.view.web.template.NodeNamespace;
import nabble.view.web.template.NabbleNamespace;
import nabble.view.web.template.NodeList;
import nabble.view.web.template.SubscriptionNamespace;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;


final class SubscriptionImpl implements Subscription {
	// Class-wide SLF4J logger.
	private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);

	// The user holding this subscription; its id is stored in the user_id column.
	private final UserImpl subscriber;
	// The node subscribed to; its id is stored in the node_id column.
	private final NodeImpl node;
	// Scope of the subscription; persisted as direct_only (true when To.CHILDREN).
	private To to;
	// Delivery mode; persisted as daily_digest (true when Type.DAILY_DIGEST).
	private Type type;

	/**
	 * Builds an in-memory subscription; does not touch the database.
	 *
	 * @param subscriber the subscribing user
	 * @param node the node being subscribed to
	 * @param to scope of the subscription
	 * @param type delivery mode
	 */
	private SubscriptionImpl(UserImpl subscriber,NodeImpl node,To to,Type type) {
		this.subscriber = subscriber;
		this.node = node;
		this.to = to;
		this.type = type;
	}

	/** Returns the database obtained from the subscribed node's site key. */
	private DbDatabase db() {
		return node.siteKey.getDb();
	}

	/** Returns the user who holds this subscription. */
	public User getSubscriber() {
		return subscriber;
	}

	/** Returns the node this subscription is attached to. */
	public Node getNode() {
		return node;
	}

	/** Returns the current scope of this subscription (the in-memory value). */
	public To getTo() {
		return to;
	}

	/**
	 * Changes the scope of this subscription and writes it to the
	 * direct_only column. Does nothing when the scope is already the given value.
	 *
	 * @param to the new scope; stored as direct_only = true only for To.CHILDREN
	 */
	public void setTo(To to) {
		if( this.to != to ) {
			this.to = to;
			boolean directOnly = to==To.CHILDREN;
			set( "direct_only", directOnly );
		}
	}

	/** Returns the current delivery mode of this subscription (the in-memory value). */
	public Type getType() {
		return type;
	}

	/**
	 * Changes the delivery mode of this subscription and writes it to the
	 * daily_digest column. Does nothing when the mode is already the given value.
	 *
	 * @param type the new mode; stored as daily_digest = true only for Type.DAILY_DIGEST
	 */
	public void setType(Type type) {
		if( this.type != type ) {
			this.type = type;
			boolean dailyDigest = type==Type.DAILY_DIGEST;
			set( "daily_digest", dailyDigest );
		}
	}

	/**
	 * Updates one boolean column of this subscription's row
	 * (the row matching this subscriber's id and this node's id).
	 *
	 * @param field column name; it is concatenated into the SQL text, so it must
	 *        only ever be a trusted constant (this class passes "direct_only"
	 *        and "daily_digest")
	 * @param b the value to store
	 * @throws RuntimeException wrapping any SQLException raised by the database
	 */
	private void set(String field,boolean b) {
		try {
			Connection con = db().getConnection();
			try {
				PreparedStatement stmt = con.prepareStatement(
					"update subscription set "+field+"= ? where user_id = ? and node_id = ?"
				);
				try {
					stmt.setBoolean( 1, b );
					stmt.setLong( 2, subscriber.getId() );
					stmt.setLong( 3, node.getId() );
					stmt.executeUpdate();
				} finally {
					// release the statement even when binding or execution fails
					stmt.close();
				}
			} finally {
				// release the connection even when the statement could not be prepared or closed
				con.close();
			}
		} catch(SQLException e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * Deletes this subscription's row (matched by subscriber id and node id).
	 * The in-memory object is left untouched.
	 *
	 * @throws RuntimeException wrapping any SQLException raised by the database
	 */
	public void delete() {
		try {
			Connection con = db().getConnection();
			try {
				PreparedStatement stmt = con.prepareStatement(
					"delete from subscription where user_id = ? and node_id = ?"
				);
				try {
					stmt.setLong( 1, subscriber.getId() );
					stmt.setLong( 2, node.getId() );
					stmt.executeUpdate();
				} finally {
					// release the statement even when binding or execution fails
					stmt.close();
				}
			} finally {
				// release the connection even when the statement could not be prepared or closed
				con.close();
			}
		} catch(SQLException e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * Creates a subscription object from the given values, writes it to the
	 * subscription table, and returns it.
	 *
	 * @param subscriber the subscribing user
	 * @param node the node being subscribed to
	 * @param to scope of the subscription
	 * @param type delivery mode
	 * @return the newly inserted subscription
	 */
	static SubscriptionImpl insert(UserImpl subscriber,NodeImpl node,To to,Type type) {
		final SubscriptionImpl created = new SubscriptionImpl( subscriber, node, to, type );
		created.insert();
		return created;
	}

	/**
	 * Inserts a row for this subscription into the subscription table.
	 * direct_only is true only for To.CHILDREN and daily_digest is true only
	 * for Type.DAILY_DIGEST.
	 *
	 * @throws RuntimeException wrapping any SQLException raised by the database
	 */
	private void insert() {
		try {
			Connection con = db().getConnection();
			try {
				PreparedStatement stmt = con.prepareStatement(
					"insert into subscription (user_id,node_id,direct_only,daily_digest) values (?,?,?,?)"
				);
				try {
					stmt.setLong( 1, subscriber.getId() );
					stmt.setLong( 2, node.getId() );
					stmt.setBoolean( 3, to==To.CHILDREN );
					stmt.setBoolean( 4, type==Type.DAILY_DIGEST );
					stmt.executeUpdate();
				} finally {
					// release the statement even when binding or execution fails
					stmt.close();
				}
			} finally {
				// release the connection even when the statement could not be prepared or closed
				con.close();
			}
		} catch(SQLException e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * Removes this subscription when the subscriber reports
	 * isAutoUnsubscribe(); a warning is logged before the row is deleted.
	 *
	 * @return true when the subscription was deleted, false when it was kept
	 */
	private boolean autoUnsubscribe() {
		if( !subscriber.isAutoUnsubscribe() )
			return false;
		logger.warn("auto-unsubscribing "+subscriber);
		delete();
		return true;
	}

	/**
	 * Builds a subscription from the current row of a result set over the
	 * subscription table (columns user_id, node_id, direct_only, daily_digest).
	 * The subscription is deleted and not returned when autoUnsubscribe()
	 * reports that it was removed.
	 *
	 * @param siteKey site used to look up the user and the node
	 * @param rs result set positioned on the row to read
	 * @return the subscription, or null when it was auto-unsubscribed
	 * @throws SQLException when a column cannot be read
	 */
	static SubscriptionImpl getSubscription(SiteKey siteKey,ResultSet rs) throws SQLException {
		long userId = rs.getLong("user_id");
		UserImpl user = UserImpl.getUser( siteKey, userId );
		long nodeId = rs.getLong("node_id");
		NodeImpl target = NodeImpl.getNode( siteKey, nodeId );

		To scope;
		if( rs.getBoolean("direct_only") )
			scope = To.CHILDREN;
		else
			scope = To.DESCENDANTS;

		Type delivery;
		if( rs.getBoolean("daily_digest") )
			delivery = Type.DAILY_DIGEST;
		else
			delivery = Type.INSTANT;

		SubscriptionImpl loaded = new SubscriptionImpl( user, target, scope, delivery );
		if( loaded.autoUnsubscribe() )
			return null;
		return loaded;
	}

	/**
	 * Looks up the subscription of a user to a node.
	 *
	 * @param subscriber the user whose subscription is wanted
	 * @param node the node the subscription is attached to
	 * @return the subscription, or null when the node's record is not in the
	 *         database, when no matching row exists, or when the row-based
	 *         getSubscription returns null for it
	 * @throws RuntimeException wrapping any SQLException raised by the database
	 */
	static SubscriptionImpl getSubscription(UserImpl subscriber,NodeImpl node) {
		if( !node.getDbRecord().isInDb() )
			return null;
		try {
			Connection con = node.siteKey.getDb().getConnection();
			try {
				PreparedStatement stmt = con.prepareStatement(
					"select * from subscription where user_id = ? and node_id = ?"
				);
				try {
					stmt.setLong( 1, subscriber.getId() );
					stmt.setLong( 2, node.getId() );
					ResultSet rs = stmt.executeQuery();
					return rs.next() ? SubscriptionImpl.getSubscription(node.siteKey,rs) : null;
				} finally {
					// also covers failures while binding parameters or executing the query
					stmt.close();
				}
			} finally {
				// runs even when stmt.close() itself throws
				con.close();
			}
		} catch(SQLException e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * Tells whether a subscription row exists for the given user and node.
	 * Unlike getSubscription, this only checks for the row and does not build
	 * a subscription object.
	 *
	 * @param subscriber the user to check
	 * @param node the node to check
	 * @return true when a matching row exists
	 * @throws RuntimeException wrapping any SQLException raised by the database
	 */
	static boolean isSubscribed(UserImpl subscriber,NodeImpl node) {
		try {
			Connection con = node.siteKey.getDb().getConnection();
			try {
				PreparedStatement stmt = con.prepareStatement(
					"select 1 from subscription where user_id = ? and node_id = ?"
				);
				try {
					stmt.setLong( 1, subscriber.getId() );
					stmt.setLong( 2, node.getId() );
					return stmt.executeQuery().next();
				} finally {
					// also covers failures while binding parameters
					stmt.close();
				}
			} finally {
				// runs even when stmt.close() itself throws
				con.close();
			}
		} catch(SQLException e) {
			throw new RuntimeException(e);
		}
	}


	// Runs once when the class is initialized. Only when Init.hasDaemons is set
	// and MailHome reports that mail is being sent, it
	//  - registers a post-insert listener that sends instant notifications after commit, and
	//  - schedules the daily digest job.
	static {
		if( Init.hasDaemons && MailHome.isSendingMail() ) {

			Listener<NodeImpl> onNodeInserted = new Listener<NodeImpl>() {
				public void event(final NodeImpl node) {
					// nothing is sent for nodes created inside an import procedure
					if( !ModelHome.insideImportProcedure.get() ) {
						DbDatabase database = node.siteKey.getDb();
						Runnable notifier = new Runnable() {
							public void run() {
								sendInstantMailsFor( DbUtils.getGoodCopy(node) );
							}
						};
						Executors.executeAfterCommit( database, notifier );
					}
				}
			};
			NodeImpl.addPostInsertListener( onNodeInserted );

			long secondsPerDay = TimeUnit.DAYS.toSeconds(1);
			long runTime = TimeUnit.HOURS.toSeconds(1);  // 1 am
			Date now = new Date();
			// seconds elapsed since DateUtils.roundToDay(now)
			long millisIntoDay = now.getTime() - DateUtils.roundToDay(now).getTime();
			long secondsIntoDay = millisIntoDay/1000;
			long initialDelay;
			if( secondsIntoDay < runTime ) {
				// run time is still ahead: wait the remaining seconds
				initialDelay = runTime - secondsIntoDay;
			} else {
				// run time has passed: wait until the same offset one day later
				initialDelay = secondsPerDay - (secondsIntoDay - runTime);
			}
			Runnable digestJob = new Runnable() {
				public void run() {
					Date yesterday = DateUtils.addDays( new Date(), -1 );
					sendDailyDigestsFor(yesterday);
					ModelHome.lastDigestRun = System.currentTimeMillis();
				}
			};
			Executors.scheduleWithFixedDelay( digestJob, initialDelay, secondsPerDay, TimeUnit.SECONDS );

		}
	}

	// Task name added to a site when a daily-digest subscription is seen for it,
	// and used to select the site keys processed by the daily digest run.
	private static final String DAILY_DIGEST_TASK = "daily_digest";

	/**
	 * Collects the instant-delivery subscriptions that apply to a node.
	 *
	 * Walks from the node's parent up through its ancestors. On the parent every
	 * subscription counts; on higher ancestors only subscriptions that are not
	 * direct_only count. For each user only the first subscription met on the
	 * way up is considered, and it is returned only when it is Type.INSTANT.
	 * The first daily-digest subscription met also adds the daily digest task
	 * to the node's site (at most once per call).
	 *
	 * @param node the node whose ancestors' subscribers are wanted
	 * @return map from subscriber to the subscription to notify through;
	 *         empty when the node has no parent
	 */
	static Map<User,Subscription> getSubscribersToNotify(NodeImpl node) {
		Map<User,Subscription> toNotify = new HashMap<User,Subscription>();
		Set<User> seen = new HashSet<User>();
		boolean digestTaskAdded = false;
		boolean directParent = true;
		for( NodeImpl ancestor = node.getParentImpl(); ancestor != null; ancestor = ancestor.getParentImpl() ) {
			// the direct parent accepts every subscription; higher ancestors skip direct_only ones
			String sql = directParent
				? "select * from subscription where node_id = ?"
				: "select * from subscription where node_id = ? and not direct_only"
			;
			directParent = false;
			for( Subscription candidate : ancestor.getSubscriptions(sql) ) {
				Type delivery = candidate.getType();
				if( delivery==Type.DAILY_DIGEST && !digestTaskAdded ) {
					node.getSiteImpl().addTask(DAILY_DIGEST_TASK);
					digestTaskAdded = true;
				}
				User who = candidate.getSubscriber();
				// always record the user so a closer subscription shadows farther ones
				boolean firstSeen = seen.add(who);
				if( firstSeen && delivery==Type.INSTANT )
					toNotify.put( who, candidate );
			}
		}
		return toNotify;
	}

	/**
	 * Runs the site's "notify_subscribers" template for a node, discarding the
	 * template output. Does nothing when the node is null.
	 *
	 * @param node the node to run the template for; may be null
	 */
	private static void sendInstantMailsFor(NodeImpl node) {
		if( node != null ) {
			Template notifier = node.getSite().getTemplate( "notify_subscribers",
				BasicNamespace.class, NabbleNamespace.class, NodeNamespace.class
			);
			BasicNamespace basicNs = new BasicNamespace(notifier);
			NabbleNamespace nabbleNs = new NabbleNamespace(node.getSite());
			NodeNamespace nodeNs = new NodeNamespace(node);
			notifier.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(),
				basicNs, nabbleNs, nodeNs
			);
		}
	}

	/**
	 * Sends the daily digest emails for one day.
	 *
	 * For every site key returned for the daily digest task, reads all
	 * daily_digest subscriptions, finds the nodes matching that day under the
	 * subscribed node, and sends one digest email per subscription that has
	 * at least one such node.
	 *
	 * The statement and connection of each site are released in finally
	 * blocks, so a failure while processing a row no longer leaks them.
	 *
	 * @param date the day whose nodes are reported
	 * @throws RuntimeException wrapping any SQLException or IOException (logged first)
	 */
	private static void sendDailyDigestsFor(Date date) {
		logger.info("starting daily digests");
		try {
			Query day = Lucene.day(date);
			for( SiteKey siteKey : SiteKey.getSiteKeys(DAILY_DIGEST_TASK) ) {
				logger.info("daily digests for site = " + siteKey.getId());
				Connection con = siteKey.getDb().getConnection();
				try {
					Statement stmt = con.createStatement();
					try {
						ResultSet rs = stmt.executeQuery(
							"select *"
							+" from subscription"
							+" where daily_digest"
						);
						while( rs.next() ) {
							SubscriptionImpl subscription = getSubscription(siteKey,rs);
							// null means the subscription was auto-unsubscribed while loading
							if( subscription==null )
								continue;
							List<Node> nodes = findDigestNodes(subscription,day);
							if( !nodes.isEmpty() ) {
								Collections.sort(nodes, Node.dateComparator);
								sendDigestEmail(subscription.node, subscription.getSubscriber(), nodes);
							}
						}
					} finally {
						// closing the statement also releases its result set
						stmt.close();
					}
				} finally {
					con.close();
				}
			}
		} catch(SQLException e) {
			logger.error("SQLException", e);
			throw new RuntimeException(e);
		} catch(IOException e) {
			logger.error("IOException", e);
			throw new RuntimeException(e);
		}
		logger.info("finished daily digests");
	}

	// Searches Lucene for the nodes of the given day that fall under the subscription:
	// children only for To.CHILDREN, otherwise descendants excluding the subscribed node itself.
	private static List<Node> findDigestNodes(SubscriptionImpl subscription,Query day) throws IOException {
		NodeImpl subscribed = subscription.node;
		SiteImpl site = subscribed.getSiteImpl();
		List<Node> nodes = new ArrayList<Node>();
		BooleanQuery query = new BooleanQuery();
		query.add( day, BooleanClause.Occur.MUST );
		if( subscription.getTo() == To.CHILDREN ) {
			query.add( Lucene.children(subscribed), BooleanClause.Occur.MUST );
		} else {
			query.add( Lucene.descendants(subscribed), BooleanClause.Occur.MUST );
			query.add( Lucene.node(subscribed), BooleanClause.Occur.MUST_NOT );
		}
		LuceneSearcher searcher = Lucene.newSearcher(subscribed.getSite());
		try {
			// at most 1000 hits are considered per subscription
			for( ScoreDoc sdoc : searcher.search(query,1000).scoreDocs ) {
				Node node = Lucene.getNode( site, searcher, sdoc.doc );
				if( node != null )
					nodes.add(node);
			}
		} finally {
			searcher.close();
		}
		return nodes;
	}

	// Intentionally empty.
	// NOTE(review): presumably exists so callers can force this class to load and
	// run its static initializer — confirm against callers.
	static void nop() {}


	/**
	 * Runs the site's "digest email" template for one subscription, discarding
	 * the template output.
	 *
	 * @param subscriptionNode the node the subscription is attached to
	 * @param subscriber the user receiving the digest
	 * @param nodes the nodes to report in the digest
	 */
	private static void sendDigestEmail(Node subscriptionNode, User subscriber, List<Node> nodes) {
		Template digest = subscriptionNode.getSite().getTemplate( "digest email",
			BasicNamespace.class, NabbleNamespace.class, NodeList.class, SubscriptionNamespace.class
		);
		BasicNamespace basicNs = new BasicNamespace(digest);
		NabbleNamespace nabbleNs = new NabbleNamespace(subscriptionNode.getSite());
		NodeList nodeListNs = new NodeList(nodes,null,false);
		SubscriptionNamespace subscriptionNs = new SubscriptionNamespace(subscriptionNode, subscriber);
		digest.run( TemplatePrintWriter.NULL, Collections.<String,Object>emptyMap(),
			basicNs, nabbleNs, nodeListNs, subscriptionNs
		);
	}

	// luan shell
	// Runs the daily digest pass for the current date (new Date()), whereas the
	// scheduled job passes yesterday's date.
	// NOTE(review): the "luan shell" remark suggests manual invocation from a Luan shell — confirm.
	public static void testDigest() {
		sendDailyDigestsFor(new Date());
	}

}