View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.index.updater;
20  
21  import java.io.BufferedInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.EOFException;
25  import java.io.File;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.UTFDataFormatException;
29  import java.nio.file.Files;
30  import java.nio.file.Path;
31  import java.time.Duration;
32  import java.time.Instant;
33  import java.util.ArrayList;
34  import java.util.Collections;
35  import java.util.Date;
36  import java.util.HashSet;
37  import java.util.List;
38  import java.util.Objects;
39  import java.util.Set;
40  import java.util.concurrent.ArrayBlockingQueue;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.ExecutorService;
43  import java.util.concurrent.Executors;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicBoolean;
46  import java.util.zip.GZIPInputStream;
47  
48  import org.apache.lucene.document.Document;
49  import org.apache.lucene.document.Field;
50  import org.apache.lucene.document.FieldType;
51  import org.apache.lucene.index.IndexOptions;
52  import org.apache.lucene.index.IndexWriter;
53  import org.apache.lucene.index.IndexWriterConfig;
54  import org.apache.lucene.store.Directory;
55  import org.apache.lucene.store.FSDirectory;
56  import org.apache.maven.index.ArtifactInfo;
57  import org.apache.maven.index.context.DocumentFilter;
58  import org.apache.maven.index.context.IndexUtils;
59  import org.apache.maven.index.context.IndexingContext;
60  import org.apache.maven.index.context.NexusAnalyzer;
61  import org.apache.maven.index.context.NexusIndexWriter;
62  import org.slf4j.Logger;
63  import org.slf4j.LoggerFactory;
64  
65  /**
66   * An index data reader used to parse transfer index format.
67   *
68   * @author Eugene Kuleshov
69   */
70  public class IndexDataReader {
71      private static final Logger LOGGER = LoggerFactory.getLogger(IndexDataReader.class);
72  
73      private final DataInputStream dis;
74      private final Path tempStorage;
75      private final DocumentFilter filter;
76      private final FSDirectoryFactory factory;
77      private final int threads;
78  
/**
 * Creates a reader that processes the index data with a single thread.
 *
 * @param is stream carrying the transfer-format index data (plain or gzipped)
 * @throws IOException if probing the stream for the GZIP magic bytes fails
 */
public IndexDataReader(final InputStream is) throws IOException {
    // no temp storage, no filter, no directory factory: defaults apply; one reader thread
    this(is, null, null, null, 1);
}
82  
/**
 * Creates a reader with the given number of reader threads and default
 * temp storage, filter and directory factory.
 *
 * @param is      stream carrying the transfer-format index data (plain or gzipped)
 * @param threads number of threads to use while reading; must be at least 1
 * @throws IOException if probing the stream for the GZIP magic bytes fails
 */
public IndexDataReader(final InputStream is, final int threads) throws IOException {
    this(is, null, null, null, threads);
}
86  
/**
 * Creates a reader configured from an index update request: temp directory,
 * extraction filter, directory factory and thread count are all taken from it.
 *
 * @param is      stream carrying the transfer-format index data (plain or gzipped)
 * @param request update request supplying the reader configuration
 * @throws IOException if probing the stream for the GZIP magic bytes fails
 */
public IndexDataReader(final InputStream is, final IndexUpdateRequest request) throws IOException {
    this(
            is,
            tempStorageOf(request),
            request.getExtractionFilter(),
            request.getFSDirectoryFactory(),
            request.getThreads());
}

/** Returns the request's temp directory as a path, or {@code null} when the request has none. */
private static Path tempStorageOf(final IndexUpdateRequest request) {
    return request.getIndexTempDir() == null ? null : request.getIndexTempDir().toPath();
}
95  
96      public IndexDataReader(
97              final InputStream is,
98              final Path tempStorage,
99              final DocumentFilter filter,
100             final FSDirectoryFactory factory,
101             final int threads)
102             throws IOException {
103         if (threads < 1) {
104             throw new IllegalArgumentException("Reader threads must be greater than zero: " + threads);
105         }
106         this.tempStorage = Objects.requireNonNullElse(tempStorage, Path.of(System.getProperty("java.io.tmpdir")));
107         this.factory = Objects.requireNonNullElse(factory, FSDirectoryFactory.DEFAULT);
108         this.filter = filter;
109         this.threads = threads;
110 
111         // MINDEXER-13
112         // LightweightHttpWagon may have performed automatic decompression
113         // Handle it transparently
114         is.mark(2);
115         InputStream data;
116         if (is.read() == 0x1f && is.read() == 0x8b) // GZIPInputStream.GZIP_MAGIC
117         {
118             is.reset();
119             data = new BufferedInputStream(new GZIPInputStream(is, 1024 * 8), 1024 * 8);
120         } else {
121             is.reset();
122             data = new BufferedInputStream(is, 1024 * 8);
123         }
124 
125         this.dis = new DataInputStream(data);
126     }
127 
128     public IndexDataReadResult readIndex(IndexWriter w, IndexingContext context) throws IOException {
129         if (threads == 1) {
130             return readIndexST(w, context);
131         } else {
132             return readIndexMT(w, context);
133         }
134     }
135 
136     private IndexDataReadResult readIndexST(IndexWriter w, IndexingContext context) throws IOException {
137         LOGGER.debug("Reading ST index...");
138         Instant start = Instant.now();
139         long timestamp = readHeader();
140 
141         Date date = null;
142 
143         if (timestamp != -1) {
144             date = new Date(timestamp);
145 
146             IndexUtils.updateTimestamp(w.getDirectory(), date);
147         }
148 
149         int n = 0;
150 
151         Document doc;
152         Set<String> rootGroups = new HashSet<>();
153         Set<String> allGroups = new HashSet<>();
154 
155         while ((doc = readDocument()) != null) {
156             addToIndex(doc, context, w, rootGroups, allGroups);
157             n++;
158         }
159 
160         w.commit();
161 
162         IndexDataReadResult result = new IndexDataReadResult();
163         result.setDocumentCount(n);
164         result.setTimestamp(date);
165         result.setRootGroups(rootGroups);
166         result.setAllGroups(allGroups);
167 
168         LOGGER.debug(
169                 "Reading ST index done in {} sec",
170                 Duration.between(start, Instant.now()).getSeconds());
171         return result;
172     }
173 
174     private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context) throws IOException {
175         LOGGER.debug("Reading MT index...");
176         Instant start = Instant.now();
177         long timestamp = readHeader();
178 
179         int n = 0;
180 
181         final Document theEnd = new Document();
182 
183         Set<String> rootGroups = ConcurrentHashMap.newKeySet();
184         Set<String> allGroups = ConcurrentHashMap.newKeySet();
185         ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>(10000);
186 
187         ExecutorService executorService = Executors.newFixedThreadPool(threads);
188         List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
189         List<FSDirectory> siloDirectories = new ArrayList<>(threads);
190         List<IndexWriter> siloWriters = new ArrayList<>(threads);
191         AtomicBoolean stopEarly = new AtomicBoolean(false);
192         LOGGER.debug("Creating {} silo writer threads...", threads);
193         for (int i = 0; i < threads; i++) {
194             final int silo = i;
195             FSDirectory siloDirectory = tempDirectory("silo" + i);
196             siloDirectories.add(siloDirectory);
197             siloWriters.add(tempWriter(siloDirectory));
198             executorService.execute(() -> {
199                 LOGGER.debug("Starting thread {}", Thread.currentThread().getName());
200                 try {
201                     while (true) {
202                         try {
203                             Document doc = queue.take();
204                             if (doc == theEnd) {
205                                 break;
206                             }
207                             addToIndex(doc, context, siloWriters.get(silo), rootGroups, allGroups);
208                         } catch (Throwable e) {
209                             errors.add(e);
210                             if (stopEarly.compareAndSet(false, true)) {
211                                 queue.clear(); // unblock producer
212                                 executorService.shutdownNow(); // unblock consumers
213                             }
214                             break;
215                         }
216                     }
217                 } finally {
218                     LOGGER.debug("Done thread {}", Thread.currentThread().getName());
219                 }
220             });
221         }
222 
223         LOGGER.debug("Loading up documents into silos");
224         try {
225             Document doc;
226             while (!stopEarly.get() && (doc = readDocument()) != null) {
227                 queue.put(doc);
228                 n++;
229             }
230             LOGGER.debug("Signalling END");
231             for (int i = 0; i < threads; i++) {
232                 queue.put(theEnd);
233             }
234 
235             LOGGER.debug("Shutting down threads");
236             executorService.shutdown();
237             executorService.awaitTermination(5L, TimeUnit.MINUTES);
238         } catch (InterruptedException e) {
239             throw new IOException("Interrupted", e);
240         }
241 
242         if (!errors.isEmpty()) {
243             if (errors.stream().allMatch(ex -> ex instanceof IOException || ex instanceof InterruptedException)) {
244                 IOException exception = new IOException("Error during load of index");
245                 errors.forEach(exception::addSuppressed);
246                 throw exception;
247             } else {
248                 RuntimeException exception = new RuntimeException("Error during load of index");
249                 errors.forEach(exception::addSuppressed);
250                 throw exception;
251             }
252         }
253 
254         LOGGER.debug("Silos loaded...");
255         Date date = null;
256         if (timestamp != -1) {
257             date = new Date(timestamp);
258             IndexUtils.updateTimestamp(w.getDirectory(), date);
259         }
260 
261         LOGGER.debug("Closing silo writers...");
262         for (IndexWriter siloWriter : siloWriters) {
263             siloWriter.commit();
264             siloWriter.close();
265         }
266 
267         LOGGER.debug("Merging silo directories...");
268         w.addIndexes(siloDirectories.toArray(new Directory[0]));
269 
270         LOGGER.debug("Cleanup of silo directories...");
271         for (FSDirectory siloDirectory : siloDirectories) {
272             File dir = siloDirectory.getDirectory().toFile();
273             siloDirectory.close();
274             IndexUtils.delete(dir);
275         }
276 
277         LOGGER.debug("Finalizing...");
278         w.commit();
279 
280         IndexDataReadResult result = new IndexDataReadResult();
281         result.setDocumentCount(n);
282         result.setTimestamp(date);
283         result.setRootGroups(rootGroups);
284         result.setAllGroups(allGroups);
285 
286         LOGGER.debug(
287                 "Reading MT index done in {} sec",
288                 Duration.between(start, Instant.now()).getSeconds());
289         return result;
290     }
291 
292     private FSDirectory tempDirectory(final String name) throws IOException {
293         return factory.open(
294                 Files.createTempDirectory(tempStorage, name + ".dir").toFile());
295     }
296 
297     private IndexWriter tempWriter(final FSDirectory directory) throws IOException {
298         IndexWriterConfig config = new IndexWriterConfig(new NexusAnalyzer());
299         config.setUseCompoundFile(false);
300         return new NexusIndexWriter(directory, config);
301     }
302 
303     private void addToIndex(
304             final Document doc,
305             final IndexingContext context,
306             final IndexWriter indexWriter,
307             final Set<String> rootGroups,
308             final Set<String> allGroups)
309             throws IOException {
310         ArtifactInfo ai = IndexUtils.constructArtifactInfo(doc, context);
311         if (ai != null) {
312             if (filter == null || filter.accept(doc)) {
313                 indexWriter.addDocument(IndexUtils.updateDocument(doc, context, false, ai));
314                 rootGroups.add(ai.getRootGroup());
315                 allGroups.add(ai.getGroupId());
316             }
317         } else {
318             // these two fields are automatically handled in code above
319             if (doc.getField(ArtifactInfo.ALL_GROUPS) == null && doc.getField(ArtifactInfo.ROOT_GROUPS) == null) {
320                 indexWriter.addDocument(doc);
321             }
322         }
323     }
324 
325     public long readHeader() throws IOException {
326         final byte hdrbyte = (byte) ((IndexDataWriter.VERSION << 24) >> 24);
327 
328         if (hdrbyte != dis.readByte()) {
329             // data format version mismatch
330             throw new IOException("Provided input contains unexpected data (0x01 expected as 1st byte)!");
331         }
332 
333         return dis.readLong();
334     }
335 
336     public Document readDocument() throws IOException {
337         int fieldCount;
338         try {
339             fieldCount = dis.readInt();
340         } catch (EOFException ex) {
341             return null; // no more documents
342         }
343 
344         Document doc = new Document();
345 
346         for (int i = 0; i < fieldCount; i++) {
347             doc.add(readField());
348         }
349 
350         // Fix up UINFO field wrt MINDEXER-41
351         final Field uinfoField = (Field) doc.getField(ArtifactInfo.UINFO);
352         final String info = doc.get(ArtifactInfo.INFO);
353         if (uinfoField != null && info != null && !info.isEmpty()) {
354             String uinfoString = uinfoField.stringValue();
355             if (uinfoString.endsWith(ArtifactInfo.FS + ArtifactInfo.NA)) {
356                 int elem = 0;
357                 for (int i = -1; (i = info.indexOf(ArtifactInfo.FS, i + 1)) != -1; ) {
358                     if (++elem == 6) { // extension is field 6
359                         String extension = info.substring(i + 1);
360                         uinfoField.setStringValue(uinfoString + ArtifactInfo.FS + ArtifactInfo.nvl(extension));
361                         break;
362                     }
363                 }
364             }
365         }
366 
367         return doc;
368     }
369 
370     private Field readField() throws IOException {
371         int flags = dis.read();
372 
373         FieldType fieldType = new FieldType();
374         if ((flags & IndexDataWriter.F_INDEXED) > 0) {
375             boolean tokenized = (flags & IndexDataWriter.F_TOKENIZED) > 0;
376             fieldType.setTokenized(tokenized);
377         }
378         fieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
379         fieldType.setStored((flags & IndexDataWriter.F_STORED) > 0);
380 
381         String name = dis.readUTF();
382         String value = readUTF(dis);
383 
384         return new Field(name, value, fieldType);
385     }
386 
387     private static String readUTF(DataInput in) throws IOException {
388         int utflen = in.readInt();
389 
390         byte[] bytearr;
391         char[] chararr;
392 
393         try {
394             bytearr = new byte[utflen];
395             chararr = new char[utflen];
396         } catch (OutOfMemoryError e) {
397             throw new IOException(
398                     "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
399                             + " See MINDEXER-28 for more information!",
400                     e);
401         }
402 
403         int c, char2, char3;
404         int count = 0;
405         int chararrCount = 0;
406 
407         in.readFully(bytearr, 0, utflen);
408 
409         while (count < utflen) {
410             c = bytearr[count] & 0xff;
411             if (c > 127) {
412                 break;
413             }
414             count++;
415             chararr[chararrCount++] = (char) c;
416         }
417 
418         while (count < utflen) {
419             c = bytearr[count] & 0xff;
420             switch (c >> 4) {
421                 case 0:
422                 case 1:
423                 case 2:
424                 case 3:
425                 case 4:
426                 case 5:
427                 case 6:
428                 case 7:
429                     /* 0xxxxxxx */
430                     count++;
431                     chararr[chararrCount++] = (char) c;
432                     break;
433 
434                 case 12:
435                 case 13:
436                     /* 110x xxxx 10xx xxxx */
437                     count += 2;
438                     if (count > utflen) {
439                         throw new UTFDataFormatException("malformed input: partial character at end");
440                     }
441                     char2 = bytearr[count - 1];
442                     if ((char2 & 0xC0) != 0x80) {
443                         throw new UTFDataFormatException("malformed input around byte " + count);
444                     }
445                     chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
446                     break;
447 
448                 case 14:
449                     /* 1110 xxxx 10xx xxxx 10xx xxxx */
450                     count += 3;
451                     if (count > utflen) {
452                         throw new UTFDataFormatException("malformed input: partial character at end");
453                     }
454                     char2 = bytearr[count - 2];
455                     char3 = bytearr[count - 1];
456                     if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
457                         throw new UTFDataFormatException("malformed input around byte " + (count - 1));
458                     }
459                     chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F)));
460                     break;
461 
462                 default:
463                     /* 10xx xxxx, 1111 xxxx */
464                     throw new UTFDataFormatException("malformed input around byte " + count);
465             }
466         }
467 
468         // The number of chars produced may be less than utflen
469         return new String(chararr, 0, chararrCount);
470     }
471 
472     /**
473      * An index data read result holder
474      */
475     public static class IndexDataReadResult {
476         private Date timestamp;
477 
478         private int documentCount;
479 
480         private Set<String> rootGroups;
481 
482         private Set<String> allGroups;
483 
484         public void setDocumentCount(int documentCount) {
485             this.documentCount = documentCount;
486         }
487 
488         public int getDocumentCount() {
489             return documentCount;
490         }
491 
492         public void setTimestamp(Date timestamp) {
493             this.timestamp = timestamp;
494         }
495 
496         public Date getTimestamp() {
497             return timestamp;
498         }
499 
500         public void setRootGroups(Set<String> rootGroups) {
501             this.rootGroups = rootGroups;
502         }
503 
504         public Set<String> getRootGroups() {
505             return rootGroups;
506         }
507 
508         public void setAllGroups(Set<String> allGroups) {
509             this.allGroups = allGroups;
510         }
511 
512         public Set<String> getAllGroups() {
513             return allGroups;
514         }
515     }
516 
517     /**
518      * Reads index content by using a visitor. <br>
519      * The visitor is called for each read documents after it has been populated with Lucene fields.
520      *
521      * @param visitor an index data visitor
522      * @param context indexing context
523      * @return statistics about read data
524      * @throws IOException in case of an IO exception during index file access
525      */
526     public IndexDataReadResult readIndex(final IndexDataReadVisitor visitor, final IndexingContext context)
527             throws IOException {
528         dis.readByte(); // data format version
529 
530         long timestamp = dis.readLong();
531 
532         Date date = null;
533 
534         if (timestamp != -1) {
535             date = new Date(timestamp);
536         }
537 
538         int n = 0;
539 
540         Document doc;
541         while ((doc = readDocument()) != null) {
542             visitor.visitDocument(IndexUtils.updateDocument(doc, context, false));
543 
544             n++;
545         }
546 
547         IndexDataReadResult result = new IndexDataReadResult();
548         result.setDocumentCount(n);
549         result.setTimestamp(date);
550         return result;
551     }
552 
553     /**
554      * Visitor of indexed Lucene documents.
555      */
556     public interface IndexDataReadVisitor {
557 
558         /**
559          * Called on each read document. The document is already populated with fields.
560          *
561          * @param document read document
562          */
563         void visitDocument(Document document);
564     }
565 }