View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.surefire.api.util.internal;
20  
21  import java.io.BufferedInputStream;
22  import java.io.BufferedOutputStream;
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.net.InetAddress;
27  import java.net.InetSocketAddress;
28  import java.net.SocketOption;
29  import java.nio.channels.AsynchronousChannelGroup;
30  import java.nio.channels.AsynchronousServerSocketChannel;
31  import java.nio.channels.AsynchronousSocketChannel;
32  import java.nio.charset.StandardCharsets;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.Executors;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.ThreadPoolExecutor;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import org.junit.Test;
45  
46  import static java.net.StandardSocketOptions.SO_KEEPALIVE;
47  import static java.net.StandardSocketOptions.SO_REUSEADDR;
48  import static java.net.StandardSocketOptions.TCP_NODELAY;
49  import static org.apache.maven.surefire.api.util.internal.Channels.newInputStream;
50  import static org.apache.maven.surefire.api.util.internal.Channels.newOutputStream;
51  import static org.assertj.core.api.Assertions.assertThat;
52  
53  /**
54   * Low level Java benchmark test.
55   */
56  @SuppressWarnings("checkstyle:magicnumber")
57  public class AsyncSocketTest {
58      private static final String LONG_STRING =
59              "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
60  
61      private final CountDownLatch barrier = new CountDownLatch(1);
62      private final AtomicLong writeTime = new AtomicLong();
63      private final AtomicLong readTime = new AtomicLong();
64  
65      private volatile InetSocketAddress address;
66  
67      @Test(timeout = 10_000L)
68      public void test() throws Exception {
69          int forks = 2;
70          ThreadFactory factory = DaemonThreadFactory.newDaemonThreadFactory();
71          ExecutorService executorService = Executors.newCachedThreadPool(factory);
72          if (executorService instanceof ThreadPoolExecutor) {
73              ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
74              threadPoolExecutor.setCorePoolSize(
75                      Math.min(forks, Runtime.getRuntime().availableProcessors()));
76              threadPoolExecutor.prestartCoreThread();
77          }
78          AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
79          AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
80          setTrueOptions(server, SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE);
81          InetAddress ip = InetAddress.getLoopbackAddress();
82          server.bind(new InetSocketAddress(ip, 0), 1);
83          address = (InetSocketAddress) server.getLocalAddress();
84  
85          System.gc();
86          TimeUnit.SECONDS.sleep(3L);
87  
88          Thread tc = new Thread() {
89              @Override
90              public void run() {
91                  try {
92                      client();
93                  } catch (Exception e) {
94                      e.printStackTrace();
95                  }
96              }
97          };
98          tc.setDaemon(true);
99          tc.start();
100 
101         Future<AsynchronousSocketChannel> acceptFuture = server.accept();
102         AsynchronousSocketChannel worker = acceptFuture.get();
103         if (!worker.isOpen()) {
104             throw new IOException("client socket closed");
105         }
106         final InputStream is = newInputStream(worker);
107         final OutputStream os = new BufferedOutputStream(newOutputStream(worker), 64 * 1024);
108 
109         Thread tt = new Thread() {
110             public void run() {
111                 try {
112                     byte[] b = new byte[1024];
113                     is.read(b);
114                 } catch (Exception e) {
115                     // e.printStackTrace();
116                 }
117             }
118         };
119         tt.setName("fork-1-event-thread-");
120         tt.setDaemon(true);
121         tt.start();
122 
123         Thread t = new Thread() {
124             @SuppressWarnings("checkstyle:magicnumber")
125             public void run() {
126                 try {
127                     byte[] data = LONG_STRING.getBytes(StandardCharsets.US_ASCII);
128                     long t1 = System.currentTimeMillis();
129                     int i = 0;
130                     for (; i < 320_000; i++) {
131                         os.write(data);
132                         long t2 = System.currentTimeMillis();
133                         long spent = t2 - t1;
134 
135                         if (i % 100_000 == 0) {
136                             System.out.println(spent + "ms: " + i);
137                         }
138                     }
139                     os.flush();
140                     long spent = System.currentTimeMillis() - t1;
141                     writeTime.set(spent);
142                     System.out.println(spent + "ms: " + i);
143                 } catch (IOException e) {
144                     e.printStackTrace();
145                 }
146             }
147         };
148         t.setName("commands-fork-1");
149         t.setDaemon(true);
150         t.start();
151 
152         barrier.await();
153         tt.join();
154         t.join();
155         tc.join();
156         worker.close();
157         server.close();
158 
159         // 160 millis on write using the asynchronous sockets
160         // 320 millis on NIO blocking sockets
161         assertThat(writeTime.get()).isLessThan(1000L);
162 
163         // 160 millis on read using the asynchronous sockets
164         // 320 millis on NIO blocking sockets
165         assertThat(readTime.get()).isLessThan(1000L);
166     }
167 
168     @SuppressWarnings("checkstyle:magicnumber")
169     private void client() throws Exception {
170         InetSocketAddress hostAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), address.getPort());
171         AsynchronousSocketChannel clientSocketChannel = AsynchronousSocketChannel.open();
172         clientSocketChannel.connect(hostAddress).get(); // Wait until connection is done.
173         InputStream is = new BufferedInputStream(newInputStream(clientSocketChannel), 64 * 1024);
174         List<byte[]> bytes = new ArrayList<>();
175         long t1 = System.currentTimeMillis();
176         for (int i = 0; i < 320_000; i++) {
177             byte[] b = new byte[100];
178             is.read(b);
179             bytes.add(b);
180         }
181         long t2 = System.currentTimeMillis();
182         long spent = t2 - t1;
183         readTime.set(spent);
184         System.out.println(new String(bytes.get(bytes.size() - 1)));
185         System.out.println("received within " + spent + "ms");
186         clientSocketChannel.close();
187         barrier.countDown();
188     }
189 
190     @SafeVarargs
191     private static void setTrueOptions(AsynchronousServerSocketChannel server, SocketOption<Boolean>... options)
192             throws IOException {
193         for (SocketOption<Boolean> option : options) {
194             if (server.supportedOptions().contains(option)) {
195                 server.setOption(option, true);
196             }
197         }
198     }
199 }