View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.index.updater;
20  
import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.maven.index.ArtifactInfo;
import org.apache.maven.index.context.IndexUtils;
import org.apache.maven.index.context.IndexingContext;
import org.apache.maven.index.context.NexusAnalyzer;
import org.apache.maven.index.context.NexusIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
58  
59  /**
60   * An index data reader used to parse transfer index format.
61   *
62   * @author Eugene Kuleshov
63   */
64  public class IndexDataReader {
65      private static final Logger LOGGER = LoggerFactory.getLogger(IndexDataReader.class);
66  
67      private final DataInputStream dis;
68  
69      private final int threads;
70  
    /**
     * Creates a single-threaded reader over the given transfer-format index stream.
     *
     * @param is the raw (possibly gzip-compressed) index data stream; must support mark/reset,
     *           as the delegated constructor sniffs the first two bytes for the GZIP magic
     * @throws IOException in case of an IO problem while probing the stream
     */
    public IndexDataReader(final InputStream is) throws IOException {
        this(is, 1);
    }
74  
75      public IndexDataReader(final InputStream is, final int threads) throws IOException {
76          if (threads < 1) {
77              throw new IllegalArgumentException("Reader threads must be greater than zero: " + threads);
78          }
79          this.threads = threads;
80          // MINDEXER-13
81          // LightweightHttpWagon may have performed automatic decompression
82          // Handle it transparently
83          is.mark(2);
84          InputStream data;
85          if (is.read() == 0x1f && is.read() == 0x8b) // GZIPInputStream.GZIP_MAGIC
86          {
87              is.reset();
88              data = new BufferedInputStream(new GZIPInputStream(is, 1024 * 8), 1024 * 8);
89          } else {
90              is.reset();
91              data = new BufferedInputStream(is, 1024 * 8);
92          }
93  
94          this.dis = new DataInputStream(data);
95      }
96  
97      public IndexDataReadResult readIndex(IndexWriter w, IndexingContext context) throws IOException {
98          if (threads == 1) {
99              return readIndexST(w, context);
100         } else {
101             return readIndexMT(w, context);
102         }
103     }
104 
105     private IndexDataReadResult readIndexST(IndexWriter w, IndexingContext context) throws IOException {
106         LOGGER.debug("Reading ST index...");
107         Instant start = Instant.now();
108         long timestamp = readHeader();
109 
110         Date date = null;
111 
112         if (timestamp != -1) {
113             date = new Date(timestamp);
114 
115             IndexUtils.updateTimestamp(w.getDirectory(), date);
116         }
117 
118         int n = 0;
119 
120         Document doc;
121         Set<String> rootGroups = new HashSet<>();
122         Set<String> allGroups = new HashSet<>();
123 
124         while ((doc = readDocument()) != null) {
125             addToIndex(doc, context, w, rootGroups, allGroups);
126             n++;
127         }
128 
129         w.commit();
130 
131         IndexDataReadResult result = new IndexDataReadResult();
132         result.setDocumentCount(n);
133         result.setTimestamp(date);
134         result.setRootGroups(rootGroups);
135         result.setAllGroups(allGroups);
136 
137         LOGGER.debug(
138                 "Reading ST index done in {} sec",
139                 Duration.between(start, Instant.now()).getSeconds());
140         return result;
141     }
142 
143     private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context) throws IOException {
144         LOGGER.debug("Reading MT index...");
145         Instant start = Instant.now();
146         long timestamp = readHeader();
147 
148         int n = 0;
149 
150         final Document theEnd = new Document();
151 
152         Set<String> rootGroups = ConcurrentHashMap.newKeySet();
153         Set<String> allGroups = ConcurrentHashMap.newKeySet();
154         ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>(10000);
155 
156         ExecutorService executorService = Executors.newFixedThreadPool(threads);
157         ArrayList<Exception> errors = new ArrayList<>();
158         ArrayList<FSDirectory> siloDirectories = new ArrayList<>(threads);
159         ArrayList<IndexWriter> siloWriters = new ArrayList<>(threads);
160         LOGGER.debug("Creating {} silo writer threads...", threads);
161         for (int i = 0; i < threads; i++) {
162             final int silo = i;
163             FSDirectory siloDirectory = tempDirectory("silo" + i);
164             siloDirectories.add(siloDirectory);
165             siloWriters.add(tempWriter(siloDirectory));
166             executorService.execute(() -> {
167                 LOGGER.debug("Starting thread {}", Thread.currentThread().getName());
168                 try {
169                     while (true) {
170                         try {
171                             Document doc = queue.take();
172                             if (doc == theEnd) {
173                                 break;
174                             }
175                             addToIndex(doc, context, siloWriters.get(silo), rootGroups, allGroups);
176                         } catch (InterruptedException | IOException e) {
177                             errors.add(e);
178                             break;
179                         }
180                     }
181                 } finally {
182                     LOGGER.debug("Done thread {}", Thread.currentThread().getName());
183                 }
184             });
185         }
186 
187         LOGGER.debug("Loading up documents into silos");
188         try {
189             Document doc;
190             while ((doc = readDocument()) != null) {
191                 queue.put(doc);
192                 n++;
193             }
194             LOGGER.debug("Signalling END");
195             for (int i = 0; i < threads; i++) {
196                 queue.put(theEnd);
197             }
198 
199             LOGGER.debug("Shutting down threads");
200             executorService.shutdown();
201             executorService.awaitTermination(5L, TimeUnit.MINUTES);
202         } catch (InterruptedException e) {
203             throw new IOException("Interrupted", e);
204         }
205 
206         if (!errors.isEmpty()) {
207             IOException exception = new IOException("Error during load of index");
208             errors.forEach(exception::addSuppressed);
209             throw exception;
210         }
211 
212         LOGGER.debug("Silos loaded...");
213         Date date = null;
214         if (timestamp != -1) {
215             date = new Date(timestamp);
216             IndexUtils.updateTimestamp(w.getDirectory(), date);
217         }
218 
219         LOGGER.debug("Closing silo writers...");
220         for (IndexWriter siloWriter : siloWriters) {
221             siloWriter.commit();
222             siloWriter.close();
223         }
224 
225         LOGGER.debug("Merging silo directories...");
226         w.addIndexes(siloDirectories.toArray(new Directory[0]));
227 
228         LOGGER.debug("Cleanup of silo directories...");
229         for (FSDirectory siloDirectory : siloDirectories) {
230             File dir = siloDirectory.getDirectory().toFile();
231             siloDirectory.close();
232             IndexUtils.delete(dir);
233         }
234 
235         LOGGER.debug("Finalizing...");
236         w.commit();
237 
238         IndexDataReadResult result = new IndexDataReadResult();
239         result.setDocumentCount(n);
240         result.setTimestamp(date);
241         result.setRootGroups(rootGroups);
242         result.setAllGroups(allGroups);
243 
244         LOGGER.debug(
245                 "Reading MT index done in {} sec",
246                 Duration.between(start, Instant.now()).getSeconds());
247         return result;
248     }
249 
    /**
     * Creates a fresh unique temporary directory (prefixed with the given name) and opens
     * it as a Lucene {@link FSDirectory}, to back one per-thread silo writer.
     *
     * @param name prefix for the temporary directory name
     * @throws IOException if the directory cannot be created or opened
     */
    private FSDirectory tempDirectory(final String name) throws IOException {
        return FSDirectory.open(Files.createTempDirectory(name + ".dir"));
    }
253 
254     private IndexWriter tempWriter(final FSDirectory directory) throws IOException {
255         IndexWriterConfig config = new IndexWriterConfig(new NexusAnalyzer());
256         config.setUseCompoundFile(false);
257         return new NexusIndexWriter(directory, config);
258     }
259 
260     private void addToIndex(
261             final Document doc,
262             final IndexingContext context,
263             final IndexWriter indexWriter,
264             final Set<String> rootGroups,
265             final Set<String> allGroups)
266             throws IOException {
267         ArtifactInfo ai = IndexUtils.constructArtifactInfo(doc, context);
268         if (ai != null) {
269             indexWriter.addDocument(IndexUtils.updateDocument(doc, context, false, ai));
270 
271             rootGroups.add(ai.getRootGroup());
272             allGroups.add(ai.getGroupId());
273         } else {
274             // these two fields are automatically handled in code above
275             if (doc.getField(ArtifactInfo.ALL_GROUPS) == null && doc.getField(ArtifactInfo.ROOT_GROUPS) == null) {
276                 indexWriter.addDocument(doc);
277             }
278         }
279     }
280 
281     public long readHeader() throws IOException {
282         final byte hdrbyte = (byte) ((IndexDataWriter.VERSION << 24) >> 24);
283 
284         if (hdrbyte != dis.readByte()) {
285             // data format version mismatch
286             throw new IOException("Provided input contains unexpected data (0x01 expected as 1st byte)!");
287         }
288 
289         return dis.readLong();
290     }
291 
292     public Document readDocument() throws IOException {
293         int fieldCount;
294         try {
295             fieldCount = dis.readInt();
296         } catch (EOFException ex) {
297             return null; // no more documents
298         }
299 
300         Document doc = new Document();
301 
302         for (int i = 0; i < fieldCount; i++) {
303             doc.add(readField());
304         }
305 
306         // Fix up UINFO field wrt MINDEXER-41
307         final Field uinfoField = (Field) doc.getField(ArtifactInfo.UINFO);
308         final String info = doc.get(ArtifactInfo.INFO);
309         if (uinfoField != null && info != null && !info.isEmpty()) {
310             final String[] splitInfo = ArtifactInfo.FS_PATTERN.split(info);
311             if (splitInfo.length > 6) {
312                 final String extension = splitInfo[6];
313                 final String uinfoString = uinfoField.stringValue();
314                 if (uinfoString.endsWith(ArtifactInfo.FS + ArtifactInfo.NA)) {
315                     uinfoField.setStringValue(uinfoString + ArtifactInfo.FS + ArtifactInfo.nvl(extension));
316                 }
317             }
318         }
319 
320         return doc;
321     }
322 
323     private Field readField() throws IOException {
324         int flags = dis.read();
325 
326         FieldType fieldType = new FieldType();
327         if ((flags & IndexDataWriter.F_INDEXED) > 0) {
328             boolean tokenized = (flags & IndexDataWriter.F_TOKENIZED) > 0;
329             fieldType.setTokenized(tokenized);
330         }
331         fieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
332         fieldType.setStored((flags & IndexDataWriter.F_STORED) > 0);
333 
334         String name = dis.readUTF();
335         String value = readUTF(dis);
336 
337         return new Field(name, value, fieldType);
338     }
339 
    /**
     * Reads a modified-UTF-8 encoded string whose byte length is carried as an {@code int}
     * (rather than the unsigned short used by {@link DataInput#readUTF()}), allowing values
     * longer than 64 KiB. The decoding loop mirrors the JDK's {@code DataInputStream.readUTF}
     * implementation.
     *
     * @param in the input to read from
     * @return the decoded string
     * @throws UTFDataFormatException on malformed byte sequences
     * @throws IOException            on IO errors, or when the declared length is so large
     *                                that buffer allocation fails (MINDEXER-28)
     */
    private static String readUTF(DataInput in) throws IOException {
        int utflen = in.readInt();

        byte[] bytearr;
        char[] chararr;

        try {
            // utflen is attacker/garbage-controllable; allocation may blow the heap.
            bytearr = new byte[utflen];
            chararr = new char[utflen];
        } catch (OutOfMemoryError e) {
            throw new IOException(
                    "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
                            + " See MINDEXER-28 for more information!",
                    e);
        }

        int c, char2, char3;
        int count = 0;
        int chararrCount = 0;

        in.readFully(bytearr, 0, utflen);

        // Fast path: copy leading ASCII bytes directly until the first multi-byte sequence.
        while (count < utflen) {
            c = bytearr[count] & 0xff;
            if (c > 127) {
                break;
            }
            count++;
            chararr[chararrCount++] = (char) c;
        }

        // Slow path: full modified-UTF-8 decoding for the remainder.
        while (count < utflen) {
            c = bytearr[count] & 0xff;
            switch (c >> 4) {
                case 0:
                case 1:
                case 2:
                case 3:
                case 4:
                case 5:
                case 6:
                case 7:
                    /* 0xxxxxxx */
                    count++;
                    chararr[chararrCount++] = (char) c;
                    break;

                case 12:
                case 13:
                    /* 110x xxxx 10xx xxxx */
                    count += 2;
                    if (count > utflen) {
                        throw new UTFDataFormatException("malformed input: partial character at end");
                    }
                    char2 = bytearr[count - 1];
                    if ((char2 & 0xC0) != 0x80) {
                        throw new UTFDataFormatException("malformed input around byte " + count);
                    }
                    chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
                    break;

                case 14:
                    /* 1110 xxxx 10xx xxxx 10xx xxxx */
                    count += 3;
                    if (count > utflen) {
                        throw new UTFDataFormatException("malformed input: partial character at end");
                    }
                    char2 = bytearr[count - 2];
                    char3 = bytearr[count - 1];
                    if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
                        throw new UTFDataFormatException("malformed input around byte " + (count - 1));
                    }
                    chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F)));
                    break;

                default:
                    /* 10xx xxxx, 1111 xxxx */
                    throw new UTFDataFormatException("malformed input around byte " + count);
            }
        }

        // The number of chars produced may be less than utflen
        return new String(chararr, 0, chararrCount);
    }
424 
    /**
     * An index data read result holder: carries the header timestamp, the number of
     * documents read, and the root/all group sets collected while reading.
     */
    public static class IndexDataReadResult {
        // Index timestamp from the data header, or null when the header carried -1.
        private Date timestamp;

        // Number of documents read from the stream.
        private int documentCount;

        // Root groups collected from the artifact documents.
        private Set<String> rootGroups;

        // All group ids collected from the artifact documents.
        private Set<String> allGroups;

        public void setDocumentCount(int documentCount) {
            this.documentCount = documentCount;
        }

        public int getDocumentCount() {
            return documentCount;
        }

        public void setTimestamp(Date timestamp) {
            this.timestamp = timestamp;
        }

        public Date getTimestamp() {
            return timestamp;
        }

        public void setRootGroups(Set<String> rootGroups) {
            this.rootGroups = rootGroups;
        }

        public Set<String> getRootGroups() {
            return rootGroups;
        }

        public void setAllGroups(Set<String> allGroups) {
            this.allGroups = allGroups;
        }

        public Set<String> getAllGroups() {
            return allGroups;
        }
    }
469 
470     /**
471      * Reads index content by using a visitor. <br>
472      * The visitor is called for each read documents after it has been populated with Lucene fields.
473      *
474      * @param visitor an index data visitor
475      * @param context indexing context
476      * @return statistics about read data
477      * @throws IOException in case of an IO exception during index file access
478      */
479     public IndexDataReadResult readIndex(final IndexDataReadVisitor visitor, final IndexingContext context)
480             throws IOException {
481         dis.readByte(); // data format version
482 
483         long timestamp = dis.readLong();
484 
485         Date date = null;
486 
487         if (timestamp != -1) {
488             date = new Date(timestamp);
489         }
490 
491         int n = 0;
492 
493         Document doc;
494         while ((doc = readDocument()) != null) {
495             visitor.visitDocument(IndexUtils.updateDocument(doc, context, false));
496 
497             n++;
498         }
499 
500         IndexDataReadResult result = new IndexDataReadResult();
501         result.setDocumentCount(n);
502         result.setTimestamp(date);
503         return result;
504     }
505 
    /**
     * Visitor of indexed Lucene documents, used by
     * {@link #readIndex(IndexDataReadVisitor, IndexingContext)} to stream documents to the
     * caller instead of writing them into an index.
     */
    public interface IndexDataReadVisitor {

        /**
         * Called on each read document. The document is already populated with fields.
         *
         * @param document read document
         */
        void visitDocument(Document document);
    }
518 }