view src/nabble/model/ViewCount.java @ 0:7ecd1a4ef557

add content
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 21 Mar 2019 19:15:52 -0600
parents
children
line wrap: on
line source

package nabble.model;

import fschmidt.db.DbDatabase;
import fschmidt.db.postgres.PostgresExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;


/**
 * Accumulates view counts in memory and periodically writes them to the database.
 *
 * <p>Each instance holds the not-yet-saved views for one site: a per-node counter map plus
 * a counter of views flagged as user views.  The static side of the class keeps one
 * instance per site id, flushes them through {@link #save()} on a fixed-delay schedule, and
 * registers {@link #calculateActivity()} with {@code Executors.runDaily}.
 *
 * <p>All access to the static {@code counts} map, and to the instances stored in it, happens
 * while holding {@code lock}.
 */
public final class ViewCount {
	private static final Logger logger = LoggerFactory.getLogger(ViewCount.class);


	// One ViewCount instance holds the pending views of a single site.
	// node id -> single-element int array used as a mutable counter
	private final Map<Long,int[]> nodeMap = new HashMap<Long,int[]>();
	// number of pending views that were recorded with isUser == true
	private int userViews = 0;

	private ViewCount() {}

	/**
	 * Records one view of the given node; callers must hold {@code lock}.
	 *
	 * @param nodeId  id of the viewed node
	 * @param isUser  when true the view is also added to {@code userViews}
	 */
	private void inc(long nodeId,boolean isUser) {
		int[] i = nodeMap.get(nodeId);
		if( i == null ) {
			i = new int[1];
			nodeMap.put(nodeId,i);
		}
		i[0]++;
		if( isUser )
			userViews++;
	}


	/**
	 * Time (epoch millis) of the last successful {@link #save()}, initialised to class-load time.
	 * Volatile because it is written by whichever thread runs save() and read without locking.
	 */
	public static volatile long lastSaved = System.currentTimeMillis();
	private static final Object lock = new Object();
	// site id -> pending views for that site; guarded by lock
	private static Map<Long,ViewCount> counts = new HashMap<Long,ViewCount>();

	/**
	 * Records one view of a node in memory; nothing is written to the database here.
	 *
	 * @param site    site the node belongs to
	 * @param nodeId  id of the viewed node
	 * @param isUser  whether the view also counts toward the site's user views
	 */
	public static void inc(Site site,long nodeId,boolean isUser) {
		Long siteId = site.getId();
		synchronized(lock) {
			ViewCount viewCount = counts.get(siteId);
			if( viewCount == null ) {
				viewCount = new ViewCount();
				counts.put(siteId,viewCount);
			}
			viewCount.inc(nodeId,isUser);
		}
	}

	/**
	 * Flushes all pending view counts to the per-site databases and to the global
	 * {@code site_global} table, then updates {@link #lastSaved}.
	 *
	 * <p>The pending map is detached under the lock and is not merged back afterwards:
	 * counts for a site whose database raises {@code UpdatingException} are skipped, and when
	 * an {@code SQLException} escapes the remaining counts are dropped after logging
	 * "Views not saved".
	 */
	public static void save() {
		// Swap the map under the lock so inc() is only blocked for the swap, not for the DB work.
		Map<Long,ViewCount> map;
		synchronized(lock) {
			map = counts;
			counts = new HashMap<Long,ViewCount>();
		}
		Db.dbGlobal().beginTransaction();
		try {
			for( Map.Entry<Long,ViewCount> entry : map.entrySet() ) {
				long siteId = entry.getKey();
				ViewCount viewCount = entry.getValue();
				SiteKey siteKey = SiteKey.getInstance(siteId);
				DbDatabase db;
				try {
					db = siteKey.getDb();
				} catch(UpdatingException e) {
					// this site's pending views are discarded
					continue;
				}
				int siteViews = saveNodeViews(db,siteId,viewCount.nodeMap);
				saveSiteViews(siteId,siteViews,viewCount.userViews);
			}
			Db.dbGlobal().commitTransaction();
			lastSaved = System.currentTimeMillis();
		} catch(SQLException e) {
			logger.error("Views not saved", e);
		} finally {
			Db.dbGlobal().endTransaction();
		}
	}

	/**
	 * Adds the pending per-node views of one site to its {@code view_count} table,
	 * inserting a row when the update matches none.
	 *
	 * @return the sum of all node views in {@code nodeMap}, including nodes whose row could
	 *         not be written because of a tolerated error
	 * @throws SQLException for any database error other than the two tolerated ones
	 */
	private static int saveNodeViews(DbDatabase db,long siteId,Map<Long,int[]> nodeMap) throws SQLException {
		int siteViews = 0;
		Connection con = db.getConnection();
		try {
			PreparedStatement pstmtUpdate = con.prepareStatement(
				"update view_count set views = views + ? where node_id = ?"
			);
			try {
				// prepared lazily, only if some node has no row yet
				PreparedStatement pstmtInsert = null;
				try {
					for( Map.Entry<Long,int[]> nodeEntry : nodeMap.entrySet() ) {
						long nodeId = nodeEntry.getKey();
						int views = nodeEntry.getValue()[0];
						siteViews += views;
						pstmtUpdate.setInt(1,views);
						pstmtUpdate.setLong(2,nodeId);
						// NOTE(review): PostgresExceptionHandler is not visible here; presumably it lets
						// the connection keep working after a failed statement - confirm.
						PostgresExceptionHandler peh = new PostgresExceptionHandler(pstmtUpdate);
						try {
							int n = pstmtUpdate.executeUpdate();
							if( n==0 ) {
								if( pstmtInsert == null ) {
									pstmtInsert = con.prepareStatement(
										"insert into view_count (node_id,views) values (?,?)"
									);
								}
								pstmtInsert.setLong(1,nodeId);
								pstmtInsert.setInt(2,views);
								pstmtInsert.executeUpdate();
							}
						} catch(SQLException e) {
							// getMessage() may be null; treat that as an unexpected error and rethrow
							String msg = e.getMessage();
							if( msg != null && msg.contains("violates foreign key constraint") ) {
								logger.info("in site "+siteId+" node "+nodeId+" was viewed and then deleted");
								peh.handleException();
							} else if( msg != null && msg.contains("relation \"view_count\" does not exist") ) {
								logger.info("site "+siteId+" was deleted");
								peh.handleException();
							} else {
								throw e;
							}
						} finally {
							peh.close();
						}
					}
				} finally {
					if( pstmtInsert != null )
						pstmtInsert.close();
				}
			} finally {
				pstmtUpdate.close();
			}
		} finally {
			con.close();
		}
		return siteViews;
	}

	/**
	 * Adds one site's pending totals to its {@code site_global} row.
	 *
	 * @param siteId     id of the site row to update
	 * @param siteViews  amount added to {@code views}
	 * @param userViews  amount added to {@code user_views}
	 */
	private static void saveSiteViews(long siteId,int siteViews,int userViews) throws SQLException {
		Connection con = Db.dbGlobal().getConnection();
		try {
			PreparedStatement pstmt = con.prepareStatement(
				"update site_global"
				+" set views = views + ?"
				+" , user_views = user_views + ?"
				+" where site_id = ?"
			);
			try {
				pstmt.setInt(1,siteViews);
				pstmt.setInt(2,userViews);
				pstmt.setLong(3,siteId);
				pstmt.executeUpdate();
			} finally {
				pstmt.close();
			}
		} finally {
			con.close();
		}
	}

	private static final long secondsBetweenViewCountSaves = Init.get("secondsBetweenViewCountSaves",60);

	static {
		// periodic flush of the in-memory counters
		Executors.scheduleWithFixedDelay(new Runnable(){public void run(){
			save();
		}},secondsBetweenViewCountSaves,secondsBetweenViewCountSaves,TimeUnit.SECONDS);
		logger.info("Started ViewCounter");

		// activity recalculation; failures are logged and not rethrown
		Executors.runDaily(
			new Runnable(){public void run(){
				try {
					calculateActivity();
				} catch(SQLException e) {
					logger.error("",e);
				} catch(IOException e) {
					logger.error("",e);
				}
			}}
		);
	}

	// factor applied to every positive activity value on each calculateActivity() run
	private static final float DECAY = Init.get("activityDecay",0.95f);
	// value given to site_global rows whose activity is null
	static final int initialActivity = Init.get("initialActivity",1000);
	// the decay update is split into this many "site_id % updateChunks" slices
	static final int updateChunks = Init.get("updateChunks",100);

	// site id -> extra activity added at the end of every calculateActivity() run
	public static final Map<Long,Integer> activityBoosts = Init.get("activityBoosts",new HashMap<Long,Integer>());

	private  static final List<Runnable> calculateActivityListeners = new ArrayList<Runnable>();

	/**
	 * Registers a callback that {@link #calculateActivity()} runs after the decay step and
	 * before pending views are folded into activity.
	 */
	public static void addCalculateActivityListener(Runnable listener) {
		synchronized(calculateActivityListeners) {
			calculateActivityListeners.add(listener);
		}
	}

	/** Runs every registered listener, in registration order, while holding the listener lock. */
	private static void fireCalculateActivityListeners() {
		synchronized(calculateActivityListeners) {
			for( Runnable listener : calculateActivityListeners ) {
				listener.run();
			}
		}
	}

	/**
	 * Recomputes site activity in the global database:
	 * null activities are initialised, positive activities are decayed chunk by chunk,
	 * listeners are fired, accumulated views are added to activity and reset to zero,
	 * and finally the configured {@link #activityBoosts} are applied.
	 *
	 * @throws SQLException if any statement fails
	 * @throws IOException  declared for callers; nothing visible in this method throws it
	 */
	public static void calculateActivity() throws SQLException, IOException {
		// NOTE(review): start/end are logged at error level; presumably so they always show up - confirm.
		logger.error("calculateActivity start");
		Connection con = Db.dbGlobal().getConnection();
		try {
			Statement stmt = con.createStatement();
			try {
				stmt.executeUpdate(
					"update site_global"
					+"	set activity = " + initialActivity
					+"	where activity is null"
				);

				PreparedStatement pstmt = con.prepareStatement(
					"update site_global"
					+"	set activity = floor(activity * ?)"
					+"	where activity > 0"
					+"		and site_id % ? = ?"
				);
				try {
					// one update per chunk; stops early when the executor is shutting down
					for( int chunk = 0; chunk < updateChunks && !Executors.isShuttingDown(); chunk++ ) {
						long start = System.currentTimeMillis();
						pstmt.setFloat(1,DECAY);
						pstmt.setInt(2,updateChunks);
						pstmt.setInt(3,chunk);
						pstmt.executeUpdate();
						logger.info("calculateActivity - chunk #"+chunk+" took " + (System.currentTimeMillis()-start)/1000 + " seconds");
					}
				} finally {
					pstmt.close();
				}

				fireCalculateActivityListeners();

				stmt.executeUpdate(
					"update site_global"
					+"	set activity = activity + views"
					+"		, views = 0"
					+"		, user_views = 0"
					+"	where views > 0"
				);
			} finally {
				stmt.close();
			}

			// NOTE(review): this statement targets table "site" while all other activity updates
			// in this class use "site_global" - confirm the table name.
			PreparedStatement boostStmt = con.prepareStatement(
				"update site set activity = activity + ? where site_id = ?"
			);
			try {
				for( Map.Entry<Long,Integer> boost : activityBoosts.entrySet() ) {
					boostStmt.setInt(1,boost.getValue());
					boostStmt.setLong(2,boost.getKey());
					// executeUpdate, not executeQuery: an UPDATE returns no ResultSet
					boostStmt.executeUpdate();
				}
			} finally {
				boostStmt.close();
			}
		} finally {
			con.close();
		}
		logger.error("calculateActivity end");
	}

	/**
	 * Returns the current view count of each requested node: the value stored in the site's
	 * {@code view_count} table plus any views still pending in memory.
	 *
	 * @param site     site whose database and pending counters are consulted
	 * @param nodeIds  node ids to look up
	 * @return an immutable empty map when {@code nodeIds} is empty; otherwise a new map with
	 *         one entry per requested id (0 for ids that have neither a row nor pending views)
	 * @throws RuntimeException wrapping any {@code SQLException}
	 */
	public static Map<Long,Integer> getCounts(Site site,Collection<Long> nodeIds) {
		if( nodeIds.isEmpty() )
			return Collections.emptyMap();
		Map<Long,Integer> map = new HashMap<Long,Integer>();
		for( Long nodeId : nodeIds ) {
			map.put(nodeId,0);
		}
		try {
			Connection con = site.getDb().getConnection();
			try {
				// the ids are Long values, so concatenating them cannot inject SQL
				StringBuilder buf = new StringBuilder();
				buf.append( "select * from view_count where node_id in (" );
				Iterator<Long> iter = nodeIds.iterator();
				buf.append( iter.next() );
				while( iter.hasNext() ) {
					buf.append( ',' ).append( iter.next() );
				}
				buf.append(')');
				Statement stmt = con.createStatement();
				try {
					ResultSet rs = stmt.executeQuery( buf.toString() );
					try {
						while( rs.next() ) {
							map.put( rs.getLong("node_id"), rs.getInt("views") );
						}
					} finally {
						rs.close();
					}
				} finally {
					stmt.close();
				}
			} finally {
				con.close();
			}
		} catch(SQLException e) {
			throw new RuntimeException(e);
		}
		// add the views that have not been flushed by save() yet
		synchronized(lock) {
			ViewCount viewCount = counts.get(site.getId());
			if( viewCount != null ) {
				for( Map.Entry<Long,Integer> entry : map.entrySet() ) {
					int[] views = viewCount.nodeMap.get(entry.getKey());
					if( views != null )
						entry.setValue( entry.getValue() + views[0] );
				}
			}
		}
		return map;
	}
}