1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.maven.surefire.booter.spi;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.security.AccessControlException;
24 import java.security.PrivilegedAction;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
29 import org.apache.maven.surefire.spi.MasterProcessChannelProcessorFactory;
30
31 import static java.security.AccessController.doPrivileged;
32 import static java.util.concurrent.Executors.newScheduledThreadPool;
33 import static java.util.concurrent.TimeUnit.MILLISECONDS;
34 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
35
36 /**
37 * Default implementation of {@link MasterProcessChannelProcessorFactory}.
38 */
39 public abstract class AbstractMasterProcessChannelProcessorFactory implements MasterProcessChannelProcessorFactory {
40 private static final String STREAM_FLUSHER = "surefire-forkedjvm-stream-flusher";
41 private final ScheduledExecutorService flusher;
42
43 public AbstractMasterProcessChannelProcessorFactory() {
44 flusher = newScheduledThreadPool(1, newDaemonThreadFactory(STREAM_FLUSHER));
45 }
46
47 protected void schedulePeriodicFlusher(int delayInMillis, final WritableBufferedByteChannel channel) {
48 final AtomicLong bufferOverflows = new AtomicLong();
49 flusher.scheduleWithFixedDelay(
50 new Runnable() {
51 @Override
52 public void run() {
53 long currentBufferOverflows = channel.countBufferOverflows();
54 // optimization: flush the Channel only if the buffer has not overflew after last period of time
55 if (bufferOverflows.get() == currentBufferOverflows) {
56 try {
57 channel.write(ByteBuffer.allocate(0));
58 } catch (Exception e) {
59 // cannot do anything about this I/O issue
60 }
61 } else {
62 bufferOverflows.set(currentBufferOverflows);
63 }
64 }
65 },
66 0L,
67 delayInMillis,
68 MILLISECONDS);
69 }
70
71 @Override
72 public void close() throws IOException {
73 try {
74 doPrivileged(new PrivilegedAction<Object>() {
75 @Override
76 public Object run() {
77 flusher.shutdown();
78 // Do NOT call awaitTermination() due to this would unnecessarily prolong teardown
79 // time of the JVM. It is not a critical situation when the JXM exits this daemon
80 // thread because the interrupted flusher does not break any business function.
81 // All business data is already safely flushed by test events, then by sending BYE
82 // event at the exit time and finally by flushEventChannelOnExit() in ForkedBooter.
83 return null;
84 }
85 });
86 } catch (AccessControlException e) {
87 // ignore
88 }
89 }
90 }