Mercurial Hosting > luan
changeset 2171:8b77bd42864d
add add Thread.map_reduce
| author | Franklin Schmidt <fschmidt@gmail.com> |
|---|---|
| date | Sun, 22 Mar 2026 19:24:52 -0600 |
| parents | a59d0bf68830 |
| children | 75c45f1a743e |
| files | src/goodjava/util/MapReduce.java src/luan/modules/Thread.luan src/luan/modules/ThreadLuan.java website/src/examples/map_reduce.txt.luan |
| diffstat | 4 files changed, 137 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/goodjava/util/MapReduce.java Sun Mar 22 19:24:52 2026 -0600 @@ -0,0 +1,48 @@ +package goodjava.util; + +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.WeakHashMap; +import java.util.Set; + + +public class MapReduce { + + public interface Mapper { + public List map(Object arg); + } + + public interface Handler extends Mapper { + public List reduce(List<List> lists); + } + + private static Map<String,MapReduce> instances = new HashMap<String,MapReduce>(); + + // an active handler will be arbitrarily chosen for reduce + public static synchronized MapReduce register(String key,Mapper mapper) { + MapReduce mr = instances.get(key); + if( mr == null ) { + mr = new MapReduce(); + instances.put(key,mr); + } + mr.mappers.add(mapper); + return mr; + } + + private Set<Mapper> mappers = Collections.newSetFromMap(new WeakHashMap<Mapper, Boolean>()); + + public List run(Object arg) { + List<List> lists = new ArrayList<List>(); + Handler lastHandler = null; + Mapper[] snapshot = mappers.toArray(new Mapper[0]); + for( Mapper m : snapshot ) { + lists.add( m.map(arg) ); + if( m instanceof Handler ) + lastHandler = (Handler)m; + } + return lastHandler==null ? Collections.emptyList() : lastHandler.reduce(lists); + } +}
--- a/src/luan/modules/Thread.luan Tue Feb 17 22:24:02 2026 -0700 +++ b/src/luan/modules/Thread.luan Sun Mar 22 19:24:52 2026 -0600 @@ -9,6 +9,8 @@ local new_error = Luan.new_error or error() local set_metatable = Luan.set_metatable or error() local Time = require "luan:Time.luan" +local Table = require "luan:Table.luan" +local java_to_table_shallow = Table.java_to_table_shallow or error() local Logging = require "luan:logging/Logging.luan" local logger = Logging.logger "Thread" @@ -137,4 +139,13 @@ end +function Thread.map_reduce(key,map,reduce) + local mr = ThreadLuan.registerMapReduce(key,map,reduce) + return function(arg) + local list = mr.run(arg) + return java_to_table_shallow(list) + end +end + + return Thread
--- a/src/luan/modules/ThreadLuan.java Tue Feb 17 22:24:02 2026 -0700 +++ b/src/luan/modules/ThreadLuan.java Sun Mar 22 19:24:52 2026 -0600 @@ -4,6 +4,7 @@ import java.util.Iterator; import java.util.Map; import java.util.HashMap; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -16,6 +17,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import goodjava.util.WeakCacheMap; +import goodjava.util.MapReduce; import luan.Luan; import luan.LuanFunction; import luan.LuanTable; @@ -287,4 +289,52 @@ return new FutureLuan(ft); } + + public static MapReduce registerMapReduce(Luan luan,String key,LuanFunction map,LuanFunction reduce) + throws LuanException + { + MapReduce.Mapper m; + final Luan newLuan = new Luan(luan); + LuanMutable.makeImmutable(map); + if( reduce == null ) { + m = new MapReduce.Mapper() { + @Override public List map(Object arg) { + try { + LuanTable t = (LuanTable)map.call(newLuan,arg); + return t.asList(); + } catch(LuanException e) { + throw new LuanRuntimeException(e); + } + } + }; + } else { + Luan.checkSecurity(luan,"java"); + LuanMutable.makeImmutable(reduce); + m = new MapReduce.Handler() { + @Override public List map(Object arg) { + try { + LuanTable t = (LuanTable)map.call(newLuan,arg); + return t.asList(); + } catch(LuanException e) { + throw new LuanRuntimeException(e); + } + } + @Override public List reduce(List<List> lists) { + try { + LuanTable tLists = new LuanTable(); + for( List list : lists ) { + tLists.rawAdd( new LuanTable(list) ); + } + LuanTable t = (LuanTable)reduce.call(newLuan,tLists); + return t.asList(); + } catch(LuanException e) { + throw new LuanRuntimeException(e); + } + } + }; + } + luan.registry.put("MapReduce_"+key,m); + return MapReduce.register(key,m); + } + }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/website/src/examples/map_reduce.txt.luan Sun Mar 22 19:24:52 2026 -0600 @@ -0,0 +1,28 @@ +local Luan = require "luan:Luan.luan" +local error = Luan.error +local ipairs = Luan.ipairs or error() +local Thread = require "luan:Thread.luan" +local Io = require "luan:Io.luan" +local Http = require "luan:http/Http.luan" + + +local function map(_) + return {1} +end + +local function reduce(lists) + local n = 0 + for _, list in ipairs(lists) do + n = n + list[1] + end + return {n} +end + +local mr = Thread.map_reduce("count",map,reduce) + +return function() + local list = mr(nil) + local n = list[1] + Io.stdout = Http.response.text_writer() + %>count: <%=n%><% +end
