Skip to content

Commit e142f6e

Browse files
committed
Fix other error checks to use dns error response; properly report errors from kafka client dns resolution
1 parent ca6c539 commit e142f6e

File tree

12 files changed

+163
-56
lines changed

12 files changed

+163
-56
lines changed

src/kafka-util/src/client.rs

Lines changed: 102 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,9 @@ enum BrokerRewriteHandle {
295295
/// For _default_ ssh tunnels, we store an error if _creation_
296296
/// of the tunnel failed, so that `tunnel_status` can return it.
297297
FailedDefaultSshTunnel(String),
298+
/// We store an error if DNS resolution fails when resolving
299+
/// a new broker host.
300+
FailedDNSResolution(String),
298301
}
299302

300303
/// Tunneling clients
@@ -309,6 +312,24 @@ pub enum TunnelConfig {
309312
None,
310313
}
311314

315+
/// Status of all active ssh tunnels and direct broker connections for a `TunnelingClientContext`.
316+
#[derive(Clone)]
317+
pub struct TunnelingClientStatus {
318+
/// Status of all active ssh tunnels.
319+
pub ssh_status: SshTunnelStatus,
320+
/// Status of direct broker connections.
321+
pub broker_status: BrokerStatus,
322+
}
323+
324+
/// Status of direct broker connections for a `TunnelingClientContext`.
325+
#[derive(Clone)]
326+
pub enum BrokerStatus {
327+
/// The broker connections are nominal.
328+
Nominal,
329+
/// At least one broker connection has failed.
330+
Failed(String),
331+
}
332+
312333
/// A client context that supports rewriting broker addresses.
313334
#[derive(Clone)]
314335
pub struct TunnelingClientContext<C> {
@@ -391,19 +412,22 @@ impl<C> TunnelingClientContext<C> {
391412
&self.inner
392413
}
393414

394-
/// Returns a _consolidated_ `SshTunnelStatus` that communicates the status
395-
/// of all active ssh tunnels `self` knows about.
396-
pub fn tunnel_status(&self) -> SshTunnelStatus {
397-
self.rewrites
398-
.lock()
399-
.expect("poisoned")
415+
/// Returns a `TunnelingClientStatus` that contains a _consolidated_ `SshTunnelStatus` to
416+
/// communicate the status of all active ssh tunnels `self` knows about, and a `BrokerStatus`
417+
/// that contains a _consolidated_ status of all direct broker connections.
418+
pub fn tunnel_status(&self) -> TunnelingClientStatus {
419+
let rewrites = self.rewrites.lock().expect("poisoned");
420+
421+
let ssh_status = rewrites
400422
.values()
401423
.map(|handle| match handle {
402424
BrokerRewriteHandle::SshTunnel(s) => s.check_status(),
403425
BrokerRewriteHandle::FailedDefaultSshTunnel(e) => {
404426
SshTunnelStatus::Errored(e.clone())
405427
}
406-
BrokerRewriteHandle::Simple(_) => SshTunnelStatus::Running,
428+
BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDNSResolution(_) => {
429+
SshTunnelStatus::Running
430+
}
407431
})
408432
.fold(SshTunnelStatus::Running, |acc, status| {
409433
match (acc, status) {
@@ -418,7 +442,27 @@ impl<C> TunnelingClientContext<C> {
418442
SshTunnelStatus::Running
419443
}
420444
}
445+
});
446+
447+
let broker_status = rewrites
448+
.values()
449+
.map(|handle| match handle {
450+
BrokerRewriteHandle::FailedDNSResolution(e) => BrokerStatus::Failed(e.clone()),
451+
_ => BrokerStatus::Nominal,
421452
})
453+
.fold(BrokerStatus::Nominal, |acc, status| match (acc, status) {
454+
(BrokerStatus::Nominal, BrokerStatus::Failed(e))
455+
| (BrokerStatus::Failed(e), BrokerStatus::Nominal) => BrokerStatus::Failed(e),
456+
(BrokerStatus::Failed(err), BrokerStatus::Failed(e)) => {
457+
BrokerStatus::Failed(format!("{}, {}", err, e))
458+
}
459+
(BrokerStatus::Nominal, BrokerStatus::Nominal) => BrokerStatus::Nominal,
460+
});
461+
462+
TunnelingClientStatus {
463+
ssh_status,
464+
broker_status,
465+
}
422466
}
423467
}
424468

@@ -441,7 +485,8 @@ where
441485
port: Some(addr.port()),
442486
}
443487
}
444-
BrokerRewriteHandle::FailedDefaultSshTunnel(_) => {
488+
BrokerRewriteHandle::FailedDefaultSshTunnel(_)
489+
| BrokerRewriteHandle::FailedDNSResolution(_) => {
445490
unreachable!()
446491
}
447492
};
@@ -463,16 +508,30 @@ where
463508
let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned();
464509

465510
match rewrite {
466-
None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => {
511+
None
512+
| Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_))
513+
| Some(BrokerRewriteHandle::FailedDNSResolution(_)) => {
467514
match &self.default_tunnel {
468515
TunnelConfig::Ssh(default_tunnel) => {
469516
// Multiple users could all run `connect` at the same time; only one ssh
470517
// tunnel will ever be connected, and only one will be inserted into the
471518
// map.
472519
let ssh_tunnel = self.runtime.block_on(async {
520+
// Ensure the default tunnel host is resolved to an external address.
521+
let resolved_tunnel_addr = resolve_external_address(
522+
&default_tunnel.host,
523+
self.enforce_external_addresses,
524+
)
525+
.await?;
526+
let tunnel_config = SshTunnelConfig {
527+
host: resolved_tunnel_addr.to_string(),
528+
port: default_tunnel.port,
529+
user: default_tunnel.user.clone(),
530+
key_pair: default_tunnel.key_pair.clone(),
531+
};
473532
self.ssh_tunnel_manager
474533
.connect(
475-
default_tunnel.clone(),
534+
tunnel_config,
476535
&addr.host,
477536
addr.port.parse().unwrap(),
478537
self.ssh_timeout_config,
@@ -487,6 +546,7 @@ where
487546
if matches!(
488547
o.get(),
489548
BrokerRewriteHandle::FailedDefaultSshTunnel(_)
549+
| BrokerRewriteHandle::FailedDNSResolution(_)
490550
) =>
491551
{
492552
o.insert(BrokerRewriteHandle::SshTunnel(
@@ -533,19 +593,42 @@ where
533593
TunnelConfig::None => {
534594
// If no rewrite is specified, we still should check that this potentially
535595
// new broker address is a global address.
536-
let rewrite = self.runtime.block_on(async {
537-
let resolved = resolve_external_address(
596+
self.runtime.block_on(async {
597+
match resolve_external_address(
538598
&addr.host,
539599
self.enforce_external_addresses,
540600
)
541601
.await
542-
.unwrap();
543-
BrokerRewriteHandle::Simple(BrokerRewrite {
544-
host: resolved.to_string(),
545-
port: addr.port.parse().ok(),
546-
})
547-
});
548-
return_rewrite(&rewrite)
602+
{
603+
Ok(resolved) => {
604+
let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite {
605+
host: resolved.to_string(),
606+
port: addr.port.parse().ok(),
607+
});
608+
return_rewrite(&rewrite)
609+
}
610+
Err(e) => {
611+
warn!(
612+
"failed to resolve external address for {:?}: {}",
613+
addr,
614+
e.display_with_causes()
615+
);
616+
// Write an error if no one else has already written one.
617+
let mut rewrites = self.rewrites.lock().expect("poisoned");
618+
rewrites.entry(addr.clone()).or_insert_with(|| {
619+
BrokerRewriteHandle::FailedDNSResolution(
620+
e.to_string_with_causes(),
621+
)
622+
});
623+
// We have to give rdkafka an address, as this callback can't fail.
624+
BrokerAddr {
625+
host: "failed-dns-resolution.dev.materialize.com"
626+
.to_string(),
627+
port: 1337.to_string(),
628+
}
629+
}
630+
}
631+
})
549632
}
550633
}
551634
}

src/mysql-util/src/tunnel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ impl Config {
239239
// the TLS hostname back to the actual upstream host and not the
240240
// TCP hostname.
241241
opts_builder = opts_builder.ssl_opts(Some(
242-
ssl_opts.clone().with_tls_hostname_override(Some(
242+
ssl_opts.clone().with_danger_tls_hostname_override(Some(
243243
self.inner.ip_or_hostname().to_string(),
244244
)),
245245
));

src/storage-types/src/connections.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -547,8 +547,14 @@ impl KafkaConnection {
547547
.await?;
548548
let key_pair = SshKeyPair::from_bytes(&secret)?;
549549

550+
// Ensure any ssh-bastion address we connect to is resolved to an external address.
551+
let resolved = resolve_external_address(
552+
&ssh_tunnel.connection.host,
553+
ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
554+
)
555+
.await?;
550556
context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
551-
host: ssh_tunnel.connection.host.clone(),
557+
host: resolved.to_string(),
552558
port: ssh_tunnel.connection.port,
553559
user: ssh_tunnel.connection.user.clone(),
554560
key_pair,
@@ -567,25 +573,21 @@ impl KafkaConnection {
567573
};
568574
match &broker.tunnel {
569575
Tunnel::Direct => {
576+
// By default, don't override broker address lookup.
577+
//
578+
// N.B.
579+
//
570580
// We _could_ pre-setup the default ssh tunnel for all known brokers here, but
571581
// we avoid doing because:
572582
// - Its not necessary.
573583
// - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
574584
// in the `TunnelingClientContext`.
575-
576-
// Ensure any broker address we connect to is resolved to an external address.
577-
let resolved = resolve_external_address(
578-
&addr.host,
579-
ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
580-
)
581-
.await?;
582-
context.add_broker_rewrite(
583-
addr,
584-
BrokerRewrite {
585-
host: resolved.to_string(),
586-
port: None,
587-
},
588-
)
585+
//
586+
// NOTE that we do not need to use the `resolve_external_address` method to
587+
// validate the broker address here since it will be validated when the
588+
// connection is established in `src/kafka-util/src/client.rs`, and we do not
589+
// want to specify any BrokerRewrite that would override any default-tunnel
590+
// settings.
589591
}
590592
Tunnel::AwsPrivatelink(aws_privatelink) => {
591593
let host = mz_cloud_resources::vpc_endpoint_host(

src/storage-types/src/errors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1026,7 +1026,7 @@ where
10261026
cx: &TunnelingClientContext<C>,
10271027
) -> Result<T, ContextCreationError> {
10281028
self.map_err(|e| {
1029-
if let SshTunnelStatus::Errored(e) = cx.tunnel_status() {
1029+
if let SshTunnelStatus::Errored(e) = cx.tunnel_status().ssh_status {
10301030
ContextCreationError::Ssh(anyhow!(e))
10311031
} else {
10321032
ContextCreationError::from(e)

src/storage/src/source/kafka.rs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ use chrono::NaiveDateTime;
2020
use differential_dataflow::{AsCollection, Collection};
2121
use futures::StreamExt;
2222
use maplit::btreemap;
23-
use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext};
23+
use mz_kafka_util::client::{
24+
get_partitions, BrokerStatus, MzClientContext, PartitionId, TunnelingClientContext,
25+
};
2426
use mz_ore::error::ErrorExt;
2527
use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle};
2628
use mz_repr::adt::timestamp::CheckedTimestamp;
@@ -384,21 +386,38 @@ impl SourceRender for KafkaSourceConnection {
384386
"kafka metadata thread: updated partition metadata info",
385387
);
386388

387-
// Clear all the health namespaces we know about.
388-
// Note that many kafka sources's don't have an ssh tunnel, but
389-
// the `health_operator` handles this fine.
390-
*status_report.lock().unwrap() = HealthStatus {
391-
kafka: Some(HealthStatusUpdate::running()),
392-
ssh: Some(HealthStatusUpdate::running()),
393-
};
389+
// Check to see if any broker errors have been hit
390+
match consumer.client().context().tunnel_status().broker_status
391+
{
392+
BrokerStatus::Failed(err) => {
393+
let status = HealthStatusUpdate::stalled(
394+
format!("broker error: {}", err),
395+
None,
396+
);
397+
*status_report.lock().unwrap() = HealthStatus {
398+
kafka: Some(status),
399+
ssh: None,
400+
};
401+
}
402+
BrokerStatus::Nominal => {
403+
// Clear all the health namespaces we know about.
404+
// Note that many kafka sources don't have an ssh tunnel, but
405+
// the `health_operator` handles this fine.
406+
*status_report.lock().unwrap() = HealthStatus {
407+
kafka: Some(HealthStatusUpdate::running()),
408+
ssh: Some(HealthStatusUpdate::running()),
409+
};
410+
}
411+
}
394412
}
395413
Err(e) => {
396414
let kafka_status = Some(HealthStatusUpdate::stalled(
397415
format!("{}", e.display_with_causes()),
398416
None,
399417
));
400418

401-
let ssh_status = consumer.client().context().tunnel_status();
419+
let ssh_status =
420+
consumer.client().context().tunnel_status().ssh_status;
402421
let ssh_status = match ssh_status {
403422
SshTunnelStatus::Running => {
404423
Some(HealthStatusUpdate::running())

test/kafka-auth/test-kafka-ssl.td

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ running
121121
# ALTER CONNECTION for Kafka + SSH
122122

123123
! ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (HOST = 'abcd') WITH (VALIDATE = true);
124-
contains:Could not resolve hostname abcd
124+
contains:failed to lookup address information: Name or service not known
125125

126126
! ALTER CONNECTION testdrive_no_reset_connections.public.ssh RESET (HOST);
127127
contains:HOST option is required

test/source-sink-errors/mzcompose.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ def schema() -> str:
4545
class Disruption(Protocol):
4646
name: str
4747

48-
def run_test(self, c: Composition) -> None:
49-
...
48+
def run_test(self, c: Composition) -> None: ...
5049

5150

5251
@dataclass
@@ -182,8 +181,13 @@ def populate(self, c: Composition) -> None:
182181
$ kafka-ingest topic=source-topic format=bytes
183182
ABC
184183
184+
# Specify a faster metadata refresh interval so errors are detected every second
185+
# instead of every minute
185186
> CREATE SOURCE source1
186-
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source-topic-${testdrive.seed}')
187+
FROM KAFKA CONNECTION kafka_conn (
188+
TOPIC 'testdrive-source-topic-${testdrive.seed}',
189+
TOPIC METADATA REFRESH INTERVAL '1s'
190+
)
187191
FORMAT BYTES
188192
ENVELOPE NONE
189193
# WITH ( REMOTE 'clusterd:2100' ) https://github.com/MaterializeInc/materialize/issues/16582
@@ -466,7 +470,7 @@ def assert_recovery(self, c: Composition) -> None:
466470
"redpanda", "rpk", "topic", "delete", f"testdrive-source-topic-{seed}"
467471
),
468472
expected_error="UnknownTopicOrPartition|topic",
469-
fixage=None
473+
fixage=None,
470474
# Re-creating the topic does not restart the source
471475
# fixage=lambda c,seed: redpanda_topics(c, "create", seed),
472476
),

test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
> SELECT status FROM mz_internal.mz_source_statuses st
1616
JOIN mz_sources s ON st.id = s.id
17-
WHERE error LIKE 'ssh:%'
1817
AND s.name in ('dynamic_text', 'fixed_text', 'fixed_plus_csr', 'dynamic_plus_csr')
1918
stalled
2019
stalled

test/ssh-connection/pg-source-after-ssh-failure.td

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@
1111

1212
> SELECT status FROM mz_internal.mz_source_statuses st
1313
JOIN mz_sources s ON st.id = s.id
14-
WHERE s.name = 'mz_source' AND error LIKE 'ssh:%'
14+
WHERE s.name = 'mz_source'
1515
stalled

test/testdrive/connection-create-external.td

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true
1818
ALTER SYSTEM SET storage_enforce_external_addresses = true
1919

2020
! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = true)
21-
contains:address is not global
21+
contains:Failed to resolve hostname
2222

2323
# Setup kafka topic with schema
2424
# must be a subset of the keys in the rows

0 commit comments

Comments
 (0)