@@ -75,16 +75,17 @@ void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(int64_t index) {
7575
7676template <typename DataT>
7777template <bool kDense >
78- void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
79- vector_size_t numRows = rows.back () + 1 ;
78+ void SelectiveDecimalColumnReader<DataT>::readHelper(
79+ const common::Filter* filter,
80+ RowSet rows) {
8081 ExtractToReader extractValues (this );
81- common::AlwaysTrue filter ;
82+ common::AlwaysTrue alwaysTrue ;
8283 DirectRleColumnVisitor<
8384 int64_t ,
8485 common::AlwaysTrue,
8586 decltype (extractValues),
8687 kDense >
87- visitor (filter , this , rows, extractValues);
88+ visitor (alwaysTrue , this , rows, extractValues);
8889
8990 // decode scale stream
9091 if (version_ == velox::dwrf::RleVersion_1) {
@@ -104,46 +105,201 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
104105 // reset numValues_ before reading values
105106 numValues_ = 0 ;
106107 valueSize_ = sizeof (DataT);
108+ vector_size_t numRows = rows.back () + 1 ;
107109 ensureValuesCapacity<DataT>(numRows);
108110
109111 // decode value stream
110112 facebook::velox::dwio::common::
111113 ColumnVisitor<DataT, common::AlwaysTrue, decltype (extractValues), kDense >
112- valueVisitor (filter , this , rows, extractValues);
114+ valueVisitor (alwaysTrue , this , rows, extractValues);
113115 decodeWithVisitor<DirectDecoder<true >>(valueDecoder_.get (), valueVisitor);
114116 readOffset_ += numRows;
117+
118+ // Fill decimals before applying filter.
119+ fillDecimals ();
120+
121+ const auto rawNulls = nullsInReadRange_
122+ ? (kDense ? nullsInReadRange_->as <uint64_t >() : rawResultNulls_)
123+ : nullptr ;
124+ // Process filter.
125+ process (filter, rows, rawNulls);
126+ }
127+
128+ template <typename DataT>
129+ void SelectiveDecimalColumnReader<DataT>::processNulls(
130+ bool isNull,
131+ const RowSet& rows,
132+ const uint64_t * rawNulls) {
133+ if (!rawNulls) {
134+ return ;
135+ }
136+ returnReaderNulls_ = false ;
137+ anyNulls_ = !isNull;
138+ allNull_ = isNull;
139+
140+ auto rawDecimal = values_->asMutable <DataT>();
141+ auto rawScale = scaleBuffer_->asMutable <int64_t >();
142+
143+ vector_size_t idx = 0 ;
144+ if (isNull) {
145+ for (vector_size_t i = 0 ; i < numValues_; i++) {
146+ if (bits::isBitNull (rawNulls, i)) {
147+ bits::setNull (rawResultNulls_, idx);
148+ addOutputRow (rows[i]);
149+ idx++;
150+ }
151+ }
152+ } else {
153+ for (vector_size_t i = 0 ; i < numValues_; i++) {
154+ if (!bits::isBitNull (rawNulls, i)) {
155+ bits::setNull (rawResultNulls_, idx, false );
156+ rawDecimal[idx] = rawDecimal[i];
157+ rawScale[idx] = rawScale[i];
158+ addOutputRow (rows[i]);
159+ idx++;
160+ }
161+ }
162+ }
163+ }
164+
165+ template <typename DataT>
166+ void SelectiveDecimalColumnReader<DataT>::processFilter(
167+ const common::Filter* filter,
168+ const RowSet& rows,
169+ const uint64_t * rawNulls) {
170+ VELOX_CHECK_NOT_NULL (filter, " Filter must not be null." );
171+ returnReaderNulls_ = false ;
172+ anyNulls_ = false ;
173+ allNull_ = true ;
174+
175+ vector_size_t idx = 0 ;
176+ auto rawDecimal = values_->asMutable <DataT>();
177+ for (vector_size_t i = 0 ; i < numValues_; i++) {
178+ if (rawNulls && bits::isBitNull (rawNulls, i)) {
179+ if (filter->testNull ()) {
180+ bits::setNull (rawResultNulls_, idx);
181+ addOutputRow (rows[i]);
182+ anyNulls_ = true ;
183+ idx++;
184+ }
185+ } else {
186+ bool tested;
187+ if constexpr (std::is_same_v<DataT, int64_t >) {
188+ tested = filter->testInt64 (rawDecimal[i]);
189+ } else {
190+ tested = filter->testInt128 (rawDecimal[i]);
191+ }
192+
193+ if (tested) {
194+ if (rawNulls) {
195+ bits::setNull (rawResultNulls_, idx, false );
196+ }
197+ rawDecimal[idx] = rawDecimal[i];
198+ addOutputRow (rows[i]);
199+ allNull_ = false ;
200+ idx++;
201+ }
202+ }
203+ }
204+ }
205+
206+ template <typename DataT>
207+ void SelectiveDecimalColumnReader<DataT>::process(
208+ const common::Filter* filter,
209+ const RowSet& rows,
210+ const uint64_t * rawNulls) {
211+ // Treat the filter as kAlwaysTrue if any of the following conditions are met:
212+ // 1) No filter found;
213+ // 2) Filter is kIsNotNull but rawNulls == NULL (no elements is null).
214+ auto filterKind =
215+ !filter || (filter->kind () == common::FilterKind::kIsNotNull && !rawNulls)
216+ ? common::FilterKind::kAlwaysTrue
217+ : filter->kind ();
218+ switch (filterKind) {
219+ case common::FilterKind::kAlwaysTrue :
220+ // Simply add all rows to output.
221+ for (vector_size_t i = 0 ; i < numValues_; i++) {
222+ addOutputRow (rows[i]);
223+ }
224+ break ;
225+ case common::FilterKind::kIsNull :
226+ processNulls (true , rows, rawNulls);
227+ break ;
228+ case common::FilterKind::kIsNotNull :
229+ processNulls (false , rows, rawNulls);
230+ break ;
231+ case common::FilterKind::kBigintRange :
232+ case common::FilterKind::kBigintValuesUsingHashTable :
233+ case common::FilterKind::kBigintValuesUsingBitmask :
234+ case common::FilterKind::kNegatedBigintRange :
235+ case common::FilterKind::kNegatedBigintValuesUsingHashTable :
236+ case common::FilterKind::kNegatedBigintValuesUsingBitmask :
237+ case common::FilterKind::kBigintMultiRange : {
238+ if constexpr (std::is_same_v<DataT, int64_t >) {
239+ processFilter (filter, rows, rawNulls);
240+ } else {
241+ const auto actualType = CppToType<DataT>::create ();
242+ VELOX_NYI (
243+ " Expected type BIGINT, but found file type {}." ,
244+ actualType->toString ());
245+ }
246+ break ;
247+ }
248+ case common::FilterKind::kHugeintValuesUsingHashTable :
249+ case common::FilterKind::kHugeintRange : {
250+ if constexpr (std::is_same_v<DataT, int128_t >) {
251+ processFilter (filter, rows, rawNulls);
252+ } else {
253+ const auto actualType = CppToType<DataT>::create ();
254+ VELOX_NYI (
255+ " Expected type HUGEINT, but found file type {}." ,
256+ actualType->toString ());
257+ }
258+ break ;
259+ }
260+ default :
261+ VELOX_NYI (" Unsupported filter: {}." , static_cast <int >(filterKind));
262+ }
115263}
116264
117265template <typename DataT>
118266void SelectiveDecimalColumnReader<DataT>::read(
119267 int64_t offset,
120268 const RowSet& rows,
121269 const uint64_t * incomingNulls) {
122- VELOX_CHECK (!scanSpec_->filter ());
123270 VELOX_CHECK (!scanSpec_->valueHook ());
124271 prepareRead<int64_t >(offset, rows, incomingNulls);
272+ if (!resultNulls_ || !resultNulls_->unique () ||
273+ resultNulls_->capacity () * 8 < rows.size ()) {
274+ // Make sure a dedicated resultNulls_ is allocated with enough capacity as
275+ // RleDecoder always assumes it is available.
276+ resultNulls_ = AlignedBuffer::allocate<bool >(rows.size (), memoryPool_);
277+ rawResultNulls_ = resultNulls_->asMutable <uint64_t >();
278+ }
125279 bool isDense = rows.back () == rows.size () - 1 ;
126280 if (isDense) {
127- readHelper<true >(rows);
281+ readHelper<true >(scanSpec_-> filter (), rows);
128282 } else {
129- readHelper<false >(rows);
283+ readHelper<false >(scanSpec_-> filter (), rows);
130284 }
131285}
132286
133287template <typename DataT>
134288void SelectiveDecimalColumnReader<DataT>::getValues(
135289 const RowSet& rows,
136290 VectorPtr* result) {
291+ rawValues_ = values_->asMutable <char >();
292+ getIntValues (rows, requestedType_, result);
293+ }
294+
295+ template <typename DataT>
296+ void SelectiveDecimalColumnReader<DataT>::fillDecimals() {
137297 auto nullsPtr =
138298 resultNulls () ? resultNulls ()->template as <uint64_t >() : nullptr ;
139299 auto scales = scaleBuffer_->as <int64_t >();
140300 auto values = values_->asMutable <DataT>();
141-
142301 DecimalUtil::fillDecimals<DataT>(
143302 values, nullsPtr, values, scales, numValues_, scale_);
144-
145- rawValues_ = values_->asMutable <char >();
146- getIntValues (rows, requestedType_, result);
147303}
148304
149305template class SelectiveDecimalColumnReader <int64_t >;
0 commit comments