Mercurial Hosting > luan
view src/org/eclipse/jetty/server/session/JDBCSessionIdManager.java @ 802:3428c60d7cfc
replace jetty jars with source
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Wed, 07 Sep 2016 21:15:48 -0600 |
parents | |
children |
line wrap: on
line source
//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.server.session;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;

import javax.naming.InitialContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import javax.sql.DataSource;

import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.SessionManager;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.log.Logger;

/**
 * JDBCSessionIdManager
 *
 * SessionIdManager implementation that uses a database to store in-use session ids,
 * to support distributed sessions.
 *
 * Session ids known to this node are additionally cached in {@link #_sessionIds};
 * all access to that set is guarded by synchronizing on the set itself. The scavenge
 * timer and task are guarded by synchronizing on this manager.
 */
public class JDBCSessionIdManager extends AbstractSessionIdManager
{
    final static Logger LOG = SessionHandler.LOG;

    protected final HashSet<String> _sessionIds = new HashSet<String>();
    protected Server _server;
    protected Driver _driver;
    protected String _driverClassName;
    protected String _connectionUrl;
    protected DataSource _datasource;
    protected String _jndiName;
    protected String _sessionIdTable = "JettySessionIds";
    protected String _sessionTable = "JettySessions";
    protected String _sessionTableRowId = "rowId";

    protected Timer _timer; //scavenge timer
    protected TimerTask _task; //scavenge task
    protected long _lastScavengeTime;
    protected long _scavengeIntervalMs = 1000L * 60 * 10; //10mins
    protected String _blobType; //if not set, is deduced from the type of the database at runtime
    protected String _longType; //if not set, is deduced from the type of the database at runtime

    protected String _createSessionIdTable;
    protected String _createSessionTable;

    protected String _selectBoundedExpiredSessions;
    protected String _deleteOldExpiredSessions;

    protected String _insertId;
    protected String _deleteId;
    protected String _queryId;

    protected String _insertSession;
    protected String _deleteSession;
    protected String _updateSession;
    protected String _updateSessionNode;
    protected String _updateSessionAccessTime;

    protected DatabaseAdaptor _dbAdaptor;

    private String _selectExpiredSessions;


    /**
     * DatabaseAdaptor
     *
     * Handles differences between databases.
     *
     * Postgres uses the getBytes and setBinaryStream methods to access
     * a "bytea" datatype, which can be up to 1Gb of binary data. MySQL
     * is happy to use the "blob" type and getBlob() methods instead.
     *
     * TODO if the differences become more major it would be worthwhile
     * refactoring this class.
     */
    public class DatabaseAdaptor
    {
        String _dbName;
        boolean _isLower;
        boolean _isUpper;


        /**
         * Capture the product name (lower-cased) and the identifier-case
         * behaviour reported by the database metadata.
         *
         * @param dbMeta metadata of the database in use
         * @throws SQLException if the metadata cannot be read
         */
        public DatabaseAdaptor (DatabaseMetaData dbMeta)
        throws SQLException
        {
            _dbName = dbMeta.getDatabaseProductName().toLowerCase(Locale.ENGLISH);
            LOG.debug ("Using database {}",_dbName);
            _isLower = dbMeta.storesLowerCaseIdentifiers();
            _isUpper = dbMeta.storesUpperCaseIdentifiers();
        }

        /**
         * Convert a camel case identifier into either upper or lower
         * depending on the way the db stores identifiers.
         *
         * @param identifier the identifier to convert
         * @return the converted identifier
         */
        public String convertIdentifier (String identifier)
        {
            if (_isLower)
                return identifier.toLowerCase(Locale.ENGLISH);
            if (_isUpper)
                return identifier.toUpperCase(Locale.ENGLISH);

            return identifier;
        }

        /**
         * @return the lower-cased database product name
         */
        public String getDBName ()
        {
            return _dbName;
        }

        /**
         * @return the explicitly configured blob type if there is one, otherwise
         * "bytea" for postgres and "blob" for everything else
         */
        public String getBlobType ()
        {
            if (_blobType != null)
                return _blobType;

            if (_dbName.startsWith("postgres"))
                return "bytea";

            return "blob";
        }

        /**
         * @return the explicitly configured long type if there is one, otherwise
         * "number(20)" for oracle and "bigint" for everything else
         */
        public String getLongType ()
        {
            if (_longType != null)
                return _longType;

            if (_dbName.startsWith("oracle"))
                return "number(20)";

            return "bigint";
        }

        /**
         * Open a stream over a binary column, using getBytes for postgres
         * and getBlob for all other databases.
         *
         * @param result the result set positioned on the row to read
         * @param columnName the name of the binary column
         * @return a stream over the column's content
         * @throws SQLException if the column cannot be read
         */
        public InputStream getBlobInputStream (ResultSet result, String columnName)
        throws SQLException
        {
            if (_dbName.startsWith("postgres"))
            {
                byte[] bytes = result.getBytes(columnName);
                return new ByteArrayInputStream(bytes);
            }

            Blob blob = result.getBlob(columnName);
            return blob.getBinaryStream();
        }

        /**
         * rowId is a reserved word for Oracle, so change the name of this column
         *
         * @return "srowId" for oracle, otherwise "rowId"
         */
        public String getRowIdColumnName ()
        {
            if (_dbName != null && _dbName.startsWith("oracle"))
                return "srowId";

            return "rowId";
        }

        /**
         * @return true if the database name starts with "oracle"; the load
         * statement then matches an empty context path with "is null"
         */
        public boolean isEmptyStringNull ()
        {
            return (_dbName.startsWith("oracle"));
        }

        /**
         * Build the statement that loads a session row for a given id,
         * context path and virtual host.
         *
         * @param connection the connection on which to prepare the statement
         * @param rowId value bound to the sessionId column
         * @param contextPath the context path, may be null or empty
         * @param virtualHosts value bound to the virtualHost column
         * @return the prepared statement with all parameters bound; the caller must close it
         * @throws SQLException if the statement cannot be prepared
         */
        public PreparedStatement getLoadStatement (Connection connection, String rowId, String contextPath, String virtualHosts)
        throws SQLException
        {
            if (contextPath == null || "".equals(contextPath))
            {
                if (isEmptyStringNull())
                {
                    PreparedStatement statement = connection.prepareStatement("select * from "+_sessionTable+
                    " where sessionId = ? and contextPath is null and virtualHost = ?");
                    statement.setString(1, rowId);
                    statement.setString(2, virtualHosts);

                    return statement;
                }
            }

            PreparedStatement statement = connection.prepareStatement("select * from "+_sessionTable+
            " where sessionId = ? and contextPath = ? and virtualHost = ?");
            statement.setString(1, rowId);
            statement.setString(2, contextPath);
            statement.setString(3, virtualHosts);

            return statement;
        }
    }


    public JDBCSessionIdManager(Server server)
    {
        super();
        _server=server;
    }

    public JDBCSessionIdManager(Server server, Random random)
    {
        super(random);
        _server=server;
    }

    /**
     * Configure jdbc connection information via a jdbc Driver
     *
     * @param driverClassName name of the driver class, loaded at start
     * @param connectionUrl the jdbc url to connect with
     */
    public void setDriverInfo (String driverClassName, String connectionUrl)
    {
        _driverClassName=driverClassName;
        _connectionUrl=connectionUrl;
    }

    /**
     * Configure jdbc connection information via a jdbc Driver
     *
     * @param driverClass driver instance, registered with the DriverManager at start
     * @param connectionUrl the jdbc url to connect with
     */
    public void setDriverInfo (Driver driverClass, String connectionUrl)
    {
        _driver=driverClass;
        _connectionUrl=connectionUrl;
    }

    public void setDatasource (DataSource ds)
    {
        _datasource = ds;
    }

    public DataSource getDataSource ()
    {
        return _datasource;
    }

    public String getDriverClassName()
    {
        return _driverClassName;
    }

    public String getConnectionUrl ()
    {
        return _connectionUrl;
    }

    public void setDatasourceName (String jndi)
    {
        _jndiName=jndi;
    }

    public String getDatasourceName ()
    {
        return _jndiName;
    }

    public void setBlobType (String name)
    {
        _blobType = name;
    }

    public String getBlobType ()
    {
        return _blobType;
    }

    public String getLongType()
    {
        return _longType;
    }

    public void setLongType(String longType)
    {
        this._longType = longType;
    }

    /**
     * Set the scavenge interval and, if the timer is running, (re)schedule
     * the scavenge task.
     *
     * @param sec the interval in seconds; values &lt;= 0 are replaced by 60
     */
    public void setScavengeInterval (long sec)
    {
        if (sec<=0)
            sec=60;

        long old_period=_scavengeIntervalMs;
        long period=sec*1000L;

        _scavengeIntervalMs=period;

        //add a bit of variability into the scavenge time so that not all
        //nodes with the same scavenge time sync up
        long tenPercent = _scavengeIntervalMs/10;
        if ((System.currentTimeMillis()%2) == 0)
            _scavengeIntervalMs += tenPercent;

        if (LOG.isDebugEnabled())
            LOG.debug("Scavenging every "+_scavengeIntervalMs+" ms");

        synchronized (this)
        {
            //the timer is tested under the same lock doStop() holds while nulling it,
            //so it cannot disappear between the test and the schedule call
            if (_timer!=null && (period!=old_period || _task==null))
            {
                if (_task!=null)
                    _task.cancel();
                _task = new TimerTask()
                {
                    @Override
                    public void run()
                    {
                        scavenge();
                    }
                };
                _timer.schedule(_task,_scavengeIntervalMs,_scavengeIntervalMs);
            }
        }
    }

    /**
     * @return the scavenge interval in seconds
     */
    public long getScavengeInterval ()
    {
        return _scavengeIntervalMs/1000;
    }


    /**
     * Record the cluster id of a session both in the database and in the
     * local set of known ids. Failures are logged, not thrown.
     *
     * @param session the session to add; ignored if null
     */
    public void addSession(HttpSession session)
    {
        if (session == null)
            return;

        synchronized (_sessionIds)
        {
            String id = ((JDBCSessionManager.Session)session).getClusterId();
            try
            {
                insert(id);
                _sessionIds.add(id);
            }
            catch (Exception e)
            {
                LOG.warn("Problem storing session id="+id, e);
            }
        }
    }

    /**
     * Remove the cluster id of a session.
     *
     * @param session the session to remove; ignored if null
     */
    public void removeSession(HttpSession session)
    {
        if (session == null)
            return;

        removeSession(((JDBCSessionManager.Session)session).getClusterId());
    }


    /**
     * Remove an id from the local set of known ids and from the database.
     * Failures are logged, not thrown.
     *
     * @param id the id to remove; ignored if null
     */
    public void removeSession (String id)
    {
        if (id == null)
            return;

        synchronized (_sessionIds)
        {
            if (LOG.isDebugEnabled())
                LOG.debug("Removing session id="+id);
            try
            {
                _sessionIds.remove(id);
                delete(id);
            }
            catch (Exception e)
            {
                LOG.warn("Problem removing session id="+id, e);
            }
        }
    }


    /**
     * Get the session id without any node identifier suffix.
     *
     * @see org.eclipse.jetty.server.SessionIdManager#getClusterId(java.lang.String)
     */
    public String getClusterId(String nodeId)
    {
        int dot=nodeId.lastIndexOf('.');
        return (dot>0)?nodeId.substring(0,dot):nodeId;
    }


    /**
     * Get the session id, including this node's id as a suffix.
     *
     * @see org.eclipse.jetty.server.SessionIdManager#getNodeId(java.lang.String, javax.servlet.http.HttpServletRequest)
     */
    public String getNodeId(String clusterId, HttpServletRequest request)
    {
        if (_workerName!=null)
            return clusterId+'.'+_workerName;

        return clusterId;
    }


    /**
     * Check whether an id is in use, first in the local set of known ids
     * and then in the database.
     *
     * @param id the id to check, with or without a node suffix
     * @return true if the id is known locally or present in the database;
     * false if id is null, unknown, or the database check failed
     */
    public boolean idInUse(String id)
    {
        if (id == null)
            return false;

        String clusterId = getClusterId(id);
        boolean inUse = false;
        synchronized (_sessionIds)
        {
            inUse = _sessionIds.contains(clusterId);
        }

        if (inUse)
            return true; //optimisation - if this session is one we've been managing, we can check locally

        //otherwise, we need to go to the database to check
        try
        {
            return exists(clusterId);
        }
        catch (Exception e)
        {
            LOG.warn("Problem checking inUse for id="+clusterId, e);
            return false;
        }
    }

    /**
     * Invalidate the session matching the id on all contexts.
     *
     * @see org.eclipse.jetty.server.SessionIdManager#invalidateAll(java.lang.String)
     */
    public void invalidateAll(String id)
    {
        //take the id out of the list of known sessionids for this node
        removeSession(id);

        synchronized (_sessionIds)
        {
            //tell all contexts that may have a session object with this id to
            //get rid of them
            Handler[] contexts = _server.getChildHandlersByClass(ContextHandler.class);
            for (int i=0; contexts!=null && i<contexts.length; i++)
            {
                SessionHandler sessionHandler = (SessionHandler)((ContextHandler)contexts[i]).getChildHandlerByClass(SessionHandler.class);
                if (sessionHandler != null)
                {
                    SessionManager manager = sessionHandler.getSessionManager();

                    //instanceof is false for null, so no separate null test is needed
                    if (manager instanceof JDBCSessionManager)
                    {
                        ((JDBCSessionManager)manager).invalidateSession(id);
                    }
                }
            }
        }
    }


    /**
     * Start up the id manager.
     *
     * Makes necessary database tables and starts a Session
     * scavenger thread.
     */
    @Override
    public void doStart()
    throws Exception
    {
        initializeDatabase();
        prepareTables();
        cleanExpiredSessions();
        super.doStart();
        if (LOG.isDebugEnabled())
            LOG.debug("Scavenging interval = "+getScavengeInterval()+" sec");
        _timer=new Timer("JDBCSessionScavenger", true);
        setScavengeInterval(getScavengeInterval());
    }

    /**
     * Stop the scavenger.
     */
    @Override
    public void doStop ()
    throws Exception
    {
        synchronized(this)
        {
            if (_task!=null)
                _task.cancel();
            //forget the cancelled task: setScavengeInterval() only schedules a new one
            //when the period changed or no task exists, so a stale reference would
            //leave the scavenger unscheduled after a restart
            _task=null;
            if (_timer!=null)
                _timer.cancel();
            _timer=null;
        }
        _sessionIds.clear();
        super.doStop();
    }

    /**
     * Get a connection from the driver or datasource.
     *
     * @return the connection for the datasource
     * @throws SQLException
     */
    protected Connection getConnection ()
    throws SQLException
    {
        if (_datasource != null)
            return _datasource.getConnection();
        else
            return DriverManager.getConnection(_connectionUrl);
    }


    /**
     * Set up the tables in the database
     *
     * Builds the sql strings used by this class, then creates the session id
     * table, the session table and its two indexes if they do not exist yet.
     *
     * @throws SQLException
     */
    private void prepareTables()
    throws SQLException
    {
        _createSessionIdTable = "create table "+_sessionIdTable+" (id varchar(120), primary key(id))";
        _selectBoundedExpiredSessions = "select * from "+_sessionTable+" where expiryTime >= ? and expiryTime <= ?";
        _selectExpiredSessions = "select * from "+_sessionTable+" where expiryTime >0 and expiryTime <= ?";
        _deleteOldExpiredSessions = "delete from "+_sessionTable+" where expiryTime >0 and expiryTime <= ?";

        _insertId = "insert into "+_sessionIdTable+" (id)  values (?)";
        _deleteId = "delete from "+_sessionIdTable+" where id = ?";
        _queryId = "select * from "+_sessionIdTable+" where id = ?";

        Connection connection = null;
        try
        {
            //make the id table
            connection = getConnection();
            connection.setAutoCommit(true);
            DatabaseMetaData metaData = connection.getMetaData();
            _dbAdaptor = new DatabaseAdaptor(metaData);
            _sessionTableRowId = _dbAdaptor.getRowIdColumnName();

            //checking for table existence is case-sensitive, but table creation is not
            String tableName = _dbAdaptor.convertIdentifier(_sessionIdTable);
            if (!jdbcTableExists(metaData, tableName))
            {
                //table does not exist, so create it
                executeDdl(connection, _createSessionIdTable);
            }

            //make the session table if necessary
            tableName = _dbAdaptor.convertIdentifier(_sessionTable);
            if (!jdbcTableExists(metaData, tableName))
            {
                //table does not exist, so create it
                String blobType = _dbAdaptor.getBlobType();
                String longType = _dbAdaptor.getLongType();
                _createSessionTable = "create table "+_sessionTable+" ("+_sessionTableRowId+" varchar(120), sessionId varchar(120), "+
                                           " contextPath varchar(60), virtualHost varchar(60), lastNode varchar(60), accessTime "+longType+", "+
                                           " lastAccessTime "+longType+", createTime "+longType+", cookieTime "+longType+", "+
                                           " lastSavedTime "+longType+", expiryTime "+longType+", map "+blobType+", primary key("+_sessionTableRowId+"))";
                executeDdl(connection, _createSessionTable);
            }

            //make some indexes on the JettySessions table
            String index1 = "idx_"+_sessionTable+"_expiry";
            String index2 = "idx_"+_sessionTable+"_session";

            boolean index1Exists = false;
            boolean index2Exists = false;
            ResultSet indexInfo = metaData.getIndexInfo(null, null, tableName, false, false);
            try
            {
                while (indexInfo.next())
                {
                    String idxName = indexInfo.getString("INDEX_NAME");
                    if (index1.equalsIgnoreCase(idxName))
                        index1Exists = true;
                    else if (index2.equalsIgnoreCase(idxName))
                        index2Exists = true;
                }
            }
            finally
            {
                closeResultSetQuietly(indexInfo);
            }

            if (!index1Exists)
                executeDdl(connection, "create index "+index1+" on "+_sessionTable+" (expiryTime)");
            if (!index2Exists)
                executeDdl(connection, "create index "+index2+" on "+_sessionTable+" (sessionId, contextPath)");

            //set up some strings representing the statements for session manipulation
            _insertSession = "insert into "+_sessionTable+
            " ("+_sessionTableRowId+", sessionId, contextPath, virtualHost, lastNode, accessTime, lastAccessTime, createTime, cookieTime, lastSavedTime, expiryTime, map) "+
            " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

            _deleteSession = "delete from "+_sessionTable+
            " where "+_sessionTableRowId+" = ?";

            _updateSession = "update "+_sessionTable+
            " set lastNode = ?, accessTime = ?, lastAccessTime = ?, lastSavedTime = ?, expiryTime = ?, map = ? where "+_sessionTableRowId+" = ?";

            _updateSessionNode = "update "+_sessionTable+
            " set lastNode = ? where "+_sessionTableRowId+" = ?";

            _updateSessionAccessTime = "update "+_sessionTable+
            " set lastNode = ?, accessTime = ?, lastAccessTime = ?, lastSavedTime = ?, expiryTime = ? where "+_sessionTableRowId+" = ?";
        }
        finally
        {
            if (connection != null)
                connection.close();
        }
    }

    /**
     * Ask the database metadata whether a table with exactly this name exists.
     *
     * @param metaData the metadata to query
     * @param tableName the table name, already converted to the case the db stores
     * @return true if the metadata lists at least one matching table
     * @throws SQLException if the metadata query fails
     */
    private boolean jdbcTableExists (DatabaseMetaData metaData, String tableName)
    throws SQLException
    {
        ResultSet tables = metaData.getTables(null, null, tableName, null);
        try
        {
            return tables.next();
        }
        finally
        {
            closeResultSetQuietly(tables);
        }
    }

    /**
     * Run a single sql statement that returns no result, closing the
     * statement afterwards.
     *
     * @param connection the connection to execute on
     * @param sql the statement text
     * @throws SQLException if execution fails
     */
    private void executeDdl (Connection connection, String sql)
    throws SQLException
    {
        Statement statement = connection.createStatement();
        try
        {
            statement.executeUpdate(sql);
        }
        finally
        {
            closeStatementQuietly(statement);
        }
    }

    /**
     * Close a statement, logging rather than propagating any failure.
     *
     * @param statement the statement to close; ignored if null
     */
    private void closeStatementQuietly (Statement statement)
    {
        if (statement == null)
            return;

        try
        {
            statement.close();
        }
        catch (Exception e)
        {
            LOG.warn(e);
        }
    }

    /**
     * Close a result set, logging rather than propagating any failure.
     *
     * @param result the result set to close; ignored if null
     */
    private void closeResultSetQuietly (ResultSet result)
    {
        if (result == null)
            return;

        try
        {
            result.close();
        }
        catch (Exception e)
        {
            LOG.warn(e);
        }
    }

    /**
     * Insert a new used session id into the table.
     *
     * @param id the id to insert
     * @throws SQLException
     */
    private void insert (String id)
    throws SQLException
    {
        Connection connection = null;
        PreparedStatement statement = null;
        PreparedStatement query = null;
        try
        {
            connection = getConnection();
            connection.setAutoCommit(true);
            query = connection.prepareStatement(_queryId);
            query.setString(1, id);
            ResultSet result = query.executeQuery();
            //only insert the id if it isn't in the db already
            if (!result.next())
            {
                statement = connection.prepareStatement(_insertId);
                statement.setString(1, id);
                statement.executeUpdate();
            }
        }
        finally
        {
            //closing the query also closes its result set
            closeStatementQuietly(query);
            closeStatementQuietly(statement);

            if (connection != null)
                connection.close();
        }
    }

    /**
     * Remove a session id from the table.
     *
     * @param id the id to delete
     * @throws SQLException
     */
    private void delete (String id)
    throws SQLException
    {
        Connection connection = null;
        PreparedStatement statement = null;
        try
        {
            connection = getConnection();
            connection.setAutoCommit(true);
            statement = connection.prepareStatement(_deleteId);
            statement.setString(1, id);
            statement.executeUpdate();
        }
        finally
        {
            closeStatementQuietly(statement);

            if (connection != null)
                connection.close();
        }
    }


    /**
     * Check if a session id exists.
     *
     * @param id the id to look up
     * @return true if the id table contains a row for the id
     * @throws SQLException
     */
    private boolean exists (String id)
    throws SQLException
    {
        Connection connection = null;
        PreparedStatement statement = null;
        try
        {
            connection = getConnection();
            connection.setAutoCommit(true);
            statement = connection.prepareStatement(_queryId);
            statement.setString(1, id);
            ResultSet result = statement.executeQuery();
            return result.next();
        }
        finally
        {
            //closing the statement also closes its result set
            closeStatementQuietly(statement);

            if (connection != null)
                connection.close();
        }
    }

    /**
     * Look for sessions in the database that have expired.
     *
     * We do this in the SessionIdManager and not the SessionManager so
     * that we only have 1 scavenger, otherwise if there are n SessionManagers
     * there would be n scavengers, all contending for the database.
     *
     * We look first for sessions that expired in the previous interval, then
     * for sessions that expired previously - these are old sessions that no
     * node is managing any more and have become stuck in the database.
     *
     * The first sweep after start only records the sweep time, because
     * there is no previous sweep to bound the search.
     */
    private void scavenge ()
    {
        Connection connection = null;
        List<String> expiredSessionIds = new ArrayList<String>();
        try
        {
            if (LOG.isDebugEnabled())
                LOG.debug("Scavenge sweep started at "+System.currentTimeMillis());
            if (_lastScavengeTime > 0)
            {
                connection = getConnection();
                connection.setAutoCommit(true);

                //select the sessions whose expiryTime lies between (lastScavengeTime - scanInterval) and lastScavengeTime
                long lowerBound = (_lastScavengeTime - _scavengeIntervalMs);
                long upperBound = _lastScavengeTime;
                PreparedStatement select = connection.prepareStatement(_selectBoundedExpiredSessions);
                try
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug (" Searching for sessions expired between "+lowerBound + " and "+upperBound);

                    select.setLong(1, lowerBound);
                    select.setLong(2, upperBound);
                    ResultSet result = select.executeQuery();
                    while (result.next())
                    {
                        String sessionId = result.getString("sessionId");
                        expiredSessionIds.add(sessionId);
                        if (LOG.isDebugEnabled()) LOG.debug (" Found expired sessionId="+sessionId);
                    }
                }
                finally
                {
                    //release the select (and its result set) before the delete below is prepared
                    closeStatementQuietly(select);
                }

                //tell the SessionManagers to expire any sessions with a matching sessionId in memory
                Handler[] contexts = _server.getChildHandlersByClass(ContextHandler.class);
                for (int i=0; contexts!=null && i<contexts.length; i++)
                {
                    SessionHandler sessionHandler = (SessionHandler)((ContextHandler)contexts[i]).getChildHandlerByClass(SessionHandler.class);
                    if (sessionHandler != null)
                    {
                        SessionManager manager = sessionHandler.getSessionManager();
                        if (manager instanceof JDBCSessionManager)
                        {
                            ((JDBCSessionManager)manager).expire(expiredSessionIds);
                        }
                    }
                }

                //find all sessions that have expired at least a couple of scanIntervals ago and just delete them
                upperBound = _lastScavengeTime - (2 * _scavengeIntervalMs);
                if (upperBound > 0)
                {
                    if (LOG.isDebugEnabled()) LOG.debug("Deleting old expired sessions expired before "+upperBound);
                    PreparedStatement delete = connection.prepareStatement(_deleteOldExpiredSessions);
                    try
                    {
                        delete.setLong(1, upperBound);
                        int rows = delete.executeUpdate();
                        if (LOG.isDebugEnabled()) LOG.debug("Deleted "+rows+" rows of old sessions expired before "+upperBound);
                    }
                    finally
                    {
                        closeStatementQuietly(delete);
                    }
                }
            }
        }
        catch (Exception e)
        {
            if (isRunning())
                LOG.warn("Problem selecting expired sessions", e);
            else
                LOG.ignore(e);
        }
        finally
        {
            _lastScavengeTime=System.currentTimeMillis();
            if (LOG.isDebugEnabled()) LOG.debug("Scavenge sweep ended at "+_lastScavengeTime);
            if (connection != null)
            {
                try
                {
                    connection.close();
                }
                catch (SQLException e)
                {
                    LOG.warn(e);
                }
            }
        }
    }

    /**
     * Get rid of sessions and sessionids from sessions that have already expired
     *
     * Both deletes run in one transaction, which is rolled back on any failure.
     * Failures are logged, not thrown.
     */
    private void cleanExpiredSessions ()
    {
        Connection connection = null;
        PreparedStatement statement = null;
        Statement sessionsTableStatement = null;
        Statement sessionIdsTableStatement = null;
        List<String> expiredSessionIds = new ArrayList<String>();
        try
        {
            connection = getConnection();
            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
            connection.setAutoCommit(false);

            statement = connection.prepareStatement(_selectExpiredSessions);
            long now = System.currentTimeMillis();
            if (LOG.isDebugEnabled()) LOG.debug ("Searching for sessions expired before {}", now);

            statement.setLong(1, now);
            ResultSet result = statement.executeQuery();
            while (result.next())
            {
                String sessionId = result.getString("sessionId");
                expiredSessionIds.add(sessionId);
                if (LOG.isDebugEnabled()) LOG.debug ("Found expired sessionId={}", sessionId);
            }

            if (!expiredSessionIds.isEmpty())
            {
                sessionsTableStatement = connection.createStatement();
                sessionsTableStatement.executeUpdate(createCleanExpiredSessionsSql("delete from "+_sessionTable+" where sessionId in ", expiredSessionIds));
                sessionIdsTableStatement = connection.createStatement();
                sessionIdsTableStatement.executeUpdate(createCleanExpiredSessionsSql("delete from "+_sessionIdTable+" where id in ", expiredSessionIds));
            }
            connection.commit();

            synchronized (_sessionIds)
            {
                _sessionIds.removeAll(expiredSessionIds); //in case they were in our local cache of session ids
            }
        }
        catch (Exception e)
        {
            if (connection != null)
            {
                try
                {
                    LOG.warn("Rolling back clean of expired sessions", e);
                    connection.rollback();
                }
                catch (Exception x) { LOG.warn("Rollback of expired sessions failed", x);}
            }
            else
            {
                //no connection was obtained, so there is nothing to roll back,
                //but the failure must still be visible
                LOG.warn("Problem cleaning expired sessions", e);
            }
        }
        finally
        {
            closeStatementQuietly(sessionIdsTableStatement);
            closeStatementQuietly(sessionsTableStatement);
            closeStatementQuietly(statement);

            try
            {
                if (connection != null)
                    connection.close();
            }
            catch (SQLException e)
            {
                LOG.warn(e);
            }
        }
    }


    /**
     * Append a parenthesised, comma separated list of quoted session ids
     * to the given sql prefix.
     *
     * @param sql the statement prefix, ending just before the opening parenthesis
     * @param expiredSessionIds the ids to list
     * @return the complete statement text
     * @throws Exception
     */
    private String createCleanExpiredSessionsSql (String sql,Collection<String> expiredSessionIds)
    throws Exception
    {
        StringBuilder buff = new StringBuilder();
        buff.append(sql);
        buff.append("(");
        Iterator<String> itor = expiredSessionIds.iterator();
        while (itor.hasNext())
        {
            //double any embedded single quote so an id can never terminate its own literal
            String id = String.valueOf(itor.next()).replace("'", "''");
            buff.append("'").append(id).append("'");
            if (itor.hasNext())
                buff.append(",");
        }
        buff.append(")");

        if (LOG.isDebugEnabled()) LOG.debug("Cleaning expired sessions with: {}", buff);
        return buff.toString();
    }

    /**
     * Resolve the configured source of connections: an already set datasource,
     * a jndi lookup, a driver instance, or a driver class name, in that order.
     *
     * @throws IllegalStateException if none of them is configured
     * @throws Exception if the jndi lookup, driver registration or class loading fails
     */
    private void initializeDatabase ()
    throws Exception
    {
        if (_datasource != null)
            return; //already set up

        if (_jndiName!=null)
        {
            InitialContext ic = new InitialContext();
            _datasource = (DataSource)ic.lookup(_jndiName);
        }
        else if ( _driver != null && _connectionUrl != null )
        {
            DriverManager.registerDriver(_driver);
        }
        else if (_driverClassName != null && _connectionUrl != null)
        {
            Class.forName(_driverClassName);
        }
        else
            throw new IllegalStateException("No database configured for sessions");
    }
}