changeset 1738:9713f7fd50b3

server-sent events
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 03 Nov 2022 19:23:53 -0600
parents 6c9aea554691
children 3e9f26404433
files src/goodjava/webserver/Connection.java src/goodjava/webserver/ServerSentEvents.java src/luan/modules/http/Http.luan website/src/examples/sse_pull.html website/src/examples/sse_push.html.luan
diffstat 5 files changed, 214 insertions(+), 67 deletions(-) [+]
line wrap: on
line diff
diff -r 6c9aea554691 -r 9713f7fd50b3 src/goodjava/webserver/Connection.java
--- a/src/goodjava/webserver/Connection.java	Tue Oct 18 22:08:29 2022 -0600
+++ b/src/goodjava/webserver/Connection.java	Thu Nov 03 19:23:53 2022 -0600
@@ -32,81 +32,85 @@
 			Response response;
 			String contentType = null;
 			try {
-				{
-					InputStream in = socket.getInputStream();
-					byte[] a = new byte[8192];
-					int endOfHeader;
-					int size = 0;
-					int left = a.length;
-					outer: while(true) {
-						int n = in.read(a,size,left);
+				InputStream in = socket.getInputStream();
+				byte[] a = new byte[8192];
+				int endOfHeader;
+				int size = 0;
+				int left = a.length;
+				outer: while(true) {
+					int n = in.read(a,size,left);
+					if( n == -1 ) {
+						if( size == 0 ) {
+							socket.close();
+							return;
+						}
+						throw new IOException("unexpected end of input at "+size);
+					}
+					size += n;
+					for( int i=0; i<=size-4; i++ ) {
+						if( a[i]=='\r' && a[i+1]=='\n' && a[i+2]=='\r' && a[i+3]=='\n' ) {
+							endOfHeader = i + 4;
+							break outer;
+						}
+					}
+					left -= n;
+					if( left == 0 ) {
+						byte[] a2 = new byte[2*a.length];
+						System.arraycopy(a,0,a2,0,size);
+						a = a2;
+						left = a.length - size;
+					}
+				}
+				rawHead = new String(a,0,endOfHeader);
+				//System.out.println(rawHead);
+				request.rawHead = rawHead;
+				RequestParser parser = new RequestParser(request);
+				parser.parseHead();
+	
+				String lenStr = (String)request.headers.get("content-length");
+				if( lenStr != null ) {
+					int len = Integer.parseInt(lenStr);
+					byte[] body = new byte[len];
+					size -= endOfHeader;
+					System.arraycopy(a,endOfHeader,body,0,size);
+					while( size < len ) {
+						int n = in.read(body,size,len-size);
 						if( n == -1 ) {
-							if( size == 0 ) {
-								socket.close();
-								return;
-							}
 							throw new IOException("unexpected end of input at "+size);
 						}
 						size += n;
-						for( int i=0; i<=size-4; i++ ) {
-							if( a[i]=='\r' && a[i+1]=='\n' && a[i+2]=='\r' && a[i+3]=='\n' ) {
-								endOfHeader = i + 4;
-								break outer;
-							}
-						}
-						left -= n;
-						if( left == 0 ) {
-							byte[] a2 = new byte[2*a.length];
-							System.arraycopy(a,0,a2,0,size);
-							a = a2;
-							left = a.length - size;
-						}
 					}
-					rawHead = new String(a,0,endOfHeader);
-					//System.out.println(rawHead);
-					request.rawHead = rawHead;
-					RequestParser parser = new RequestParser(request);
-					parser.parseHead();
-		
-					String lenStr = (String)request.headers.get("content-length");
-					if( lenStr != null ) {
-						int len = Integer.parseInt(lenStr);
-						byte[] body = new byte[len];
-						size -= endOfHeader;
-						System.arraycopy(a,endOfHeader,body,0,size);
-						while( size < len ) {
-							int n = in.read(body,size,len-size);
-							if( n == -1 ) {
-								throw new IOException("unexpected end of input at "+size);
-							}
-							size += n;
-						}
-						request.body = body;
-						//System.out.println(new String(request.body));
+					request.body = body;
+					//System.out.println(new String(request.body));
+				}
+
+				contentType = (String)request.headers.get("content-type");
+				if( contentType != null ) {
+					contentType = contentType.toLowerCase();
+					if( contentType.equals("application/x-www-form-urlencoded") ) {
+						parser.parseUrlencoded(null);
+					} else if( contentType.equals("application/x-www-form-urlencoded; charset=utf-8") ) {
+						parser.parseUrlencoded("utf-8");
+					} else if( contentType.startsWith("multipart/form-data;") ) {
+						parser.parseMultipart();
+					} else if( contentType.equals("application/json") ) {
+						parser.parseJson(null);
+					} else if( contentType.equals("application/json; charset=utf-8") ) {
+						parser.parseJson("utf-8");
+					} else {
+						logger.info("unknown request content-type: "+contentType);
 					}
-	
-					contentType = (String)request.headers.get("content-type");
-					if( contentType != null ) {
-						contentType = contentType.toLowerCase();
-						if( contentType.equals("application/x-www-form-urlencoded") ) {
-							parser.parseUrlencoded(null);
-						} else if( contentType.equals("application/x-www-form-urlencoded; charset=utf-8") ) {
-							parser.parseUrlencoded("utf-8");
-						} else if( contentType.startsWith("multipart/form-data;") ) {
-							parser.parseMultipart();
-						} else if( contentType.equals("application/json") ) {
-							parser.parseJson(null);
-						} else if( contentType.equals("application/json; charset=utf-8") ) {
-							parser.parseJson("utf-8");
-						} else {
-							logger.info("unknown request content-type: "+contentType);
-						}
-					}
+				}
+
+				String scheme = (String)request.headers.get("x-forwarded-proto");
+				if( scheme != null )
+					request.scheme = scheme;
 
-					String scheme = (String)request.headers.get("x-forwarded-proto");
-					if( scheme != null )
-						request.scheme = scheme;
+				if( "text/event-stream".equals(request.headers.get("accept")) ) {
+					ServerSentEvents.add(socket,request);
+					return;
 				}
+
 				response = server.handler.handle(request);
 			} catch(ParseException e) {
 				logger.warn("parse error\n"+request.rawHead.trim()+"\n",e);
diff -r 6c9aea554691 -r 9713f7fd50b3 src/goodjava/webserver/ServerSentEvents.java
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/webserver/ServerSentEvents.java	Thu Nov 03 19:23:53 2022 -0600
@@ -0,0 +1,96 @@
+package goodjava.webserver;
+
+import java.io.Writer;
+import java.io.OutputStreamWriter;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.Iterator;
+
+
+public class ServerSentEvents {
+
+	private static class Con {
+		final Socket socket;
+		final Writer writer;
+
+		Con(Socket socket) throws IOException {
+			this.socket = socket;
+			this.writer = new BufferedWriter( new OutputStreamWriter( socket.getOutputStream(), "UTF-8" ) );
+		}
+	}
+
+	private static class EventHandler {
+		private final List<Con> cons = new ArrayList<Con>();
+
+		synchronized void add(Con con) {
+			cons.add(con);
+		}
+
+		synchronized void write( String url, String content ) {
+			Iterator<Con> iter = cons.iterator();
+			while( iter.hasNext() ) {
+				Con con = iter.next();
+				Writer writer = con.writer;
+				try {
+					writer.write(content);
+					writer.write("\n\n");
+					writer.flush();
+				} catch(IOException e) {
+					iter.remove();
+				}
+			}
+			if( cons.isEmpty() )
+				map.remove(url);
+		}
+	}
+
+	private static final Map<String,EventHandler> map
+		= Collections.synchronizedMap(new HashMap<String,EventHandler>());
+
+	static void add(Socket socket,Request request) throws IOException {
+		Con con = new Con(socket);
+
+		Writer writer = con.writer;
+		writer.write("HTTP/1.1 200 OK\r\n");
+		writer.write("Access-Control-Allow-Origin: *\r\n");
+		writer.write("Cache-Control: no-cache\r\n");
+		writer.write("Content-Type: text/event-stream\r\n");
+		writer.write("\r\n");
+		writer.flush();
+
+		String url = request.url();
+		EventHandler handler;
+		synchronized(map) {
+			handler = map.get(url);
+			if( handler==null ) {
+				handler = new EventHandler();
+				map.put(url,handler);
+			}
+		}
+		handler.add(con);
+	}
+
+	public static void write( String url, String content ) {
+		EventHandler handler = map.get(url);
+		if( handler != null )
+			handler.write(url,content);
+	}
+
+	public static String toData(String message) {
+		if( message.endsWith("\n") )
+			message = message.substring( 0, message.length() - 1 );
+		return "data: " + message.replace( "\n", "\ndata: " ) + "\n";
+	}
+
+	public static void writeMessage( String url, String message ) {
+		write( url, toData(message) );
+	}
+
+	private ServerSentEvents() {}  // never
+}
diff -r 6c9aea554691 -r 9713f7fd50b3 src/luan/modules/http/Http.luan
--- a/src/luan/modules/http/Http.luan	Tue Oct 18 22:08:29 2022 -0600
+++ b/src/luan/modules/http/Http.luan	Thu Nov 03 19:23:53 2022 -0600
@@ -28,6 +28,7 @@
 local Response = require "java:goodjava.webserver.Response"
 local ResponseOutputStream = require "java:goodjava.webserver.ResponseOutputStream"
 local Status = require "java:goodjava.webserver.Status"
+local ServerSentEvents = require "java:goodjava.webserver.ServerSentEvents"
 local OutputStreamWriter = require "java:java.io.OutputStreamWriter"
 local HashMap = require "java:java.util.HashMap"
 local Logging = require "luan:logging/Logging.luan"
@@ -211,4 +212,6 @@
 	return time_format(date,"EEE, dd MMM yyyy HH:mm:ss z","GMT")
 end
 
+Http.push = ServerSentEvents.writeMessage  -- ( url, message )
+
 return Http
diff -r 6c9aea554691 -r 9713f7fd50b3 website/src/examples/sse_pull.html
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/website/src/examples/sse_pull.html	Thu Nov 03 19:23:53 2022 -0600
@@ -0,0 +1,17 @@
+<!doctype html>
+<html>
+	<head>
+		<style>
+			body {
+				white-space: pre;
+			}
+		</style>
+		<script>
+			let eventSource = new EventSource(location.origin+'/examples/sse_push.html');
+			eventSource.onmessage = function(event) {
+				document.body.textContent = event.data;
+			};
+		</script>
+	</head>
+	<body>pull server-sent events</body>
+</html>
diff -r 6c9aea554691 -r 9713f7fd50b3 website/src/examples/sse_push.html.luan
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/website/src/examples/sse_push.html.luan	Thu Nov 03 19:23:53 2022 -0600
@@ -0,0 +1,27 @@
+local Io = require "luan:Io.luan"
+local Http = require "luan:http/Http.luan"
+local Logging = require "luan:logging/Logging.luan"
+local logger = Logging.logger "init"
+
+
+return function()
+	local request = Http.request
+	local message = request.parameters.message
+	if message ~= nil then
+		local url = request.url()
+		Http.push(url,message)
+	end
+	Io.stdout = Http.response.text_writer()
+%>
+<!doctype html>
+<html>
+	<body>
+		<h1>push server-sent events</h1>
+		<form method=post>
+			<p><textarea name=message></textarea></p>
+			<p><input type=submit></p>
+		</form>
+	</body>
+</html>
+<%
+end