Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package event // import "go.mongodb.org/mongo-driver/event"

import (
"context"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
Expand All @@ -32,7 +33,9 @@ type CommandStartedEvent struct {

// CommandFinishedEvent represents a generic command finishing.
type CommandFinishedEvent struct {
// Deprecated: Use Duration instead.
DurationNanos int64
Duration time.Duration
CommandName string
RequestID int64
ConnectionID string
Expand Down Expand Up @@ -157,15 +160,19 @@ type ServerHeartbeatStartedEvent struct {

// ServerHeartbeatSucceededEvent is an event generated when the heartbeat succeeds.
type ServerHeartbeatSucceededEvent struct {
// Deprecated: Use Duration instead.
DurationNanos int64
Duration time.Duration
Reply description.Server
ConnectionID string // The address this heartbeat was sent to with a unique identifier
Awaited bool // If this heartbeat was awaitable
}

// ServerHeartbeatFailedEvent is an event generated when the heartbeat fails.
type ServerHeartbeatFailedEvent struct {
// Deprecated: Use Duration instead.
DurationNanos int64
Duration time.Duration
Failure error
ConnectionID string // The address this heartbeat was sent to with a unique identifier
Awaited bool // If this heartbeat was awaitable
Expand Down
8 changes: 5 additions & 3 deletions mongo/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
)

// Security-sensitive commands that should be ignored in command monitoring by default.
var securitySensitiveCommands = []string{"authenticate", "saslStart", "saslContinue", "getnonce",
"createUser", "updateUser", "copydbgetnonce", "copydbsaslstart", "copydb"}
var securitySensitiveCommands = []string{
"authenticate", "saslStart", "saslContinue", "getnonce",
"createUser", "updateUser", "copydbgetnonce", "copydbsaslstart", "copydb",
}

// clientEntity is a wrapper for a mongo.Client object that also holds additional information required during test
// execution.
Expand Down Expand Up @@ -291,7 +293,7 @@ func (c *clientEntity) processFailedEvent(_ context.Context, evt *event.CommandF
bsonBuilder := bsoncore.NewDocumentBuilder().
AppendString("name", "CommandFailedEvent").
AppendDouble("observedAt", getSecondsSinceEpoch()).
AppendInt64("durationNanos", evt.DurationNanos).
AppendInt64("duration", int64(evt.Duration)).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the bson used? Why do other events not use Duration (non-exhaustive)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This BSON document is used for test assertions. It must remain the same for the test assertions to continue working.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The structure of this BSON document must remain the same for test assertions to continue working.

Suggested change
AppendInt64("duration", int64(evt.Duration)).
AppendInt64("durationNanos", duration.Nanoseconds()).

AppendString("commandName", evt.CommandName).
AppendInt64("requestId", evt.RequestID).
AppendString("connectionId", evt.ConnectionID).
Expand Down
17 changes: 9 additions & 8 deletions x/mongo/driver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ func (op Operation) Execute(ctx context.Context) error {
if err == nil {
// roundtrip using either the full roundTripper or a special one for when the moreToCome
// flag is set
var roundTrip = op.roundTrip
roundTrip := op.roundTrip
if moreToCome {
roundTrip = op.moreToComeRoundTrip
}
Expand Down Expand Up @@ -981,8 +981,8 @@ func (op Operation) createWireMessage(
dst []byte,
desc description.SelectedServer,
maxTimeMS uint64,
conn Connection) ([]byte, startedInformation, error) {

conn Connection,
) ([]byte, startedInformation, error) {
// If topology is not LoadBalanced, API version is not declared, and wire version is unknown
// or less than 6, use OP_QUERY. Otherwise, use OP_MSG.
if desc.Kind != description.LoadBalanced && op.ServerAPI == nil &&
Expand Down Expand Up @@ -1074,8 +1074,8 @@ func (op Operation) createQueryWireMessage(maxTimeMS uint64, dst []byte, desc de
}

func (op Operation) createMsgWireMessage(ctx context.Context, maxTimeMS uint64, dst []byte, desc description.SelectedServer,
conn Connection) ([]byte, startedInformation, error) {

conn Connection,
) ([]byte, startedInformation, error) {
var info startedInformation
var flags wiremessage.MsgFlag
var wmindex int32
Expand Down Expand Up @@ -1731,17 +1731,18 @@ func (op Operation) publishFinishedEvent(ctx context.Context, info finishedInfor
return
}

var durationNanos int64
var duration time.Duration
var emptyTime time.Time
if info.startTime != emptyTime {
durationNanos = time.Since(info.startTime).Nanoseconds()
duration = time.Since(info.startTime)
}

finished := event.CommandFinishedEvent{
CommandName: info.cmdName,
RequestID: int64(info.requestID),
ConnectionID: info.connID,
DurationNanos: durationNanos,
DurationNanos: duration.Nanoseconds(),
Duration: duration,
ServerConnectionID: info.serverConnID,
ServiceID: info.serviceID,
}
Expand Down
24 changes: 14 additions & 10 deletions x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ func (s *Server) createBaseOperation(conn driver.Connection) *operation.Hello {
func (s *Server) check() (description.Server, error) {
var descPtr *description.Server
var err error
var durationNanos int64
var duration time.Duration

// Create a new connection if this is the first check, the connection was closed after an error during the previous
// check, or the previous check was cancelled.
Expand Down Expand Up @@ -760,19 +760,19 @@ func (s *Server) check() (description.Server, error) {
s.conn.setSocketTimeout(s.cfg.heartbeatTimeout)
err = baseOperation.Execute(s.heartbeatCtx)
}
durationNanos = time.Since(start).Nanoseconds()
duration = time.Since(start)

if err == nil {
tempDesc := baseOperation.Result(s.address)
descPtr = &tempDesc
s.publishServerHeartbeatSucceededEvent(s.conn.ID(), durationNanos, tempDesc, s.conn.getCurrentlyStreaming() || streamable)
s.publishServerHeartbeatSucceededEvent(s.conn.ID(), duration, tempDesc, s.conn.getCurrentlyStreaming() || streamable)
} else {
// Close the connection here rather than below so we ensure we're not closing a connection that wasn't
// successfully created.
if s.conn != nil {
_ = s.conn.close()
}
s.publishServerHeartbeatFailedEvent(s.conn.ID(), durationNanos, err, s.conn.getCurrentlyStreaming() || streamable)
s.publishServerHeartbeatFailedEvent(s.conn.ID(), duration, err, s.conn.getCurrentlyStreaming() || streamable)
}
}

Expand Down Expand Up @@ -901,11 +901,13 @@ func (s *Server) publishServerHeartbeatStartedEvent(connectionID string, await b

// publishes a ServerHeartbeatSucceededEvent to indicate hello has succeeded
func (s *Server) publishServerHeartbeatSucceededEvent(connectionID string,
durationNanos int64,
duration time.Duration,
desc description.Server,
await bool) {
await bool,
) {
serverHeartbeatSucceeded := &event.ServerHeartbeatSucceededEvent{
DurationNanos: durationNanos,
DurationNanos: duration.Nanoseconds(),
Duration: duration,
Reply: desc,
ConnectionID: connectionID,
Awaited: await,
Expand All @@ -918,11 +920,13 @@ func (s *Server) publishServerHeartbeatSucceededEvent(connectionID string,

// publishes a ServerHeartbeatFailedEvent to indicate hello has failed
func (s *Server) publishServerHeartbeatFailedEvent(connectionID string,
durationNanos int64,
duration time.Duration,
err error,
await bool) {
await bool,
) {
serverHeartbeatFailed := &event.ServerHeartbeatFailedEvent{
DurationNanos: durationNanos,
DurationNanos: duration.Nanoseconds(),
Duration: duration,
Failure: err,
ConnectionID: connectionID,
Awaited: await,
Expand Down