jetty 9.3.0.M2を使ってHTTP2の通信を試す (サーバ側のフレーム受信をハンドリング)

masakiが2015/07/06 10:06:03に投稿

jetty 9.3.0.M2を使ってHTTP/2の通信を試す (サーバ側のフレーム受信をハンドリング)

前回に引き続きjetty 9.3.0.M2を使ってサンプルプログラムを作成しHTTP/2の通信を試す。

今回の確認する内容

  • サーバプッシュは、契機となったレスポンスとは別スレッドで動作すること
  • サーバ側のフレーム受信をハンドリング方法

サンプルプログラム

前回のソースを以下の通り改変した。

サーバ側

public class App
{
    private Server server;
    private ServerConnector connector;

    public static void main( String[] args ) throws Exception
    {
       new App().start();
    }

    private void start() throws Exception
    {
        ConnectionFactory connectionFactory = new MyHTTP2CServerConnectionFactory(new HttpConfiguration());

        server = new Server();
        connector = new ServerConnector(server, 1,1, connectionFactory);
        connector.setPort(5443);
        server.addConnector(connector);
        ServletContextHandler context = new ServletContextHandler(server, "/", true, false);

        HttpServlet servlet = new HttpServlet()
        {
            @Override
            protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
            {
                System.out.println("service() thread: " + Thread.currentThread().getId());
                Request jettyRequest = (Request) request;
                if (jettyRequest.getRequestURI().equals("/") && jettyRequest.isPushSupported()) {
                    jettyRequest.getPushBuilder()
                                .path("/push")
                                .push();
                }

                if (jettyRequest.getRequestURI().equals("/")) {
                  final byte[] content = "<html>hello world</html>".getBytes(StandardCharsets.UTF_8);
                  response.getOutputStream().write(content);
                  System.out.println("response helloworld thread: " + Thread.currentThread().getId());
                } else if (jettyRequest.getRequestURI().equals("/push")) {
                  try {
                    Thread.sleep(1000);
                  } catch (InterruptedException e) {}
                  final byte[] content = "<html>push</html>".getBytes(StandardCharsets.UTF_8);
                  response.getOutputStream().write(content);
                  System.out.println("response push thread: " + Thread.currentThread().getId());
                } else if (jettyRequest.getRequestURI().equals("/clientSendData")) {
                  System.out.println("response clientSendData thread: " + Thread.currentThread().getId());
                }
            }
        };

        context.addServlet(new ServletHolder(servlet), "/");
        server.start();

   }
}

public class MyHTTP2CServerConnectionFactory extends HTTP2CServerConnectionFactory
{

    public MyHTTP2CServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
    {
        super(httpConfiguration);
    }


    protected ServerSessionListener newSessionListener(Connector connector, EndPoint endPoint)
    {
        return new MyHTTP2CServerConnectionFactory.MyHTTPServerSessionListener(connector, endPoint);
    }

    private class MyHTTPServerSessionListener extends ServerSessionListener.Adapter implements Stream.Listener
    {
        private final Connector connector;
        private final EndPoint endPoint;

        public MyHTTPServerSessionListener(Connector connector, EndPoint endPoint)
        {
            this.connector = connector;
            this.endPoint = endPoint;
        }

        @Override
        public Map<Integer, Integer> onPreface(Session session)
        {
            Map<Integer, Integer> settings = new HashMap<>();
            settings.put(SettingsFrame.HEADER_TABLE_SIZE, getMaxDynamicTableSize());
            settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, getInitialStreamSendWindow());
            int maxConcurrentStreams = getMaxConcurrentStreams();
            if (maxConcurrentStreams >= 0)
                settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxConcurrentStreams);
            return settings;
        }

        @Override
        public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
        {
            System.out.println("onNewStream thread: " + Thread.currentThread().getId() + " stream:" + stream.getId());
            ((HTTP2ServerConnection)endPoint.getConnection()).onNewStream(connector, (IStream)stream, frame);
            return frame.isEndStream() ? null : this;
        }

        @Override
        public void onHeaders(Stream stream, HeadersFrame frame)
        {
            System.out.println("onHeaders");
            // Servers do not receive responses.
            close(stream, "response_headers");
        }

        @Override
        public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
        {
            System.out.println("onPush");
            // Servers do not receive pushes.
            close(stream, "push_promise");
            return null;
        }

        @Override
        public void onData(Stream stream, DataFrame frame, Callback callback)
        {
            byte[] bytes = new byte[frame.getData().remaining()];
            frame.getData().get(bytes);
            System.out.println("onData thread: " + Thread.currentThread().getId() + " stream:" + stream.getId() + " DATA:" + new String(bytes));

            HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
            channel.requestContent(frame, callback);
        }

        @Override
        public void onReset(Stream stream, ResetFrame frame)
        {
            // TODO:
        }

        @Override
        public void onTimeout(Stream stream, Throwable x)
        {
            // TODO
        }

        private void close(Stream stream, String reason)
        {

            final Session session = stream.getSession();
            session.close(ErrorCode.PROTOCOL_ERROR.code, reason, Callback.Adapter.INSTANCE);
        }
    }
}

「/」のレスポンスとサーバープッシュが別スレッドになっていることを確認するためにsleepが入っている。
サーバ側のフレーム受信をハンドリングするためにはHTTP2ServerConnectionFactory内のHTTPServerSessionListenerに手を入れる必要がある。
しかし、HTTP2ServerConnectionFactory.MyHTTPServerSessionListenerは(jetty 9.3.0M2時点では)privateな入れ子クラスで継承できないため、HTTP2ServerConnectionFactoryの内容をコピペしている。

クライアント側

public class App 
{
    public static void main( String[] args ) throws Exception
    {    	
        HTTP2Client client = new HTTP2Client();
        client.start();

        String host = "192.168.1.55";
        int port = 5443;
        
        FuturePromise<Session> sessionPromise = new FuturePromise<>();
        client.connect(new InetSocketAddress(host, port), new ServerSessionListener.Adapter(), sessionPromise);
        Session session = sessionPromise.get(5, TimeUnit.SECONDS);

        HttpFields requestFields = new HttpFields();
        requestFields.put("User-Agent", client.getClass().getName() + "/" + Jetty.VERSION);
        
        
        App app = new App();
        app.getAndServerPushed(host, port, session, requestFields);
        app.getAndSendData(host, port, session, requestFields);
       
        client.stop();
    }
    
    public void getAndServerPushed(String host, int port, Session session, HttpFields requestFields)
    		                                      throws InterruptedException, ExecutionException, TimeoutException {
    	MetaData.Request metaData 
    	        = new MetaData.Request("GET", new HttpURI("http://" + host + ":" + port + "/"), HttpVersion.HTTP_2, requestFields);
        HeadersFrame headersFrame = new HeadersFrame(0, metaData, null, true);
        final Phaser phaser = new Phaser(2);
        FuturePromise<Stream> promise = new FuturePromise<>();
        session.newStream(headersFrame, promise, new Stream.Listener.Adapter()
        {
            @Override
            public void onHeaders(Stream stream, HeadersFrame frame)
            {
            	
            	System.out.println("HEADERS " + frame.getMetaData().toString() + " stream:" + stream.getId());
                if (frame.isEndStream())
                    phaser.arrive();
            }

            @Override
            public void onData(Stream stream, DataFrame frame, Callback callback)
            {
                byte[] bytes = new byte[frame.getData().remaining()];
                frame.getData().get(bytes);
                System.out.println("DATA " + new String(bytes) + " stream:" + stream.getId());
                callback.succeeded();
                if (frame.isEndStream())
                    phaser.arrive();
            }

            @Override
            public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
            {
            	System.out.println("PUSH_PROMISE " + frame.getMetaData().toString() + " stream:" + stream.getId());
                phaser.register();
                return this;
            }
        });
        Stream stream = promise.get(5, TimeUnit.SECONDS);
        System.out.println(metaData.getMethod() + " URI:" + metaData.getURIString() + " stream:" + stream.getId());
        phaser.awaitAdvanceInterruptibly(phaser.arrive(), 5, TimeUnit.SECONDS);
    }
    
    public void getAndSendData(String host, int port, Session session, HttpFields requestFields) 
    		                                                       throws InterruptedException, ExecutionException, TimeoutException {
    	 final Phaser phaser = new Phaser(1);
         FuturePromise<Stream> promise = new FuturePromise<>();
         MetaData.Request metaData 
             = new MetaData.Request("GET", new HttpURI("http://" + host + ":" + port + "/clientSendData"), HttpVersion.HTTP_2, requestFields);
         System.out.println();
         session.newStream(new HeadersFrame(0, metaData, null, false), promise, new Stream.Listener.Adapter());
         Stream stream = promise.get(5, TimeUnit.SECONDS);
         
         stream.data(new DataFrame(stream.getId(),
        		 ByteBuffer.wrap("<html>client data</html>".getBytes(StandardCharsets.UTF_8)), false), Callback.Adapter.INSTANCE);
         System.out.println(metaData.getMethod() + " URI:" + metaData.getURIString() + " stream:" + stream.getId());
         
         phaser.arriveAndAwaitAdvance();
    }
}

App.getAndServerPushed()は前回と同じ内容である。
App.getAndSendData()は、「/clientSendData」をGETリクエストし、クライアントからサーバにデータ「<html>client data</html>」を送信している。

クライアント側実行結果

2015-06-16 14:48:24.586:INFO::jp.co.linkode.http2.client.example.App.main(): Logging initialized @3665ms
GET URIhttp://192.168.1.55:5443/ stream:1
PUSH_PROMISE GET{uhttp://192.168.1.55:5443/push,HTTP/2.0,h=3} stream:2
HEADERS HTTP/2.0{s=200,h=4} stream:1
DATA <html>hello world</html> stream:1
HEADERS HTTP/2.0{s=200,h=1} stream:2
DATA <html>push</html> stream:2

GET URIhttp://192.168.1.55:5443/clientSendData stream:3

サーバ側実行結果 クライアント接続後

onNewStream thread: 11 stream:1
service() thread: 11
service() thread: 15
response helloworld thread: 11
response push thread: 15
onNewStream thread: 17 stream:3
service() thread: 17
response clientSendData thread: 17
onData thread: 14 stream:3 DATA:<html>client data</html>

前回の結果と異なり、「/push」のレスポンスが「/」のレスポンスより遅くなっていることがわかる。
また、クライアントから送ったデータがサーバに届いていることがわかる。
サーバ側の出力により
Stream.Listener.Adapter.onNewStream()
Stream.Listener.Adapter.onData()
をハンドリングできていることがわかるが
Stream.Listener.Adapter.onHeaders()
の出力がない。
なぜ出力がないのかは未調査。(jetty 9.3.0.M2では実装未対応?)

上記サンプルプログラムの通信内容をwireshark(version:1.99.6-676-gdc 14e3c)でのぞいた結果は以下の通り。

HTTP2フレームシーケンス図としてまとめた結果は以下の通り。

今回のテーマとは外れるが、この通信内容を見るとSETTINGSフレームによるネゴシエーションが完了する前にクライアントからのリクエストが行われていることがわかる。
必ずしもSETTINGSネゴシエーションが完了してからクライアントのリクエストが始まるわけではないらしい。