Skip to content

Commit 6fc6193

Browse files
nfssdq authored and tejasmanohar committed
Fixed missing-row issue caused by skipping the header row in each page of responses (#12) (#13)
1 parent 6918d0d commit 6fc6193

File tree

3 files changed

+257
-9
lines changed

3 files changed

+257
-9
lines changed

conn.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ func (c *conn) runQuery(ctx context.Context, query string) (driver.Rows, error)
5050
return newRows(rowsConfig{
5151
Athena: c.athena,
5252
QueryID: queryID,
53+
// TODO: add a check for DDL queries so that their first row is not skipped as a header (#10)
54+
SkipHeader: true,
5355
})
5456
}
5557

rows.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,22 @@ type rows struct {
1313
athena athenaiface.AthenaAPI
1414
queryID string
1515

16-
done bool
17-
out *athena.GetQueryResultsOutput
16+
done bool
17+
skipHeaderRow bool
18+
out *athena.GetQueryResultsOutput
1819
}
1920

2021
type rowsConfig struct {
21-
Athena athenaiface.AthenaAPI
22-
QueryID string
22+
Athena athenaiface.AthenaAPI
23+
QueryID string
24+
SkipHeader bool
2325
}
2426

2527
func newRows(cfg rowsConfig) (*rows, error) {
2628
r := rows{
27-
athena: cfg.Athena,
28-
queryID: cfg.QueryID,
29+
athena: cfg.Athena,
30+
queryID: cfg.QueryID,
31+
skipHeaderRow: cfg.SkipHeader,
2932
}
3033

3134
shouldContinue, err := r.fetchNextPage(nil)
@@ -97,13 +100,19 @@ func (r *rows) fetchNextPage(token *string) (bool, error) {
97100
return false, err
98101
}
99102

100-
// First row of an Athena response contains headers.
103+
var rowOffset = 0
104+
// First row of the first page contains header if the query is not DDL.
101105
// These are also available in *athena.Row.ResultSetMetadata.
102-
if len(r.out.ResultSet.Rows) < 2 {
106+
if r.skipHeaderRow {
107+
rowOffset = 1
108+
r.skipHeaderRow = false
109+
}
110+
111+
if len(r.out.ResultSet.Rows) < rowOffset+1 {
103112
return false, nil
104113
}
105114

106-
r.out.ResultSet.Rows = r.out.ResultSet.Rows[1:]
115+
r.out.ResultSet.Rows = r.out.ResultSet.Rows[rowOffset:]
107116
return true, nil
108117
}
109118

rows_test.go

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
package athena
2+
3+
import (
4+
"database/sql/driver"
5+
"errors"
6+
"io"
7+
"math/rand"
8+
"testing"
9+
10+
"github.com/aws/aws-sdk-go/service/athena"
11+
"github.com/aws/aws-sdk-go/service/athena/athenaiface"
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
var dummyError = errors.New("dummy error")
16+
17+
type genQueryResultsOutputByToken func(token string) (*athena.GetQueryResultsOutput, error)
18+
19+
var queryToResultsGenMap = map[string]genQueryResultsOutputByToken{
20+
"select": dummySelectQueryResponse,
21+
"show": dummyShowResponse,
22+
"iteration_fail": dummyFailedIterationResponse,
23+
}
24+
25+
func genColumnInfo(column string) *athena.ColumnInfo {
26+
caseSensitive := true
27+
catalogName := "hive"
28+
nullable := "UNKNOWN"
29+
precision := int64(2147483647)
30+
scale := int64(0)
31+
schemaName := ""
32+
tableName := ""
33+
columnType := "varchar"
34+
35+
return &athena.ColumnInfo{
36+
CaseSensitive: &caseSensitive,
37+
CatalogName: &catalogName,
38+
Nullable: &nullable,
39+
Precision: &precision,
40+
Scale: &scale,
41+
SchemaName: &schemaName,
42+
TableName: &tableName,
43+
Type: &columnType,
44+
Label: &column,
45+
Name: &column,
46+
}
47+
}
48+
49+
// randomString returns a 10-byte string of ASCII letters drawn from the
// package-level math/rand source.
func randomString() string {
	const (
		letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
		length  = 10
	)

	buf := make([]byte, length)
	for pos := range buf {
		buf[pos] = letters[rand.Intn(len(letters))]
	}
	return string(buf)
}
57+
58+
func genRow(isHeader bool, columns []*athena.ColumnInfo) *athena.Row {
59+
var data []*athena.Datum
60+
for i := 0; i < len(columns); i++ {
61+
if isHeader {
62+
data = append(data, &athena.Datum{
63+
VarCharValue: columns[i].Name,
64+
})
65+
} else {
66+
s := randomString()
67+
data = append(data, &athena.Datum{
68+
VarCharValue: &s,
69+
})
70+
}
71+
}
72+
return &athena.Row{
73+
Data: data,
74+
}
75+
}
76+
77+
func dummySelectQueryResponse(token string) (*athena.GetQueryResultsOutput, error) {
78+
switch token {
79+
case "":
80+
var nextToken = "page_1"
81+
columns := []*athena.ColumnInfo{
82+
genColumnInfo("first_name"),
83+
genColumnInfo("last_name"),
84+
}
85+
return &athena.GetQueryResultsOutput{
86+
NextToken: &nextToken,
87+
ResultSet: &athena.ResultSet{
88+
ResultSetMetadata: &athena.ResultSetMetadata{
89+
ColumnInfo: columns,
90+
},
91+
Rows: []*athena.Row{
92+
genRow(true, columns),
93+
genRow(false, columns),
94+
genRow(false, columns),
95+
genRow(false, columns),
96+
genRow(false, columns),
97+
},
98+
},
99+
}, nil
100+
case "page_1":
101+
columns := []*athena.ColumnInfo{
102+
genColumnInfo("first_name"),
103+
genColumnInfo("last_name"),
104+
}
105+
return &athena.GetQueryResultsOutput{
106+
ResultSet: &athena.ResultSet{
107+
ResultSetMetadata: &athena.ResultSetMetadata{
108+
ColumnInfo: columns,
109+
},
110+
Rows: []*athena.Row{
111+
genRow(false, columns),
112+
genRow(false, columns),
113+
genRow(false, columns),
114+
genRow(false, columns),
115+
genRow(false, columns),
116+
},
117+
},
118+
}, nil
119+
default:
120+
return nil, dummyError
121+
}
122+
}
123+
124+
func dummyShowResponse(_ string) (*athena.GetQueryResultsOutput, error) {
125+
columns := []*athena.ColumnInfo{
126+
genColumnInfo("partition"),
127+
}
128+
return &athena.GetQueryResultsOutput{
129+
ResultSet: &athena.ResultSet{
130+
ResultSetMetadata: &athena.ResultSetMetadata{
131+
ColumnInfo: columns,
132+
},
133+
Rows: []*athena.Row{
134+
genRow(false, columns),
135+
genRow(false, columns),
136+
},
137+
},
138+
}, nil
139+
}
140+
141+
func dummyFailedIterationResponse(token string) (*athena.GetQueryResultsOutput, error) {
142+
switch token {
143+
case "":
144+
var nextToken = "page_1"
145+
columns := []*athena.ColumnInfo{
146+
genColumnInfo("first_name"),
147+
genColumnInfo("last_name"),
148+
}
149+
return &athena.GetQueryResultsOutput{
150+
NextToken: &nextToken,
151+
ResultSet: &athena.ResultSet{
152+
ResultSetMetadata: &athena.ResultSetMetadata{
153+
ColumnInfo: columns,
154+
},
155+
Rows: []*athena.Row{
156+
genRow(true, columns),
157+
genRow(false, columns),
158+
genRow(false, columns),
159+
genRow(false, columns),
160+
genRow(false, columns),
161+
},
162+
},
163+
}, nil
164+
default:
165+
return nil, dummyError
166+
}
167+
}
168+
169+
type mockAthenaClient struct {
170+
athenaiface.AthenaAPI
171+
}
172+
173+
func (m *mockAthenaClient) GetQueryResults(query *athena.GetQueryResultsInput) (*athena.GetQueryResultsOutput, error) {
174+
var nextToken = ""
175+
if query.NextToken != nil {
176+
nextToken = *query.NextToken
177+
}
178+
return queryToResultsGenMap[*query.QueryExecutionId](nextToken)
179+
}
180+
181+
// castToValue packs its variadic arguments into a []driver.Value so they can
// be passed as the dest slice of a driver.Rows Next call. Called with no
// arguments it returns a nil slice.
func castToValue(dest ...driver.Value) []driver.Value {
	values := dest
	return values
}
184+
185+
func TestRows_Next(t *testing.T) {
186+
tests := []struct {
187+
desc string
188+
queryID string
189+
skipHeader bool
190+
expectedResultsSize int
191+
expectedError error
192+
}{
193+
{
194+
desc: "show query, no header, 2 rows, no error",
195+
queryID: "show",
196+
skipHeader: false,
197+
expectedResultsSize: 2,
198+
expectedError: nil,
199+
},
200+
{
201+
desc: "select query, header, multipage, 9 rows, no error",
202+
queryID: "select",
203+
skipHeader: true,
204+
expectedResultsSize: 9,
205+
expectedError: nil,
206+
},
207+
{
208+
desc: "failed during calling next",
209+
queryID: "iteration_fail",
210+
skipHeader: true,
211+
expectedError: dummyError,
212+
},
213+
}
214+
for _, test := range tests {
215+
r, _ := newRows(rowsConfig{
216+
Athena: new(mockAthenaClient),
217+
QueryID: test.queryID,
218+
SkipHeader: test.skipHeader,
219+
})
220+
221+
var firstName, lastName string
222+
cnt := 0
223+
for {
224+
err := r.Next(castToValue(&firstName, &lastName))
225+
if err != nil {
226+
if err != io.EOF {
227+
assert.Equal(t, test.expectedError, err)
228+
}
229+
break
230+
}
231+
cnt++
232+
}
233+
if test.expectedError == nil {
234+
assert.Equal(t, test.expectedResultsSize, cnt)
235+
}
236+
}
237+
}

0 commit comments

Comments
 (0)