Mercurial Hosting > luan
view src/luan/modules/ThreadLuan.java @ 1503:74c534de211f
use InheritableThreadLocal in ThreadLocalAppender
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Sun, 10 May 2020 23:28:16 -0600 |
parents | 219f2b937f2b |
children | 78d937870762 |
line wrap: on
line source
package luan.modules;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import luan.Luan;
import luan.LuanFunction;
import luan.LuanTable;
import luan.LuanException;
import luan.LuanCloner;
import luan.LuanCloneable;
import luan.modules.logging.LuanLogger;
import goodjava.logging.Logger;
import goodjava.logging.LoggerFactory;


/**
 * Static helpers for running Luan functions on other threads: immediate
 * forking, delayed/repeating scheduling, named "global callables" and
 * lock helpers.
 */
public final class ThreadLuan {
	private static final Logger logger = LoggerFactory.getLogger(ThreadLuan.class);

	/** Pool used by {@link #fork(LuanFunction)}. */
	private static final Executor exec = Executors.newCachedThreadPool();

	/** Single-threaded scheduler used by {@link #schedule(LuanFunction,LuanTable)}. */
	public static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

	/**
	 * Wraps a Luan function in a {@link Runnable}.  Thread logging is started
	 * for the function's Luan instance before the call and ended afterwards;
	 * a {@link LuanException} thrown by the function is printed, not rethrown.
	 * {@code run()} is synchronized on the Runnable itself, so one wrapper is
	 * never executed by two threads at once.
	 */
	private static Runnable runnable(final LuanFunction fn) {
		return new Runnable() {
			@Override
			public synchronized void run() {
				LuanLogger.startThreadLogging(fn.luan());
				try {
					fn.call();
				} catch(LuanException e) {
					e.printStackTrace();
				} finally {
					LuanLogger.endThreadLogging();
				}
			}
		};
	}

	/**
	 * Runs a complete clone of {@code fn} on the cached thread pool.
	 *
	 * @param fn the function to clone and execute asynchronously
	 */
	public static void fork(LuanFunction fn) {
		LuanCloner cloner = new LuanCloner(LuanCloner.Type.COMPLETE);
		final LuanFunction newFn = (LuanFunction)cloner.get(fn);
		exec.execute(runnable(newFn));
	}

	/** Scheduled tasks that were given an {@code id}, held weakly so they can be replaced later. */
	private static final Map<String,Reference<ScheduledFuture<?>>> scheduleds =
		new ConcurrentHashMap<String,Reference<ScheduledFuture<?>>>();

	/**
	 * Cancels a scheduled task without interrupting a run in progress and
	 * logs an error if the future does not report itself as cancelled.
	 *
	 * @param sf  the future to cancel
	 * @param src short description of who requested the cancel, used in the log message
	 */
	private static void cancel(ScheduledFuture<?> sf,String src) {
		boolean b = sf.cancel(false);
		if( !sf.isCancelled() )
			logger.error(src+" cancel="+b+" isCancelled="+sf.isCancelled()+" isDone="+sf.isDone()+" "+sf);
	}

	/**
	 * Schedules a complete clone of {@code fn} on {@link #scheduler}.
	 *
	 * <p>Recognized options (all times in milliseconds):
	 * <ul>
	 * <li>{@code delay} - initial delay; for repeating tasks it defaults to the repeat interval</li>
	 * <li>{@code repeating_delay} - repeat with a fixed delay between the end of one run and the start of the next</li>
	 * <li>{@code repeating_rate} - repeat at a fixed rate measured from start to start</li>
	 * <li>{@code id} - if a task was previously scheduled under the same id, it is cancelled first</li>
	 * </ul>
	 * With no timing option the function is scheduled once with zero delay and
	 * nothing further is registered for it.
	 *
	 * @param fn      the function to run
	 * @param options option table; it is copied, so the caller's table is not modified
	 * @throws LuanException if both {@code repeating_delay} and {@code repeating_rate}
	 *         are given, or from the option helpers in {@code Utils}
	 */
	public static void schedule(LuanFunction fn,LuanTable options)
		throws LuanException
	{
		options = new LuanTable(options);
		Number delay = Utils.removeNumber(options,"delay");
		Number repeatingDelay = Utils.removeNumber(options,"repeating_delay");
		Number repeatingRate = Utils.removeNumber(options,"repeating_rate");
		String id = Utils.removeString(options,"id");
		if( repeatingDelay!=null && repeatingRate!=null )
			throw new LuanException("can't define both repeating_delay and repeating_rate");
		boolean repeating = repeatingDelay!=null || repeatingRate!=null;
		Utils.checkEmpty(options);
		if( id != null ) {
			// replace any task previously scheduled under this id
			Reference<ScheduledFuture<?>> ref = scheduleds.remove(id);
			if( ref != null ) {
				ScheduledFuture<?> sf = ref.get();
				if( sf != null )
					cancel(sf,"id "+id);
			}
		}
		Luan luan = fn.luan();
		LuanCloner cloner = new LuanCloner(LuanCloner.Type.COMPLETE);
		// NOTE(review): the cloned Luan is not used directly; the clone call is kept
		// because it may prime the cloner before fn is fetched - confirm before removing.
		final Luan newLuan = (Luan)cloner.clone(luan);
		final LuanFunction newFn = (LuanFunction)cloner.get(fn);
		final Runnable r = runnable(newFn);
		final ScheduledFuture<?> sf;
		if( repeatingDelay != null ) {
			if( delay==null )
				delay = repeatingDelay;
			sf = scheduler.scheduleWithFixedDelay(r,delay.longValue(),repeatingDelay.longValue(),TimeUnit.MILLISECONDS);
		} else if( repeatingRate != null ) {
			if( delay==null )
				delay = repeatingRate;
			// fixed rate, not fixed delay: the period is measured start-to-start
			sf = scheduler.scheduleAtFixedRate(r,delay.longValue(),repeatingRate.longValue(),TimeUnit.MILLISECONDS);
		} else if( delay != null ) {
			sf = scheduler.schedule(r,delay.longValue(),TimeUnit.MILLISECONDS);
		} else {
			scheduler.schedule(r,0L,TimeUnit.MILLISECONDS);
			return;
		}
		// Marker object stored in the caller's Luan registry; its finalizer cancels the task.
		Object c = new Object() {
			@Override
			protected void finalize() throws Throwable {
				cancel(sf,"gc");
			}
		};
		luan.registry().put(c,c);  // cancel on gc
		if( id != null )
			scheduleds.put(id,new WeakReference<ScheduledFuture<?>>(sf));
	}

/*
	public static class GlobalMap {

		private static class Value {
			final long time = System.currentTimeMillis();
			final Object v;

			Value(Object v) {
				this.v = v;
			}
		}

		public long timeout = 60000L;  // one minute
		private Map<String,Value> map = new LinkedHashMap<String,Value>() {
			protected boolean removeEldestEntry(Map.Entry<String,Value> eldest) {
				return eldest.getValue().time < System.currentTimeMillis() - timeout;
			}
		};

		public synchronized Object get(String key) {
			Value val = map.get(key);
			return val==null ? null : val.v;
		}

		public synchronized Object put(String key,Object v) throws LuanException {
			Value val;
			if( v == null ) {
				val = map.remove(key);
			} else {
				if( !(v instanceof String || v instanceof Boolean || v instanceof Number) )
					throw new LuanException("can't assign type "+Luan.type(v)+" to Thread.global");
				val = map.put(key,new Value(v));
			}
			return val==null ? null : val.v;
		}
	}
*/

	/**
	 * Sleeps the current thread.
	 *
	 * @param millis time to sleep in milliseconds
	 * @throws InterruptedException if the thread is interrupted while sleeping
	 */
	public static void sleep(long millis) throws InterruptedException {
		Thread.sleep(millis);
	}


	/** Marker returned by {@link #makeSafe} when a value may not cross between Luan instances. */
	private static class Unsafe {
		private final String reason;

		Unsafe(String reason) {
			this.reason = reason;
		}
	}

	/**
	 * Converts a value into a form that can be handed to {@code luan}.
	 * <ul>
	 * <li>A table without a metatable is copied, recursively, into a new table owned by {@code luan}.</li>
	 * <li>An {@code Object[]} has each element converted in place and is returned itself.</li>
	 * <li>Any other value is returned unchanged unless it is a {@link LuanCloneable}.</li>
	 * </ul>
	 *
	 * @return the safe value, or an {@link Unsafe} describing why the value was rejected
	 *         (a table with a metatable, or a {@link LuanCloneable} that is not a table)
	 */
	private static Object makeSafe(Luan luan,Object v) throws LuanException {
		if( v instanceof LuanTable ) {
			LuanTable tbl = (LuanTable)v;
			if( tbl.getMetatable() != null )
				return new Unsafe("table with metatable");
			LuanTable rtn = new LuanTable(luan);
			for( Map.Entry entry : tbl.rawIterable() ) {
				Object key = makeSafe( luan, entry.getKey() );
				if( key instanceof Unsafe )
					return key;
				Object value = makeSafe( luan, entry.getValue() );
				if( value instanceof Unsafe )
					return value;
				rtn.rawPut(key,value);
			}
			return rtn;
		} else if( v instanceof Object[] ) {
			Object[] a = (Object[])v;
			for( int i=0; i<a.length; i++ ) {
				Object obj = makeSafe(luan,a[i]);
				if( obj instanceof Unsafe )
					return obj;
				a[i] = obj;
			}
			return a;
		} else {
			if( v instanceof LuanCloneable )
				return new Unsafe("type "+Luan.type(v));
			return v;
		}
	}

	/**
	 * A table of functions, cloned on construction, that is called by name.
	 * Calls are synchronized on the instance, and arguments and return values
	 * are passed through {@link #makeSafe}.
	 */
	public static final class Callable {
		/** Expiry time in epoch milliseconds; set by {@link #globalCallable} and read by the sweep. */
		private long expires;
		private final Luan luan = new Luan();
		private final LuanTable fns;

		Callable(LuanTable fns) {
			LuanCloner cloner = new LuanCloner(LuanCloner.Type.COMPLETE);
			this.fns = (LuanTable)cloner.get(fns);
		}

		/**
		 * Calls the function stored under {@code fnName}.
		 *
		 * @param callerLuan the caller's Luan instance; the return value is made safe for it
		 * @param fnName     key of the function in the table given at construction
		 * @param args       arguments; the array's elements are converted in place
		 * @return the function's result after conversion for {@code callerLuan}
		 * @throws LuanException if an argument or the result is rejected as unsafe,
		 *         if {@code fnName} is missing or not a function, or from the call itself
		 */
		public synchronized Object call(Luan callerLuan,String fnName,Object... args) throws LuanException {
			Object obj = makeSafe(luan,args);
			if( obj instanceof Unsafe )
				throw new LuanException("can't pass "+((Unsafe)obj).reason+" to global_callable "+Arrays.asList(args));
			args = (Object[])obj;
			Object f = fns.get(fnName);
			if( f == null )
				throw new LuanException("function '"+fnName+"' not found in global_callable");
			if( !(f instanceof LuanFunction) )
				throw new LuanException("value of '"+fnName+"' not a function in global_callable");
			LuanFunction fn = (LuanFunction)f;
			Object rtn = fn.call(args);
			rtn = makeSafe(callerLuan,rtn);
			if( rtn instanceof Unsafe )
				throw new LuanException("can't return "+((Unsafe)rtn).reason+" from global_callable");
			return rtn;
		}
	}

	/** Registry of named callables; accessed only from the synchronized static methods below. */
	private static final Map<String,Callable> callableMap = new HashMap<String,Callable>();

	/** Removes every callable whose expiry time has passed.  Called from {@link #globalCallable}. */
	private static void sweep() {
		long now = System.currentTimeMillis();
		for( Iterator<Callable> iter = callableMap.values().iterator(); iter.hasNext(); ) {
			Callable callable = iter.next();
			if( callable.expires < now )
				iter.remove();
		}
	}

	/**
	 * Returns the callable registered under {@code name}, creating it from
	 * {@code fns} if there is none.  Expired callables are swept only when a
	 * new one is created.  The expiry of the returned callable is reset to
	 * now plus {@code timeout} on every call.
	 *
	 * @param name    registry key
	 * @param fns     function table, used only when a new callable is created
	 * @param timeout lifetime in milliseconds, counted from this call
	 * @return the existing or newly created callable
	 */
	public static synchronized Callable globalCallable(String name,LuanTable fns,long timeout) {
		Callable callable = callableMap.get(name);
		if( callable == null ) {
			sweep();
			callable = new Callable(fns);
			callableMap.put(name,callable);
		}
		callable.expires = System.currentTimeMillis() + timeout;
		return callable;
	}

	/**
	 * Removes the callable registered under {@code name}, if any.
	 *
	 * @param name registry key
	 */
	public static synchronized void removeGlobalCallable(String name) {
		callableMap.remove(name);
	}


	/**
	 * Acquires {@code lock}, waiting at most ten minutes.
	 *
	 * @throws LuanException        if the lock is not acquired within the time limit
	 * @throws InterruptedException if the thread is interrupted while waiting
	 */
	public static void lock(Lock lock)
		throws LuanException, InterruptedException
	{
		if( !lock.tryLock(10,TimeUnit.MINUTES) )
			throw new LuanException("failed to acquire lock");
	}

	/**
	 * Calls {@code fn} while holding {@code lock}; the lock is always released afterwards.
	 *
	 * @param lock the lock to hold during the call
	 * @param fn   the function to call
	 * @param args arguments passed to {@code fn}
	 * @return whatever {@code fn} returns
	 * @throws LuanException        if the lock cannot be acquired or {@code fn} throws
	 * @throws InterruptedException if the thread is interrupted while waiting for the lock
	 */
	public static Object runInLock(Lock lock,LuanFunction fn,Object... args)
		throws LuanException, InterruptedException
	{
		lock(lock);
		try {
			return fn.call(args);
		} finally {
			lock.unlock();
		}
	}

}