1717package  io .aiven .kafka .tieredstorage .transform ;
1818
1919import  java .io .ByteArrayInputStream ;
20+ import  java .io .IOException ;
2021import  java .io .InputStream ;
2122import  java .io .SequenceInputStream ;
23+ import  java .nio .file .Files ;
24+ import  java .nio .file .Path ;
2225import  java .util .Enumeration ;
2326import  java .util .Objects ;
27+ import  java .util .Optional ;
2428
2529import  io .aiven .kafka .tieredstorage .manifest .index .AbstractChunkIndexBuilder ;
2630import  io .aiven .kafka .tieredstorage .manifest .index .ChunkIndex ;
4347public  class  TransformFinisher  implements  Enumeration <InputStream > {
4448    private  final  TransformChunkEnumeration  inner ;
4549    private  final  AbstractChunkIndexBuilder  chunkIndexBuilder ;
50+     private  final  Path  originalFilePath ;
51+     private  final  int  originalFileSize ;
4652    private  ChunkIndex  chunkIndex  = null ;
4753    private  final  Bucket  rateLimitingBucket ;
4854
@@ -53,13 +59,16 @@ public static Builder newBuilder(final TransformChunkEnumeration inner, final in
5359    private  TransformFinisher (
5460        final  TransformChunkEnumeration  inner ,
5561        final  boolean  chunkingEnabled ,
62+         final  Path  originalFilePath ,
5663        final  int  originalFileSize ,
5764        final  Bucket  rateLimitingBucket 
5865    ) {
5966        this .inner  = Objects .requireNonNull (inner , "inner cannot be null" );
6067
6168        final  int  originalChunkSize  = chunkingEnabled  ? inner .originalChunkSize () : originalFileSize ;
6269        this .chunkIndexBuilder  = chunkIndexBuilder (inner , originalChunkSize , originalFileSize );
70+         this .originalFilePath  = originalFilePath ;
71+         this .originalFileSize  = originalFileSize ;
6372        this .rateLimitingBucket  = rateLimitingBucket ;
6473    }
6574
@@ -102,24 +111,58 @@ public InputStream nextElement() {
102111
103112    public  ChunkIndex  chunkIndex () {
104113        if  (chunkIndex  == null ) {
105-             throw  new  IllegalStateException ("Chunk index was not built, was finisher used?" );
114+             if  (isBaseTransform ()) {
115+                 // as chunk index will not be built by consuming the input stream, calculate it from source file 
116+                 return  calculateChunkIndex ();
117+             } else  {
118+                 throw  new  IllegalStateException ("Chunk index was not built, was finisher used?" );
119+             }
106120        }
107121        return  this .chunkIndex ;
108122    }
109123
110-     public  InputStream  toInputStream () {
111-         final  SequenceInputStream  sequencedInputStream  = new  SequenceInputStream (this );
112-         if  (rateLimitingBucket  == null ) {
113-             return  sequencedInputStream ;
124+     private  ChunkIndex  calculateChunkIndex () {
125+         final  var  chunkSize  = inner .transformedChunkSize ();
126+         var  size  = originalFileSize ;
127+         while  (size  > chunkSize ) {
128+             chunkIndexBuilder .addChunk (chunkSize );
129+             size  -= chunkSize ;
130+         }
131+         return  chunkIndexBuilder .finish (size );
132+     }
133+ 
134+     public  InputStream  toInputStream () throws  IOException  {
135+         if  (isBaseTransform () && originalFilePath  != null ) {
136+             // close inner input stream (based on source file) 
137+             final  var  sequenceInputStream  = new  SequenceInputStream (this );
138+             sequenceInputStream .close ();
139+ 
140+             return  maybeToRateLimitedInputStream (Files .newInputStream (originalFilePath ));
114141        } else  {
115-             return  new  RateLimitedInputStream ( sequencedInputStream ,  rateLimitingBucket );
142+             return  maybeToRateLimitedInputStream ( new  SequenceInputStream ( this ) );
116143        }
117144    }
118145
146+     private  InputStream  maybeToRateLimitedInputStream (final  InputStream  delegated ) {
147+         if  (rateLimitingBucket  == null ) {
148+             return  delegated ;
149+         }
150+         return  new  RateLimitedInputStream (delegated , rateLimitingBucket );
151+     }
152+ 
153+     private  boolean  isBaseTransform () {
154+         return  inner  instanceof  BaseTransformChunkEnumeration ;
155+     }
156+ 
157+     Optional <Path > maybeOriginalFilePath () {
158+         return  Optional .ofNullable (originalFilePath );
159+     }
160+ 
119161    public  static  class  Builder  {
120162        final  TransformChunkEnumeration  inner ;
121163        final  Integer  originalFileSize ;
122164        boolean  chunkingEnabled  = true ;
165+         Path  originalFilePath  = null ;
123166        Bucket  rateLimitingBucket ;
124167
125168        public  Builder (final  TransformChunkEnumeration  inner , final  int  originalFileSize ) {
@@ -143,8 +186,14 @@ public Builder withChunkingDisabled() {
143186            return  this ;
144187        }
145188
189+         public  Builder  withOriginalFilePath (final  Path  originalFilePath ) {
190+             this .originalFilePath  = Objects .requireNonNull (originalFilePath , "originalFilePath cannot be null" );
191+             return  this ;
192+         }
193+ 
146194        public  TransformFinisher  build () {
147-             return  new  TransformFinisher (inner , chunkingEnabled , originalFileSize , rateLimitingBucket );
195+             return  new  TransformFinisher (inner , chunkingEnabled , originalFilePath , originalFileSize ,
196+                 rateLimitingBucket );
148197        }
149198    }
150199}
0 commit comments