Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/codecutil/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ type MarshalError struct {

// Error implements the error interface.
func (e MarshalError) Error() string {
return fmt.Sprintf("cannot transform type %s to a BSON Document: %v",
return fmt.Sprintf("cannot marshal type %q to a BSON Document: %v",
reflect.TypeOf(e.Value), e.Err)
}

func (e MarshalError) Unwrap() error { return e.Err }

// EncoderFn is used to functionally construct an encoder for marshaling values.
type EncoderFn func(io.Writer) *bson.Encoder

Expand Down
14 changes: 7 additions & 7 deletions mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,15 @@ AggregateExecuteLoop:
}
}
if err != nil {
cs.err = replaceErrors(err)
cs.err = wrapErrors(err)
return cs.err
}

cr := cs.aggregate.ResultCursorResponse()
cr.Server = server

cs.cursor, cs.err = driver.NewBatchCursor(cr, cs.sess, cs.client.clock, cs.cursorOptions)
if cs.err = replaceErrors(cs.err); cs.err != nil {
if cs.err = wrapErrors(cs.err); cs.err != nil {
return cs.Err()
}

Expand Down Expand Up @@ -597,13 +597,13 @@ func (cs *ChangeStream) Decode(val interface{}) error {
// Err returns the last error seen by the change stream, or nil if no errors has occurred.
func (cs *ChangeStream) Err() error {
if cs.err != nil {
return replaceErrors(cs.err)
return wrapErrors(cs.err)
}
if cs.cursor == nil {
return nil
}

return replaceErrors(cs.cursor.Err())
return wrapErrors(cs.cursor.Err())
}

// Close closes this change stream and the underlying cursor. Next and TryNext must not be called after Close has been
Expand All @@ -619,7 +619,7 @@ func (cs *ChangeStream) Close(ctx context.Context) error {
return nil // cursor is already closed
}

cs.err = replaceErrors(cs.cursor.Close(ctx))
cs.err = wrapErrors(cs.cursor.Close(ctx))
cs.cursor = nil
return cs.Err()
}
Expand Down Expand Up @@ -678,7 +678,7 @@ func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
if len(cs.batch) == 0 {
cs.loopNext(ctx, nonBlocking)
if cs.err != nil {
cs.err = replaceErrors(cs.err)
cs.err = wrapErrors(cs.err)
return false
}
if len(cs.batch) == 0 {
Expand Down Expand Up @@ -719,7 +719,7 @@ func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
return
}

cs.err = replaceErrors(cs.cursor.Err())
cs.err = wrapErrors(cs.cursor.Err())
if cs.err == nil {
// Check if cursor is alive
if cs.ID() == 0 {
Expand Down
16 changes: 8 additions & 8 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func newClient(opts ...*options.ClientOptions) (*Client, error) {
if client.deployment == nil {
client.deployment, err = topology.New(cfg)
if err != nil {
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}
}

Expand All @@ -261,7 +261,7 @@ func (c *Client) connect() error {
if connector, ok := c.deployment.(driver.Connector); ok {
err := connector.Connect()
if err != nil {
return replaceErrors(err)
return wrapErrors(err)
}
}

Expand Down Expand Up @@ -293,7 +293,7 @@ func (c *Client) connect() error {
if subscriber, ok := c.deployment.(driver.Subscriber); ok {
sub, err := subscriber.Subscribe()
if err != nil {
return replaceErrors(err)
return wrapErrors(err)
}
updateChan = sub.Updates
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func (c *Client) Disconnect(ctx context.Context) error {
}

if disconnector, ok := c.deployment.(driver.Disconnector); ok {
return replaceErrors(disconnector.Disconnect(ctx))
return wrapErrors(disconnector.Disconnect(ctx))
}

return nil
Expand Down Expand Up @@ -381,7 +381,7 @@ func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error {
{"ping", 1},
}, options.RunCmd().SetReadPreference(rp))

return replaceErrors(res.Err())
return wrapErrors(res.Err())
}

// StartSession starts a new session configured with the given options.
Expand Down Expand Up @@ -434,7 +434,7 @@ func (c *Client) StartSession(opts ...options.Lister[options.SessionOptions]) (*

sess, err := session.NewClientSession(c.sessionPool, c.id, coreOpts)
if err != nil {
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}

return &Session{
Expand Down Expand Up @@ -741,7 +741,7 @@ func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...

err = op.Execute(ctx)
if err != nil {
return ListDatabasesResult{}, replaceErrors(err)
return ListDatabasesResult{}, wrapErrors(err)
}

return newListDatabasesResultFromOperation(op.Result()), nil
Expand Down Expand Up @@ -965,7 +965,7 @@ func (c *Client) BulkWrite(ctx context.Context, writes []ClientBulkWrite,
op.result.Acknowledged = acknowledged
op.result.HasVerboseResults = !op.errorsOnly
err = op.execute(ctx)
return &op.result, replaceErrors(err)
return &op.result, wrapErrors(err)
}

// newLogger will use the LoggerOptions to create an internal logger and publish
Expand Down
22 changes: 11 additions & 11 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,

err = op.execute(ctx)

return &op.result, replaceErrors(err)
return &op.result, wrapErrors(err)
}

func (coll *Collection) insert(
Expand Down Expand Up @@ -1049,15 +1049,15 @@ func aggregate(a aggregateParams, opts ...options.Lister[options.AggregateOption
if errors.As(err, &wce) && wce.WriteConcernError != nil {
return nil, *convertDriverWriteConcernError(wce.WriteConcernError)
}
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}

bc, err := op.Result(cursorOpts)
if err != nil {
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}
cursor, err := newCursorWithSession(bc, a.client.bsonOpts, a.registry, sess)
return cursor, replaceErrors(err)
return cursor, wrapErrors(err)
}

// CountDocuments returns the number of documents in the collection. For a fast count of the documents in the
Expand Down Expand Up @@ -1132,7 +1132,7 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},

err = op.Execute(ctx)
if err != nil {
return 0, replaceErrors(err)
return 0, wrapErrors(err)
}

batch := op.ResultCursorResponse().FirstBatch
Expand Down Expand Up @@ -1213,7 +1213,7 @@ func (coll *Collection) EstimatedDocumentCount(
op.Retry(retry)

err = op.Execute(ctx)
return op.Result().N, replaceErrors(err)
return op.Result().N, wrapErrors(err)
}

// Distinct executes a distinct command to find the unique values for a specified field in the collection.
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func (coll *Collection) Distinct(

err = op.Execute(ctx)
if err != nil {
return &DistinctResult{err: replaceErrors(err)}
return &DistinctResult{err: wrapErrors(err)}
}

arr, ok := op.Result().Values.ArrayOK()
Expand Down Expand Up @@ -1504,12 +1504,12 @@ func (coll *Collection) find(
op = op.Retry(retry)

if err = op.Execute(ctx); err != nil {
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}

bc, err := op.Result(cursorOpts)
if err != nil {
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}
return newCursorWithSession(bc, coll.bsonOpts, coll.registry, sess)
}
Expand Down Expand Up @@ -1560,7 +1560,7 @@ func (coll *Collection) FindOne(ctx context.Context, filter interface{},
cur: cursor,
bsonOpts: coll.bsonOpts,
reg: coll.registry,
err: replaceErrors(err),
err: wrapErrors(err),
}
}

Expand Down Expand Up @@ -2044,7 +2044,7 @@ func (coll *Collection) drop(ctx context.Context) error {
// ignore namespace not found errors
var driverErr driver.Error
if !errors.As(err, &driverErr) || !driverErr.NamespaceNotFound() {
return replaceErrors(err)
return wrapErrors(err)
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions mongo/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
// If we don't have a next batch
if !c.bc.Next(ctx) {
// Do we have an error? If so we return false.
c.err = replaceErrors(c.bc.Err())
c.err = wrapErrors(c.bc.Err())
if c.err != nil {
return false
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func (c *Cursor) Err() error { return c.err }
// the first call, any subsequent calls will not change the state.
func (c *Cursor) Close(ctx context.Context) error {
defer c.closeImplicitSession()
return replaceErrors(c.bc.Close(ctx))
return wrapErrors(c.bc.Close(ctx))
}

// All iterates the cursor and decodes each document into results. The results parameter must be a pointer to a slice.
Expand Down Expand Up @@ -336,7 +336,7 @@ func (c *Cursor) All(ctx context.Context, results interface{}) error {
batch = c.bc.Batch()
}

if err = replaceErrors(c.bc.Err()); err != nil {
if err = wrapErrors(c.bc.Err()); err != nil {
return err
}

Expand Down
18 changes: 9 additions & 9 deletions mongo/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (db *Database) RunCommandCursor(
op, sess, err := db.processRunCommand(ctx, runCommand, true, opts...)
if err != nil {
closeImplicitSession(sess)
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}

if err = op.Execute(ctx); err != nil {
Expand All @@ -297,16 +297,16 @@ func (db *Database) RunCommandCursor(
return nil, errors.New(
"database response does not contain a cursor; try using RunCommand instead")
}
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}

bc, err := op.ResultCursor()
if err != nil {
closeImplicitSession(sess)
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}
cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess)
return cursor, replaceErrors(err)
return cursor, wrapErrors(err)
}

// Drop drops the database on the server. This method ignores "namespace not found" errors so it is safe to drop
Expand Down Expand Up @@ -347,7 +347,7 @@ func (db *Database) Drop(ctx context.Context) error {

var driverErr driver.Error
if err != nil && (!errors.As(err, &driverErr) || !driverErr.NamespaceNotFound()) {
return replaceErrors(err)
return wrapErrors(err)
}
return nil
}
Expand Down Expand Up @@ -497,16 +497,16 @@ func (db *Database) ListCollections(
err = op.Execute(ctx)
if err != nil {
closeImplicitSession(sess)
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}

bc, err := op.Result(cursorOpts)
if err != nil {
closeImplicitSession(sess)
return nil, replaceErrors(err)
return nil, wrapErrors(err)
}
cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess)
return cursor, replaceErrors(err)
return cursor, wrapErrors(err)
}

// ListCollectionNames executes a listCollections command and returns a slice containing the names of the collections
Expand Down Expand Up @@ -944,7 +944,7 @@ func (db *Database) executeCreateOperation(ctx context.Context, op *operation.Cr
Deployment(db.client.deployment).
Crypt(db.client.cryptFLE)

return replaceErrors(op.Execute(ctx))
return wrapErrors(op.Execute(ctx))
}

// GridFSBucket is used to construct a GridFS bucket which can be used as a
Expand Down
Loading
Loading