View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.plugin.surefire.extensions;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.io.Closeable;
24  import java.io.File;
25  import java.nio.Buffer;
26  import java.nio.ByteBuffer;
27  import java.nio.channels.ReadableByteChannel;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
31  import org.apache.maven.surefire.api.event.Event;
32  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
33  import org.apache.maven.surefire.extensions.EventHandler;
34  import org.apache.maven.surefire.extensions.util.CountdownCloseable;
35  import org.junit.Test;
36  
37  import static java.lang.Math.min;
38  import static java.nio.charset.StandardCharsets.UTF_8;
39  import static java.util.Arrays.copyOfRange;
40  import static java.util.concurrent.TimeUnit.SECONDS;
41  import static org.assertj.core.api.Assertions.assertThat;
42  
43  /**
44   *
45   */
46  public class EventConsumerThreadTest {
47      @SuppressWarnings("checkstyle:magicnumber")
48      @Test(timeout = 60_000L)
49      public void performanceTest() throws Exception {
50          final long[] staredAt = {0};
51          final long[] finishedAt = {0};
52          final AtomicInteger calls = new AtomicInteger();
53          final int totalCalls = 1_000_000; // 400_000; // 1_000_000; // 10_000_000;
54  
55          EventHandler<Event> handler = new EventHandler<Event>() {
56              @Override
57              public void handleEvent(@Nonnull Event event) {
58                  if (staredAt[0] == 0) {
59                      staredAt[0] = System.currentTimeMillis();
60                  }
61  
62                  if (calls.incrementAndGet() == totalCalls) {
63                      finishedAt[0] = System.currentTimeMillis();
64                  }
65              }
66          };
67  
68          final ByteBuffer event = ByteBuffer.allocate(192);
69          event.put(":maven-surefire-event:".getBytes(UTF_8));
70          event.put((byte) 14);
71          event.put(":std-out-stream:".getBytes(UTF_8));
72          event.put((byte) 10);
73          event.put(":normal-run:".getBytes(UTF_8));
74          event.put("\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001:".getBytes(UTF_8));
75          event.put((byte) 5);
76          event.put(":UTF-8:".getBytes(UTF_8));
77          event.putInt(100);
78          event.put(
79                  ":0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789:"
80                          .getBytes(UTF_8));
81  
82          ((Buffer) event).flip();
83          byte[] frame = copyOfRange(event.array(), event.arrayOffset(), event.arrayOffset() + event.remaining());
84          ReadableByteChannel channel = new Channel(frame, 100) {
85              private int countRounds;
86  
87              @Override
88              public synchronized int read(ByteBuffer dst) {
89                  if (countRounds == totalCalls) {
90                      return -1;
91                  }
92  
93                  if (remaining() == 0) {
94                      countRounds++;
95                      i = 0;
96                  }
97  
98                  return super.read(dst);
99              }
100         };
101 
102         EventConsumerThread thread = new EventConsumerThread(
103                 "t", channel, handler, new CountdownCloseable(new MockCloseable(), 1), new MockForkNodeArguments());
104 
105         SECONDS.sleep(2);
106         System.gc();
107         SECONDS.sleep(5);
108 
109         System.out.println("Starting the event thread...");
110 
111         thread.start();
112         thread.join();
113 
114         long execTime = finishedAt[0] - staredAt[0];
115         System.out.println(execTime);
116 
117         // 0.6 seconds while using the encoder/decoder for 10 million messages
118         assertThat(execTime)
119                 .describedAs("The performance test should assert 0.75s of read time. "
120                         + "The limit 3.65s guarantees that the read time does not exceed this limit on overloaded CPU.")
121                 .isPositive()
122                 .isLessThanOrEqualTo(3_650L);
123     }
124 
125     private static class Channel implements ReadableByteChannel {
126         private final byte[] bytes;
127         private final int chunkSize;
128         protected int i;
129 
130         Channel(byte[] bytes, int chunkSize) {
131             this.bytes = bytes;
132             this.chunkSize = chunkSize;
133         }
134 
135         @Override
136         public int read(ByteBuffer dst) {
137             if (remaining() == 0) {
138                 return -1;
139             } else if (dst.hasRemaining()) {
140                 int length = min(min(chunkSize, remaining()), dst.remaining());
141                 dst.put(bytes, i, length);
142                 i += length;
143                 return length;
144             } else {
145                 return 0;
146             }
147         }
148 
149         protected final int remaining() {
150             return bytes.length - i;
151         }
152 
153         @Override
154         public boolean isOpen() {
155             return false;
156         }
157 
158         @Override
159         public void close() {}
160     }
161 
162     private static class MockCloseable implements Closeable {
163         @Override
164         public void close() {}
165     }
166 
167     private static class MockForkNodeArguments implements ForkNodeArguments {
168         @Nonnull
169         @Override
170         public String getSessionId() {
171             return null;
172         }
173 
174         @Override
175         public int getForkChannelId() {
176             return 0;
177         }
178 
179         @Nonnull
180         @Override
181         public File dumpStreamText(@Nonnull String text) {
182             return null;
183         }
184 
185         @Nonnull
186         @Override
187         public File dumpStreamException(@Nonnull Throwable t) {
188             return null;
189         }
190 
191         @Override
192         public void logWarningAtEnd(@Nonnull String text) {}
193 
194         @Nonnull
195         @Override
196         public ConsoleLogger getConsoleLogger() {
197             return null;
198         }
199 
200         @Nonnull
201         @Override
202         public Object getConsoleLock() {
203             return new Object();
204         }
205 
206         @Override
207         public File getEventStreamBinaryFile() {
208             return null;
209         }
210 
211         @Override
212         public File getCommandStreamBinaryFile() {
213             return null;
214         }
215     }
216 }