View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.surefire.api.stream;
20  
21  import javax.annotation.Nonnegative;
22  import javax.annotation.Nonnull;
23  
24  import java.io.EOFException;
25  import java.io.File;
26  import java.io.IOException;
27  import java.nio.Buffer;
28  import java.nio.ByteBuffer;
29  import java.nio.CharBuffer;
30  import java.nio.channels.ReadableByteChannel;
31  import java.nio.charset.Charset;
32  import java.nio.charset.CharsetDecoder;
33  import java.nio.charset.CoderResult;
34  import java.util.ArrayList;
35  import java.util.List;
36  import java.util.Map;
37  
38  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
39  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
40  
41  import static java.lang.Math.max;
42  import static java.lang.Math.min;
43  import static java.nio.charset.CodingErrorAction.REPLACE;
44  import static java.nio.charset.StandardCharsets.US_ASCII;
45  import static java.util.Arrays.copyOf;
46  import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
47  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
48  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
49  import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
50  
51  /**
52   * @param <M> message object
53   * @param <MT> enum describing the meaning of the message
54   * @param <ST> enum for segment type
55   */
56  public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
57      public static final int BUFFER_SIZE = 1024;
58  
59      private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
60  
61      private static final String[] JVM_ERROR_PATTERNS = {
62          "could not create the java virtual machine",
63          "error occurred during initialization", // of VM, of boot layer
64          "error:", // general errors
65          "could not reserve enough space",
66          "could not allocate",
67          "unable to allocate", // memory errors
68          "java.lang.module.findexception" // JPMS errors
69      };
70  
71      private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
72              DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);
73  
74      private static final int NO_POSITION = -1;
75      private static final int DELIMITER_LENGTH = 1;
76      private static final int BYTE_LENGTH = 1;
77      private static final int INT_LENGTH = 4;
78      private static final int LONG_LENGTH = 8;
79  
80      private final ReadableByteChannel channel;
81      private final ForkNodeArguments arguments;
82      private final Map<Segment, MT> messageTypes;
83      private final ConsoleLogger logger;
84  
85      protected AbstractStreamDecoder(
86              @Nonnull ReadableByteChannel channel,
87              @Nonnull ForkNodeArguments arguments,
88              @Nonnull Map<Segment, MT> messageTypes) {
89          this.channel = channel;
90          this.arguments = arguments;
91          this.messageTypes = messageTypes;
92          logger = arguments.getConsoleLogger();
93      }
94  
95      public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;
96  
97      @Nonnull
98      protected abstract byte[] getEncodedMagicNumber();
99  
100     @Nonnull
101     protected abstract ST[] nextSegmentType(@Nonnull MT messageType);
102 
103     @Nonnull
104     protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
105 
106     @Nonnull
107     protected final ForkNodeArguments getArguments() {
108         return arguments;
109     }
110 
111     protected void debugStream(byte[] array, int position, int remaining) {}
112 
113     protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
114         byte[] header = getEncodedMagicNumber();
115         int readCount = DELIMITER_LENGTH + header.length + DELIMITER_LENGTH + BYTE_LENGTH + DELIMITER_LENGTH;
116         read(memento, readCount);
117         checkHeader(memento);
118         return messageTypes.get(readSegment(memento));
119     }
120 
121     @Nonnull
122     @SuppressWarnings("checkstyle:magicnumber")
123     protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
124         int readCount = readByte(memento) & 0xff;
125         read(memento, readCount + DELIMITER_LENGTH);
126         ByteBuffer bb = memento.getByteBuffer();
127         Segment segment = new Segment(bb.array(), bb.arrayOffset() + ((Buffer) bb).position(), readCount);
128         ((Buffer) bb).position(((Buffer) bb).position() + readCount);
129         checkDelimiter(memento);
130         return segment;
131     }
132 
133     @Nonnull
134     @SuppressWarnings("checkstyle:magicnumber")
135     protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
136         int length = readByte(memento) & 0xff;
137         read(memento, length + DELIMITER_LENGTH);
138         ByteBuffer bb = memento.getByteBuffer();
139         byte[] array = bb.array();
140         int offset = bb.arrayOffset() + ((Buffer) bb).position();
141         ((Buffer) bb).position(((Buffer) bb).position() + length);
142         boolean isDefaultEncoding = false;
143         if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
144             isDefaultEncoding = true;
145             for (int i = 0; i < length; i++) {
146                 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
147             }
148         }
149 
150         try {
151             Charset charset = isDefaultEncoding
152                     ? DEFAULT_STREAM_ENCODING
153                     : Charset.forName(new String(array, offset, length, US_ASCII));
154 
155             checkDelimiter(memento);
156             return charset;
157         } catch (IllegalArgumentException e) {
158             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position());
159         }
160     }
161 
162     protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
163         ((Buffer) memento.getCharBuffer()).clear();
164         int readCount = readInt(memento);
165         if (readCount < 0) {
166             throw new MalformedFrameException(
167                     memento.getLine().getPositionByteBuffer(), ((Buffer) memento.getByteBuffer()).position());
168         }
169         read(memento, readCount + DELIMITER_LENGTH);
170 
171         final String string;
172         if (readCount == 0) {
173             string = "";
174         } else if (readCount == 1) {
175             read(memento, 1);
176             byte oneChar = memento.getByteBuffer().get();
177             string = oneChar == 0 ? null : String.valueOf((char) oneChar);
178         } else {
179             string = readString(memento, readCount);
180         }
181         read(memento, 1);
182         checkDelimiter(memento);
183         return string;
184     }
185 
186     protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
187         read(memento, BYTE_LENGTH);
188         boolean isNullObject = memento.getByteBuffer().get() == 0;
189         if (isNullObject) {
190             read(memento, DELIMITER_LENGTH);
191             checkDelimiter(memento);
192             return null;
193         }
194         return readInt(memento);
195     }
196 
197     protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
198         read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
199         byte b = memento.getByteBuffer().get();
200         checkDelimiter(memento);
201         return b;
202     }
203 
204     protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
205         read(memento, INT_LENGTH + DELIMITER_LENGTH);
206         int i = memento.getByteBuffer().getInt();
207         checkDelimiter(memento);
208         return i;
209     }
210 
211     protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
212         read(memento, BYTE_LENGTH);
213         boolean isNullObject = memento.getByteBuffer().get() == 0;
214         if (isNullObject) {
215             read(memento, DELIMITER_LENGTH);
216             checkDelimiter(memento);
217             return null;
218         }
219         return readLongPrivate(memento);
220     }
221 
222     protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
223         read(memento, LONG_LENGTH + DELIMITER_LENGTH);
224         long num = memento.getByteBuffer().getLong();
225         checkDelimiter(memento);
226         return num;
227     }
228 
229     @SuppressWarnings("checkstyle:magicnumber")
230     protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
231         ByteBuffer bb = memento.bb;
232         if ((0xff & bb.get()) != ':') {
233             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position());
234         }
235     }
236 
237     protected final void checkHeader(Memento memento) throws MalformedFrameException {
238         ByteBuffer bb = memento.bb;
239 
240         checkDelimiter(memento);
241 
242         int shift = 0;
243         try {
244             byte[] header = getEncodedMagicNumber();
245             byte[] bbArray = bb.array();
246             for (int start = bb.arrayOffset() + ((Buffer) bb).position(), length = header.length;
247                     shift < length;
248                     shift++) {
249                 if (bbArray[shift + start] != header[shift]) {
250                     throw new MalformedFrameException(
251                             memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position() + shift);
252                 }
253             }
254         } finally {
255             ((Buffer) bb).position(((Buffer) bb).position() + shift);
256         }
257 
258         checkDelimiter(memento);
259     }
260 
261     protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
262         if (memento.getData().size() != expectedDataElements) {
263             throw new MalformedFrameException(
264                     memento.getLine().getPositionByteBuffer(), ((Buffer) memento.getByteBuffer()).position());
265         }
266     }
267 
268     private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
269             throws IOException, MalformedFrameException {
270         memento.getDecoder().reset();
271         final CharBuffer output = memento.getCharBuffer();
272         ((Buffer) output).clear();
273         final ByteBuffer input = memento.getByteBuffer();
274         final List<String> strings = new ArrayList<>();
275         int countDecodedBytes = 0;
276         for (boolean endOfInput = false; !endOfInput; ) {
277             final int bytesToRead = totalBytes - countDecodedBytes;
278             read(memento, bytesToRead);
279             int bytesToDecode = min(input.remaining(), bytesToRead);
280             final boolean isLastChunk = bytesToDecode == bytesToRead;
281             endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
282             do {
283                 boolean endOfChunk = output.remaining() >= bytesToRead;
284                 boolean endOfOutput = isLastChunk && endOfChunk;
285                 int readInputBytes = decodeString(
286                         memento.getDecoder(),
287                         input,
288                         output,
289                         bytesToDecode,
290                         endOfOutput,
291                         memento.getLine().getPositionByteBuffer());
292                 bytesToDecode -= readInputBytes;
293                 countDecodedBytes += readInputBytes;
294             } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());
295 
296             strings.add(((Buffer) output).flip().toString());
297             ((Buffer) output).clear();
298         }
299 
300         memento.getDecoder().reset();
301         ((Buffer) output).clear();
302 
303         return toString(strings);
304     }
305 
306     private static int decodeString(
307             @Nonnull CharsetDecoder decoder,
308             @Nonnull ByteBuffer input,
309             @Nonnull CharBuffer output,
310             @Nonnegative int bytesToDecode,
311             boolean endOfInput,
312             @Nonnegative int errorStreamFrom)
313             throws MalformedFrameException {
314         int limit = ((Buffer) input).limit();
315         ((Buffer) input).limit(((Buffer) input).position() + bytesToDecode);
316 
317         CoderResult result = decoder.decode(input, output, endOfInput);
318         if (result.isError() || result.isMalformed()) {
319             throw new MalformedFrameException(errorStreamFrom, ((Buffer) input).position());
320         }
321 
322         int decodedBytes = bytesToDecode - input.remaining();
323         ((Buffer) input).limit(limit);
324         return decodedBytes;
325     }
326 
327     private static String toString(List<String> strings) {
328         if (strings.size() == 1) {
329             return strings.get(0);
330         }
331         StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
332         for (String s : strings) {
333             concatenated.append(s);
334         }
335         return concatenated.toString();
336     }
337 
338     private void printCorruptedStream(Memento memento) {
339         ByteBuffer bb = memento.getByteBuffer();
340         if (bb.hasRemaining()) {
341             int bytesToWrite = bb.remaining();
342             memento.getLine().write(bb, ((Buffer) bb).position(), bytesToWrite);
343             ((Buffer) bb).position(((Buffer) bb).position() + bytesToWrite);
344         }
345     }
346 
347     /**
348      * Print the last string which has not been finished by a new line character.
349      *
350      * @param memento current memento object
351      */
352     protected final void printRemainingStream(Memento memento) {
353         printCorruptedStream(memento);
354         memento.getLine().printExistingLine();
355         memento.getLine().clear();
356     }
357 
358     /**
359      *
360      */
361     public static final class Segment {
362         private final byte[] array;
363         private final int fromIndex;
364         private final int length;
365         private final int hashCode;
366 
367         public Segment(byte[] array, int fromIndex, int length) {
368             this.array = array;
369             this.fromIndex = fromIndex;
370             this.length = length;
371 
372             int hashCode = 0;
373             int i = fromIndex;
374             for (int loops = length >> 1; loops-- != 0; ) {
375                 hashCode = 31 * hashCode + array[i++];
376                 hashCode = 31 * hashCode + array[i++];
377             }
378             this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
379         }
380 
381         @Override
382         public int hashCode() {
383             return hashCode;
384         }
385 
386         @Override
387         public boolean equals(Object obj) {
388             if (!(obj instanceof Segment)) {
389                 return false;
390             }
391 
392             Segment that = (Segment) obj;
393             if (that.length != length) {
394                 return false;
395             }
396 
397             for (int i = 0; i < length; i++) {
398                 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
399                     return false;
400                 }
401             }
402             return true;
403         }
404     }
405 
406     protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
407         ByteBuffer buffer = memento.getByteBuffer();
408         if (buffer.remaining() >= recommendedCount && ((Buffer) buffer).limit() != 0) {
409             return OVERFLOW;
410         } else {
411             if (((Buffer) buffer).position() != 0
412                     && recommendedCount > buffer.capacity() - ((Buffer) buffer).position()) {
413                 ((Buffer) buffer.compact()).flip();
414                 memento.getLine().setPositionByteBuffer(0);
415             }
416             int mark = ((Buffer) buffer).position();
417             ((Buffer) buffer).position(((Buffer) buffer).limit());
418             ((Buffer) buffer).limit(min(((Buffer) buffer).position() + recommendedCount, buffer.capacity()));
419             return read(buffer, mark, recommendedCount);
420         }
421     }
422 
423     private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
424         StreamReadStatus readStatus = null;
425         boolean isEnd = false;
426         try {
427             while (!isEnd
428                     && ((Buffer) buffer).position() - oldPosition < recommendedCount
429                     && ((Buffer) buffer).position() < ((Buffer) buffer).limit()) {
430                 isEnd = channel.read(buffer) == -1;
431             }
432         } finally {
433             ((Buffer) buffer).limit(((Buffer) buffer).position());
434             ((Buffer) buffer).position(oldPosition);
435             int readBytes = buffer.remaining();
436             boolean readComplete = readBytes >= recommendedCount;
437             if (!isEnd || readComplete) {
438                 debugStream(buffer.array(), buffer.arrayOffset() + ((Buffer) buffer).position(), buffer.remaining());
439                 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
440             }
441         }
442 
443         if (readStatus == null) {
444             throw new EOFException();
445         } else {
446             return readStatus;
447         }
448     }
449 
450     /**
451      *
452      */
453     public final class Memento {
454         private CharsetDecoder currentDecoder;
455         private final CharsetDecoder defaultDecoder;
456         private final BufferedStream line = new BufferedStream(32);
457         private final List<Object> data = new ArrayList<>();
458         private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
459         private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
460 
461         public Memento() {
462             defaultDecoder = DEFAULT_STREAM_ENCODING
463                     .newDecoder()
464                     .onMalformedInput(REPLACE)
465                     .onUnmappableCharacter(REPLACE);
466             ((Buffer) bb).limit(0);
467         }
468 
469         public void reset() {
470             currentDecoder = null;
471             data.clear();
472         }
473 
474         public CharsetDecoder getDecoder() {
475             return currentDecoder == null ? defaultDecoder : currentDecoder;
476         }
477 
478         public void setCharset(Charset charset) {
479             if (charset.name().equals(defaultDecoder.charset().name())) {
480                 currentDecoder = defaultDecoder;
481             } else {
482                 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
483             }
484         }
485 
486         public BufferedStream getLine() {
487             return line;
488         }
489 
490         public List<Object> getData() {
491             return data;
492         }
493 
494         public <T> T ofDataAt(int indexOfData) {
495             //noinspection unchecked
496             return (T) data.get(indexOfData);
497         }
498 
499         public CharBuffer getCharBuffer() {
500             return cb;
501         }
502 
503         public ByteBuffer getByteBuffer() {
504             return bb;
505         }
506     }
507 
508     /**
509      * This class avoids locking which gains the performance of this decoder.
510      */
511     public final class BufferedStream {
512         private byte[] buffer;
513         private int count;
514         private int positionByteBuffer;
515         private boolean isNewLine;
516 
517         BufferedStream(int capacity) {
518             this.buffer = new byte[capacity];
519         }
520 
521         public int getPositionByteBuffer() {
522             return positionByteBuffer;
523         }
524 
525         public void setPositionByteBuffer(int positionByteBuffer) {
526             this.positionByteBuffer = positionByteBuffer;
527         }
528 
529         public void write(ByteBuffer bb, int position, int length) {
530             ensureCapacity(length);
531             byte[] array = bb.array();
532             int pos = bb.arrayOffset() + position;
533             while (length-- > 0) {
534                 positionByteBuffer++;
535                 byte b = array[pos++];
536                 if (b == '\r' || b == '\n') {
537                     if (!isNewLine) {
538                         printExistingLine();
539                         count = 0;
540                     }
541                     isNewLine = true;
542                 } else {
543                     buffer[count++] = b;
544                     isNewLine = false;
545                 }
546             }
547         }
548 
549         public void clear() {
550             count = 0;
551         }
552 
553         @Override
554         public String toString() {
555             return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
556         }
557 
558         private boolean isEmpty() {
559             return count == 0;
560         }
561 
562         private void ensureCapacity(int addCapacity) {
563             int oldCapacity = buffer.length;
564             int exactCapacity = count + addCapacity;
565             if (exactCapacity < 0) {
566                 throw new OutOfMemoryError();
567             }
568 
569             if (oldCapacity < exactCapacity) {
570                 int newCapacity = oldCapacity << 1;
571                 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
572             }
573         }
574 
575         void printExistingLine() {
576             if (isEmpty()) {
577                 return;
578             }
579 
580             String s = toString();
581             if (isBlank(s)) {
582                 return;
583             }
584 
585             if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
586                 if (logger.isDebugEnabled()) {
587                     logger.debug(s);
588                 } else if (logger.isInfoEnabled()) {
589                     logger.info(s);
590                 } else {
591                     // In case of debugging forked JVM, see PRINTABLE_JVM_NATIVE_STREAM.
592                     System.out.println(s);
593                 }
594             } else {
595                 if (isJvmError(s)) {
596                     logger.error(s);
597                 } else if (logger.isDebugEnabled()) {
598                     logger.debug(s);
599                 }
600 
601                 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
602                         + arguments.getForkChannelId() + ".";
603                 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
604                 String dumpPath = dumpFile.getAbsolutePath();
605                 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
606             }
607         }
608 
609         private boolean isJvmError(String line) {
610             String lineLower = line.toLowerCase();
611             for (String errorPattern : JVM_ERROR_PATTERNS) {
612                 if (lineLower.contains(errorPattern)) {
613                     return true;
614                 }
615             }
616             return false;
617         }
618     }
619 
620     /**
621      *
622      */
623     public static final class MalformedFrameException extends Exception {
624         private final int readFrom;
625         private final int readTo;
626 
627         public MalformedFrameException(int readFrom, int readTo) {
628             this.readFrom = readFrom;
629             this.readTo = readTo;
630         }
631 
632         public int readFrom() {
633             return readFrom;
634         }
635 
636         public int readTo() {
637             return readTo;
638         }
639 
640         public boolean hasValidPositions() {
641             return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
642         }
643     }
644 
645     /**
646      * Underflow - could not completely read out al bytes in one call.
647      * <br>
648      * Overflow - read all bytes or more
649      * <br>
650      * EOF - end of stream
651      */
652     public enum StreamReadStatus {
653         UNDERFLOW,
654         OVERFLOW,
655         EOF
656     }
657 }