Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions router-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/twmb/franz-go/pkg/kadm v1.11.0
github.com/wundergraph/cosmo/demo v0.0.0-20250422163037-46e4333ac50e
github.com/wundergraph/cosmo/router v0.0.0-20250422163037-46e4333ac50e
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.172
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.172.0.20250424191051-430af8af6c64
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
Expand Down Expand Up @@ -177,7 +177,7 @@ require (
replace (
github.com/wundergraph/cosmo/demo => ../demo
github.com/wundergraph/cosmo/router => ../router
// github.com/wundergraph/graphql-go-tools/v2 => ../../graphql-go-tools/v2
//github.com/wundergraph/graphql-go-tools/v2 => ../../graphql-go-tools/v2
)

replace github.com/hashicorp/consul/sdk => github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301
4 changes: 2 additions & 2 deletions router-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTB
github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE=
github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301 h1:EzfKHQoTjFDDcgaECCCR2aTePqMu9QBmPbyhqIYOhV0=
github.com/wundergraph/consul/sdk v0.0.0-20250204115147-ed842a8fd301/go.mod h1:wxI0Nak5dI5RvJuzGyiEK4nZj0O9X+Aw6U0tC1wPKq0=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.172 h1:jrme3DFqhEp+mKYbg4brcc+Y4nw6PIQmPed0uG00pVI=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.172/go.mod h1:B7eV0Qh8Lop9QzIOQcsvKp3S0ejfC6mgyWoJnI917yQ=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.172.0.20250424191051-430af8af6c64 h1:liXxR8ddgPVY4uyYUTi1KEs0eQr1Ttq3ns4go+4IlG4=
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.172.0.20250424191051-430af8af6c64/go.mod h1:B7eV0Qh8Lop9QzIOQcsvKp3S0ejfC6mgyWoJnI917yQ=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
Expand Down
15 changes: 8 additions & 7 deletions router-tests/structured_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"math"
"net/http"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/config"
Expand Down Expand Up @@ -193,7 +194,7 @@ func TestAccessLogsFileOutput(t *testing.T) {
require.NoError(t, os.RemoveAll(fp))
})

logger := logging.NewZapAccessLogger(f, false, false)
logger := logging.NewZapAccessLogger(f, zap.InfoLevel, false, false, false)
require.NoError(t, err)

testenv.Run(t, &testenv.Config{
Expand Down Expand Up @@ -249,7 +250,7 @@ func TestAccessLogsFileOutput(t *testing.T) {
require.NoError(t, os.RemoveAll(fp))
})

logger := logging.NewZapAccessLogger(f, false, false)
logger := logging.NewZapAccessLogger(f, zap.InfoLevel, false, false, false)
require.NoError(t, err)

testenv.Run(t, &testenv.Config{
Expand Down
6 changes: 4 additions & 2 deletions router-tests/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6766,7 +6766,8 @@ func TestFlakyTelemetry(t *testing.T) {
require.Equal(t, trace.SpanKindInternal, sn[8].SpanKind())
require.Equal(t, codes.Error, sn[8].Status().Code)
require.Lenf(t, sn[8].Attributes(), 14, "expected 14 attributes, got %d", len(sn[8].Attributes()))
require.Contains(t, sn[8].Status().Description, "connect: connection refused\nFailed to fetch from Subgraph 'products' at Path: 'employees'.")
require.Contains(t, sn[8].Status().Description, "Failed to fetch from Subgraph 'products' at Path: 'employees'.")
require.Contains(t, sn[8].Status().Description, "connect: connection refused")

events := sn[8].Events()
require.Len(t, events, 1, "expected 1 event because the GraphQL request failed")
Expand All @@ -6776,7 +6777,8 @@ func TestFlakyTelemetry(t *testing.T) {
require.Equal(t, "query unnamed", sn[10].Name())
require.Equal(t, trace.SpanKindServer, sn[10].SpanKind())
require.Equal(t, codes.Error, sn[10].Status().Code)
require.Contains(t, sn[10].Status().Description, "connect: connection refused\nFailed to fetch from Subgraph 'products' at Path: 'employees'.")
require.Contains(t, sn[10].Status().Description, "Failed to fetch from Subgraph 'products' at Path: 'employees'.")
require.Contains(t, sn[10].Status().Description, "connect: connection refused")
})
})

Expand Down
4 changes: 2 additions & 2 deletions router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,14 @@ func CreateTestEnv(t testing.TB, cfg *Config) (*Environment, error) {
if oc := cfg.LogObservation; oc.Enabled {
var zCore zapcore.Core
zCore, logObserver = observer.New(oc.LogLevel)
cfg.Logger = logging.NewZapLoggerWithCore(zCore, true)
cfg.Logger = logging.NewZapLoggerWithCore(zCore, true, true)
} else {
ec := zap.NewProductionEncoderConfig()
ec.EncodeDuration = zapcore.SecondsDurationEncoder
ec.TimeKey = "time"

syncer := zapcore.AddSync(os.Stderr)
cfg.Logger = logging.NewZapLogger(syncer, false, true, zapcore.ErrorLevel)
cfg.Logger = logging.NewZapLogger(syncer, false, true, true, zapcore.ErrorLevel)
}

if cfg.AccessLogger == nil {
Expand Down
17 changes: 13 additions & 4 deletions router/cmd/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ func NewRouter(ctx context.Context, params Params, additionalOptions ...core.Opt
Attributes: cfg.AccessLogs.Router.Fields,
SubgraphEnabled: cfg.AccessLogs.Subgraphs.Enabled,
SubgraphAttributes: cfg.AccessLogs.Subgraphs.Fields,
Level: cfg.AccessLogs.Level,
AddStacktrace: cfg.AccessLogs.AddStacktrace,
}

level, err := logging.ZapLogLevelFromString(cfg.AccessLogs.Level)
if err != nil {
return nil, fmt.Errorf("could not parse log level: %w for access logs", err)
}

if cfg.AccessLogs.Output.File.Enabled {
Expand All @@ -168,33 +175,35 @@ func NewRouter(ctx context.Context, params Params, additionalOptions ...core.Opt
WS: f,
BufferSize: int(cfg.AccessLogs.Buffer.Size.Uint64()),
FlushInterval: cfg.AccessLogs.Buffer.FlushInterval,
Stacktrace: c.AddStacktrace,
Development: cfg.DevelopmentMode,
Level: zap.InfoLevel,
Level: level,
Pretty: !cfg.JSONLog,
})
if err != nil {
return nil, fmt.Errorf("could not create buffered logger: %w", err)
}
c.Logger = bl.Logger
} else {
c.Logger = logging.NewZapAccessLogger(f, cfg.DevelopmentMode, !cfg.JSONLog)
c.Logger = logging.NewZapAccessLogger(f, level, c.AddStacktrace, cfg.DevelopmentMode, !cfg.JSONLog)
}
} else if cfg.AccessLogs.Output.Stdout.Enabled {
if cfg.AccessLogs.Buffer.Enabled {
bl, err := logging.NewJSONZapBufferedLogger(logging.BufferedLoggerOptions{
WS: os.Stdout,
BufferSize: int(cfg.AccessLogs.Buffer.Size.Uint64()),
FlushInterval: cfg.AccessLogs.Buffer.FlushInterval,
Stacktrace: cfg.AccessLogs.AddStacktrace,
Development: cfg.DevelopmentMode,
Level: zap.InfoLevel,
Level: level,
Pretty: !cfg.JSONLog,
})
if err != nil {
return nil, fmt.Errorf("could not create buffered logger: %w", err)
}
c.Logger = bl.Logger
} else {
c.Logger = logging.NewZapAccessLogger(os.Stdout, cfg.DevelopmentMode, !cfg.JSONLog)
c.Logger = logging.NewZapAccessLogger(os.Stdout, level, c.AddStacktrace, cfg.DevelopmentMode, !cfg.JSONLog)
}
}

Expand Down
12 changes: 9 additions & 3 deletions router/core/engine_loader_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"errors"
"fmt"
"slices"
"time"

"github.com/wundergraph/cosmo/router/internal/requestlogger"
"github.com/wundergraph/cosmo/router/internal/unique"
"github.com/wundergraph/cosmo/router/pkg/metric"
Expand All @@ -17,8 +20,6 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"slices"
"time"
)

var (
Expand Down Expand Up @@ -148,7 +149,12 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourc
path = responseInfo.Request.URL.Path
}
}
f.accessLogger.Info(path, fields)

if responseInfo.Err != nil {
f.accessLogger.Error(path, fields)
} else {
f.accessLogger.Info(path, fields)
}
}

if responseInfo.Err != nil {
Expand Down
14 changes: 9 additions & 5 deletions router/core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,14 @@ func getAggregatedSubgraphServiceNames(err error) []string {
return nil
}

// propagateSubgraphErrors propagates the subgraph errors to the request context
func propagateSubgraphErrors(ctx *resolve.Context) {
err := ctx.SubgraphErrors()
// trackResponseError propagates the errors during engine execution to the request context.
func trackResponseError(ctx *resolve.Context, err error) {
if err != nil {
trackFinalResponseError(ctx.Context(), err)
return
}

err = ctx.ExecutionError()
if err != nil {
trackFinalResponseError(ctx.Context(), err)
}
Expand Down Expand Up @@ -276,8 +280,8 @@ func writeMultipartError(
// before, some clients that rely on both CR and LF strictly to parse blocks were broken and not parsing our
// multipart chunks correctly. With this fix here (and in a few other places) the clients are now working.
resp = append(resp, []byte("\r\n--graphql--")...)
if _, err := w.Write([]byte(resp)); err != nil {

if _, err := w.Write(resp); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,7 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
requestlogger.WithFields(baseLogFields...),
requestlogger.WithAttributes(s.accessLogsConfig.Attributes),
requestlogger.WithExprAttributes(exprAttributes),
requestlogger.WithLogLevelHandler(LogLevelHandler),
requestlogger.WithFieldsHandler(RouterAccessLogsFieldHandler),
}

Expand Down
16 changes: 8 additions & 8 deletions router/core/graphql_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,14 @@ func (h *GraphQLHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx = WithResponseHeaderPropagation(ctx)
}

defer propagateSubgraphErrors(ctx)

resp, err := h.executor.Resolver.ResolveGraphQLResponse(ctx, p.Response, nil, HeaderPropagationWriter(w, ctx.Context()))

// Respects resolver errors and execution errors
defer trackResponseError(ctx, err)

requestContext.dataSourceNames = getSubgraphNames(p.Response.DataSources)

if err != nil {
trackFinalResponseError(ctx.Context(), err)
h.WriteError(ctx, err, p.Response, w)
return
}
Expand All @@ -194,7 +195,6 @@ func (h *GraphQLHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
)
h.setDebugCacheHeaders(w, requestContext.operation)

defer propagateSubgraphErrors(ctx)
ctx, writer, ok = GetSubscriptionResponseWriter(ctx, r, w, h.apolloSubscriptionMultipartPrintBoundary)
if !ok {
requestContext.logger.Error("unable to get subscription response writer", zap.Error(errCouldNotFlushResponse))
Expand All @@ -211,25 +211,25 @@ func (h *GraphQLHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err := h.executor.Resolver.ResolveGraphQLSubscription(ctx, p.Response, writer)
requestContext.dataSourceNames = getSubgraphNames(p.Response.Response.DataSources)

// Respects resolver errors and execution errors
defer trackResponseError(ctx, err)

if err != nil {
if errors.Is(err, context.Canceled) {
requestContext.logger.Debug("context canceled: unable to resolve subscription response", zap.Error(err))
trackFinalResponseError(r.Context(), err)
return
} else if errors.Is(err, ErrUnauthorized) {
trackFinalResponseError(ctx.Context(), err)
writeRequestErrors(r, w, http.StatusUnauthorized, graphqlerrors.RequestErrorsFromError(err), requestContext.logger)
return
}

requestContext.logger.Error("unable to resolve subscription response", zap.Error(err))
trackFinalResponseError(ctx.Context(), err)
writeRequestErrors(r, w, http.StatusInternalServerError, graphqlerrors.RequestErrorsFromError(errCouldNotResolveResponse), requestContext.logger)
return
}
default:
requestContext.logger.Error("unsupported plan kind")
trackFinalResponseError(ctx.Context(), errOperationPlanUnsupported)
trackResponseError(ctx, errOperationPlanUnsupported)
writeRequestErrors(r, w, http.StatusInternalServerError, graphqlerrors.RequestErrorsFromError(errOperationPlanUnsupported), requestContext.logger)
}
}
Expand Down
Loading
Loading