Skip to content

test: SOCKS5 in CI and docker compose files #135

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ jobs:
- --kafka-addr redpanda-2:9092
- --rpc-addr redpanda-2:33145
- --seeds redpanda-0:33145
- image: serjs/go-socks5-proxy
name: proxy
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
Expand All @@ -196,7 +198,8 @@ jobs:
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
KAFKA_CONNECT: "redpanda-1:9092"
KAFKA_CONNECT: "invalid:9092,redpanda-1:9092"
SOCKS_PROXY: "proxy:1080"
steps:
- checkout
- rust_components
Expand Down Expand Up @@ -247,6 +250,8 @@ jobs:
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://kafka-2:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
- image: serjs/go-socks5-proxy
name: proxy
resource_class: xlarge # use of a smaller executor tends crashes on link
environment:
# Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
Expand All @@ -265,7 +270,8 @@ jobs:
TEST_JAVA_INTEROPT: 1
# Don't use the first node here since this is likely the controller and we want to ensure that we automatically
# pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
KAFKA_CONNECT: "kafka-1:9093"
KAFKA_CONNECT: "invalid:9093,kafka-1:9093"
SOCKS_PROXY: "proxy:1080"
steps:
- checkout
- rust_components
Expand Down
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ $ docker-compose -f docker-compose-redpanda.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9093 cargo test
$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9011 cargo test
```

in another session.
Expand All @@ -129,12 +129,24 @@ $ docker-compose -f docker-compose-kafka.yml up
in one session, and then run:

```console
$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9094 cargo test
$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9011 cargo test
```

in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other
environment variables.

### Using a SOCKS5 Proxy

To run the integration test via a SOCKS5 proxy, you need to set the environment variable `SOCKS_PROXY`. The following
command requires a running proxy on the local machine.

```console
$ KAFKA_CONNECT=0.0.0.0:9011,kafka-1:9021,redpanda-1:9021 SOCKS_PROXY=localhost:1080 cargo test --features full
```

The SOCKS5 proxy will automatically be started by the docker compose files. Note that `KAFKA_CONNECT` was extended by
addresses that are reachable via the proxy.

### Java Interopt
To test if RSKafka can produce/consume records to/from the official Java client, you need to have Java installed and the
`TEST_JAVA_INTEROPT=1` environment variable set.
Expand Down Expand Up @@ -217,14 +229,14 @@ execution that hooks right into the place where it is about to exit:
Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo criterion --all-features
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
```

If you find a benchmark that is too slow, you can may want to profile it. Get [cargo-with], and [perf], then run (here
for the `parallel/rskafka` benchmark):

```console
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
bench --all-features --bench write_throughput -- \
--bench --noplot parallel/rskafka
```
Expand Down
14 changes: 9 additions & 5 deletions benches/write_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ macro_rules! maybe_skip_kafka_integration {
env::var("TEST_INTEGRATION").is_ok(),
env::var("KAFKA_CONNECT").ok(),
) {
(true, Some(kafka_connection)) => kafka_connection,
(true, Some(kafka_connection)) => {
let kafka_connection: Vec<String> =
kafka_connection.split(",").map(|s| s.to_owned()).collect();
kafka_connection
}
(true, None) => {
panic!(
"TEST_INTEGRATION is set which requires running integration tests, but \
Expand Down Expand Up @@ -227,7 +231,7 @@ fn random_topic_name() -> String {
format!("test_topic_{}", uuid::Uuid::new_v4())
}

async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer, String) {
async fn setup_rdkafka(connection: Vec<String>, buffering: bool) -> (FutureProducer, String) {
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
producer::FutureRecord,
Expand All @@ -239,7 +243,7 @@ async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer,

// configure clients
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", connection);
cfg.set("bootstrap.servers", connection.join(","));
cfg.set("message.timeout.ms", "5000");
if buffering {
cfg.set("batch.num.messages", PARALLEL_BATCH_SIZE.to_string()); // = loads
Expand Down Expand Up @@ -273,10 +277,10 @@ async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer,
(producer_client, topic_name)
}

async fn setup_rskafka(connection: String) -> PartitionClient {
async fn setup_rskafka(connection: Vec<String>) -> PartitionClient {
let topic_name = random_topic_name();

let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
let client = ClientBuilder::new(connection).build().await.unwrap();
client
.controller_client()
.unwrap()
Expand Down
34 changes: 21 additions & 13 deletions docker-compose-kafka.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: "2"
version: "3"

services:
zookeeper:
Expand All @@ -12,14 +12,14 @@ services:
kafka-0:
image: docker.io/bitnami/kafka:3
ports:
- "9093:9093"
- "9010:9010"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
volumes:
- kafka_0_data:/bitnami/kafka
Expand All @@ -28,14 +28,14 @@ services:
kafka-1:
image: docker.io/bitnami/kafka:3
ports:
- "9094:9094"
- "9011:9011"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
volumes:
- kafka_1_data:/bitnami/kafka
Expand All @@ -44,19 +44,27 @@ services:
kafka-2:
image: docker.io/bitnami/kafka:3
ports:
- "9095:9095"
- "9012:9012"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=2
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9095
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://localhost:9095
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
volumes:
- kafka_2_data:/bitnami/kafka
depends_on:
- zookeeper
proxy:
image: serjs/go-socks5-proxy
ports:
- "1080:1080"
depends_on:
- kafka-0
- kafka-1
- kafka-2

volumes:
zookeeper_data:
Expand Down
28 changes: 18 additions & 10 deletions docker-compose-redpanda.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:
image: vectorized/redpanda:v21.11.2
container_name: redpanda-0
ports:
- '9092:9092'
- '9010:9010'
command:
- redpanda
- start
Expand All @@ -14,15 +14,15 @@ services:
- --overprovisioned
- --node-id 0
- --check=false
- --kafka-addr 0.0.0.0:9092
- --advertise-kafka-addr 127.0.0.1:9092
- --kafka-addr EXTERNAL://0.0.0.0:9010,FOR_PROXY://0.0.0.0:9020
- --advertise-kafka-addr EXTERNAL://127.0.0.1:9010,FOR_PROXY://redpanda-0:9020
- --rpc-addr 0.0.0.0:33145
- --advertise-rpc-addr redpanda-0:33145
redpanda-1:
image: vectorized/redpanda:v21.11.2
container_name: redpanda-1
ports:
- '9093:9093'
- '9011:9011'
command:
- redpanda
- start
Expand All @@ -33,15 +33,15 @@ services:
- --node-id 1
- --seeds "redpanda-0:33145"
- --check=false
- --kafka-addr 0.0.0.0:9093
- --advertise-kafka-addr 127.0.0.1:9093
- --kafka-addr EXTERNAL://0.0.0.0:9011,FOR_PROXY://0.0.0.0:9021
- --advertise-kafka-addr EXTERNAL://127.0.0.1:9011,FOR_PROXY://redpanda-1:9021
- --rpc-addr 0.0.0.0:33146
- --advertise-rpc-addr redpanda-1:33146
redpanda-2:
image: vectorized/redpanda:v21.11.2
container_name: redpanda-2
ports:
- '9094:9094'
- '9012:9012'
command:
- redpanda
- start
Expand All @@ -52,7 +52,15 @@ services:
- --node-id 2
- --seeds "redpanda-0:33145"
- --check=false
- --kafka-addr 0.0.0.0:9094
- --advertise-kafka-addr 127.0.0.1:9094
- --kafka-addr EXTERNAL://0.0.0.0:9012,FOR_PROXY://0.0.0.0:9022
- --advertise-kafka-addr EXTERNAL://127.0.0.1:9012,FOR_PROXY://redpanda-2:9022
- --rpc-addr 0.0.0.0:33147
- --advertise-rpc-addr redpanda-2:33147
- --advertise-rpc-addr redpanda-2:33147
proxy:
image: serjs/go-socks5-proxy
ports:
- "1080:1080"
depends_on:
- redpanda-0
- redpanda-1
- redpanda-2
31 changes: 17 additions & 14 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ async fn test_plain() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
ClientBuilder::new(vec![connection]).build().await.unwrap();
ClientBuilder::new(connection).build().await.unwrap();
}

#[tokio::test]
async fn test_topic_crud() {
maybe_start_logging();

let connection = maybe_skip_kafka_integration!();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
let topics = client.list_topics().await.unwrap();

Expand Down Expand Up @@ -109,22 +109,25 @@ async fn test_tls() {
.unwrap();

let connection = maybe_skip_kafka_integration!();
ClientBuilder::new(vec![connection])
ClientBuilder::new(connection)
.tls_config(Arc::new(config))
.build()
.await
.unwrap();
}

// Disabled as currently no SOCKS5 integration tests
#[cfg(feature = "transport-socks5")]
#[ignore]
#[tokio::test]
async fn test_socks5() {
maybe_start_logging();

let client = ClientBuilder::new(vec!["my-cluster-kafka-bootstrap:9092".to_owned()])
.socks5_proxy("localhost:1080".to_owned())
// e.g. "my-connection-kafka-bootstrap:9092"
let connection = maybe_skip_kafka_integration!();
// e.g. "localhost:1080"
let proxy = maybe_skip_SOCKS_PROXY!();

let client = ClientBuilder::new(connection)
.socks5_proxy(proxy)
.build()
.await
.unwrap();
Expand All @@ -143,7 +146,7 @@ async fn test_produce_empty() {
let topic_name = random_topic_name();
let n_partitions = 2;

let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
Expand All @@ -165,7 +168,7 @@ async fn test_consume_empty() {
let topic_name = random_topic_name();
let n_partitions = 2;

let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
Expand All @@ -189,7 +192,7 @@ async fn test_consume_offset_out_of_range() {
let topic_name = random_topic_name();
let n_partitions = 2;

let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, n_partitions, 1, 5_000)
Expand Down Expand Up @@ -222,7 +225,7 @@ async fn test_get_offset() {
let topic_name = random_topic_name();
let n_partitions = 1;

let client = ClientBuilder::new(vec![connection.clone()])
let client = ClientBuilder::new(connection.clone())
.build()
.await
.unwrap();
Expand Down Expand Up @@ -286,7 +289,7 @@ async fn test_produce_consume_size_cutoff() {
let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
Expand Down Expand Up @@ -359,7 +362,7 @@ async fn test_consume_midbatch() {
let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
Expand Down Expand Up @@ -404,7 +407,7 @@ async fn test_delete_records() {
let connection = maybe_skip_kafka_integration!();
let topic_name = random_topic_name();

let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
let client = ClientBuilder::new(connection).build().await.unwrap();
let controller_client = client.controller_client().unwrap();
controller_client
.create_topic(&topic_name, 1, 1, 5_000)
Expand Down
Loading