Skip to content

Commit e581edf

Browse files
authored
Merge pull request #17 from jsafrane/add-common-functions
Add Get*Capabilities and Probe common functions
2 parents d195a7c + c2df104 commit e581edf

File tree

2 files changed

+586
-19
lines changed

2 files changed

+586
-19
lines changed

connection/connection.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ import (
2525
"strings"
2626
"time"
2727

28+
"google.golang.org/grpc/codes"
29+
"google.golang.org/grpc/status"
30+
2831
"github.com/container-storage-interface/spec/lib/go/csi"
2932
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
3033
"google.golang.org/grpc"
@@ -34,6 +37,9 @@ import (
3437
const (
3538
// Interval of logging connection errors
3639
connectionLoggingInterval = 10 * time.Second
40+
41+
// Interval of trying to call Probe() until it succeeds
42+
probeInterval = 1 * time.Second
3743
)
3844

3945
const terminationLogPath = "/dev/termination-log"
@@ -179,6 +185,7 @@ func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
179185
return err
180186
}
181187

188+
// GetDriverName returns name of CSI driver.
182189
func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
183190
client := csi.NewIdentityClient(conn)
184191

@@ -193,3 +200,111 @@ func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
193200
}
194201
return name, nil
195202
}
203+
204+
// PluginCapabilitySet is set of CSI plugin capabilities. Only supported capabilities are in the map.
205+
type PluginCapabilitySet map[csi.PluginCapability_Service_Type]bool
206+
207+
// GetPluginCapabilities returns set of supported capabilities of CSI driver.
208+
func GetPluginCapabilities(ctx context.Context, conn *grpc.ClientConn) (PluginCapabilitySet, error) {
209+
client := csi.NewIdentityClient(conn)
210+
req := csi.GetPluginCapabilitiesRequest{}
211+
rsp, err := client.GetPluginCapabilities(ctx, &req)
212+
if err != nil {
213+
return nil, err
214+
}
215+
caps := PluginCapabilitySet{}
216+
for _, cap := range rsp.GetCapabilities() {
217+
if cap == nil {
218+
continue
219+
}
220+
srv := cap.GetService()
221+
if srv == nil {
222+
continue
223+
}
224+
t := srv.GetType()
225+
caps[t] = true
226+
}
227+
return caps, nil
228+
}
229+
230+
// ControllerCapabilitySet is set of CSI controller capabilities. Only supported capabilities are in the map.
231+
type ControllerCapabilitySet map[csi.ControllerServiceCapability_RPC_Type]bool
232+
233+
// GetControllerCapabilities returns set of supported controller capabilities of CSI driver.
234+
func GetControllerCapabilities(ctx context.Context, conn *grpc.ClientConn) (ControllerCapabilitySet, error) {
235+
client := csi.NewControllerClient(conn)
236+
req := csi.ControllerGetCapabilitiesRequest{}
237+
rsp, err := client.ControllerGetCapabilities(ctx, &req)
238+
if err != nil {
239+
return nil, err
240+
}
241+
242+
caps := ControllerCapabilitySet{}
243+
for _, cap := range rsp.GetCapabilities() {
244+
if cap == nil {
245+
continue
246+
}
247+
rpc := cap.GetRpc()
248+
if rpc == nil {
249+
continue
250+
}
251+
t := rpc.GetType()
252+
caps[t] = true
253+
}
254+
return caps, nil
255+
}
256+
257+
// ProbeForever calls Probe() of a CSI driver and waits until the driver becomes ready.
258+
// Any error other than timeout is returned.
259+
func ProbeForever(conn *grpc.ClientConn, singleProbeTimeout time.Duration) error {
260+
for {
261+
klog.Info("Probing CSI driver for readiness")
262+
ready, err := probeOnce(conn, singleProbeTimeout)
263+
if err != nil {
264+
st, ok := status.FromError(err)
265+
if !ok {
266+
// This is not gRPC error. The probe must have failed before gRPC
267+
// method was called, otherwise we would get gRPC error.
268+
return fmt.Errorf("CSI driver probe failed: %s", err)
269+
}
270+
if st.Code() != codes.DeadlineExceeded {
271+
return fmt.Errorf("CSI driver probe failed: %s", err)
272+
}
273+
// Timeout -> driver is not ready. Fall through to sleep() below.
274+
klog.Warning("CSI driver probe timed out")
275+
} else {
276+
if ready {
277+
return nil
278+
}
279+
klog.Warning("CSI driver is not ready")
280+
}
281+
// Timeout was returned or driver is not ready.
282+
time.Sleep(probeInterval)
283+
}
284+
}
285+
286+
// probeOnce is a helper to simplify defer cancel()
287+
func probeOnce(conn *grpc.ClientConn, timeout time.Duration) (bool, error) {
288+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
289+
defer cancel()
290+
return Probe(ctx, conn)
291+
}
292+
293+
// Probe calls driver Probe() just once and returns its result without any processing.
294+
func Probe(ctx context.Context, conn *grpc.ClientConn) (ready bool, err error) {
295+
client := csi.NewIdentityClient(conn)
296+
297+
req := csi.ProbeRequest{}
298+
rsp, err := client.Probe(ctx, &req)
299+
300+
if err != nil {
301+
return false, err
302+
}
303+
304+
r := rsp.GetReady()
305+
if r == nil {
306+
// "If not present, the caller SHALL assume that the plugin is in a ready state"
307+
return true, nil
308+
}
309+
return r.GetValue(), nil
310+
}

0 commit comments

Comments
 (0)