changeset 2171:8b77bd42864d

add add Thread.map_reduce
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 22 Mar 2026 19:24:52 -0600
parents a59d0bf68830
children 75c45f1a743e
files src/goodjava/util/MapReduce.java src/luan/modules/Thread.luan src/luan/modules/ThreadLuan.java website/src/examples/map_reduce.txt.luan
diffstat 4 files changed, 137 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/util/MapReduce.java	Sun Mar 22 19:24:52 2026 -0600
@@ -0,0 +1,48 @@
+package goodjava.util;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.WeakHashMap;
+import java.util.Set;
+
+
+public class MapReduce {
+
+	public interface Mapper {
+		public List map(Object arg);
+	}
+
+	public interface Handler extends Mapper {
+		public List reduce(List<List> lists);
+	}
+
+	private static Map<String,MapReduce> instances = new HashMap<String,MapReduce>();
+
+	// an active handler will be arbitrarily chosen for reduce
+	public static synchronized MapReduce register(String key,Mapper mapper) {
+		MapReduce mr = instances.get(key);
+		if( mr == null ) {
+			mr = new MapReduce();
+			instances.put(key,mr);
+		}
+		mr.mappers.add(mapper);
+		return mr;
+	}
+
+	private Set<Mapper> mappers = Collections.newSetFromMap(new WeakHashMap<Mapper, Boolean>());
+
+	public List run(Object arg) {
+		List<List> lists = new ArrayList<List>();
+		Handler lastHandler = null;
+		Mapper[] snapshot = mappers.toArray(new Mapper[0]);
+		for( Mapper m : snapshot ) {
+			lists.add( m.map(arg) );
+			if( m instanceof Handler )
+				lastHandler = (Handler)m;
+		}
+		return lastHandler==null ? Collections.emptyList() : lastHandler.reduce(lists);
+	}
+}
--- a/src/luan/modules/Thread.luan	Tue Feb 17 22:24:02 2026 -0700
+++ b/src/luan/modules/Thread.luan	Sun Mar 22 19:24:52 2026 -0600
@@ -9,6 +9,8 @@
 local new_error = Luan.new_error or error()
 local set_metatable = Luan.set_metatable or error()
 local Time = require "luan:Time.luan"
+local Table = require "luan:Table.luan"
+local java_to_table_shallow = Table.java_to_table_shallow or error()
 local Logging = require "luan:logging/Logging.luan"
 local logger = Logging.logger "Thread"
 
@@ -137,4 +139,13 @@
 end
 
 
+function Thread.map_reduce(key,map,reduce)
+	local mr = ThreadLuan.registerMapReduce(key,map,reduce)
+	return function(arg)
+		local list = mr.run(arg)
+		return java_to_table_shallow(list)
+	end
+end
+
+
 return Thread
--- a/src/luan/modules/ThreadLuan.java	Tue Feb 17 22:24:02 2026 -0700
+++ b/src/luan/modules/ThreadLuan.java	Sun Mar 22 19:24:52 2026 -0600
@@ -4,6 +4,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
@@ -16,6 +17,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import goodjava.util.WeakCacheMap;
+import goodjava.util.MapReduce;
 import luan.Luan;
 import luan.LuanFunction;
 import luan.LuanTable;
@@ -287,4 +289,52 @@
 		return new FutureLuan(ft);
 	}
 
+
+	public static MapReduce registerMapReduce(Luan luan,String key,LuanFunction map,LuanFunction reduce)
+		throws LuanException
+	{
+		MapReduce.Mapper m;
+		final Luan newLuan = new Luan(luan);
+		LuanMutable.makeImmutable(map);
+		if( reduce == null ) {
+			m = new MapReduce.Mapper() {
+				@Override public List map(Object arg) {
+					try {
+						LuanTable t = (LuanTable)map.call(newLuan,arg);
+						return t.asList();
+					} catch(LuanException e) {
+						throw new LuanRuntimeException(e);
+					}
+				}
+			};
+		} else {
+			Luan.checkSecurity(luan,"java");
+			LuanMutable.makeImmutable(reduce);
+			m = new MapReduce.Handler() {
+				@Override public List map(Object arg) {
+					try {
+						LuanTable t = (LuanTable)map.call(newLuan,arg);
+						return t.asList();
+					} catch(LuanException e) {
+						throw new LuanRuntimeException(e);
+					}
+				}
+				@Override public List reduce(List<List> lists) {
+					try {
+						LuanTable tLists = new LuanTable();
+						for( List list : lists ) {
+							tLists.rawAdd( new LuanTable(list) );
+						}
+						LuanTable t = (LuanTable)reduce.call(newLuan,tLists);
+						return t.asList();
+					} catch(LuanException e) {
+						throw new LuanRuntimeException(e);
+					}
+				}
+			};
+		}
+		luan.registry.put("MapReduce_"+key,m);
+		return MapReduce.register(key,m);
+	}
+
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/website/src/examples/map_reduce.txt.luan	Sun Mar 22 19:24:52 2026 -0600
@@ -0,0 +1,28 @@
+local Luan = require "luan:Luan.luan"
+local error = Luan.error
+local ipairs = Luan.ipairs or error()
+local Thread = require "luan:Thread.luan"
+local Io = require "luan:Io.luan"
+local Http = require "luan:http/Http.luan"
+
+
+local function map(_)
+	return {1}
+end
+
+local function reduce(lists)
+	local n = 0
+	for _, list in ipairs(lists) do
+		n = n + list[1]
+	end
+	return {n}
+end
+
+local mr = Thread.map_reduce("count",map,reduce)
+
+return function()
+	local list = mr(nil)
+	local n = list[1]
+	Io.stdout = Http.response.text_writer()
+	%>count: <%=n%><%
+end