This repository was archived by the owner on Nov 4, 2021. It is now read-only.

Support Postgres environment variables for SSL configuration #445

Closed
wants to merge 17 commits into from
86 changes: 48 additions & 38 deletions README.md
@@ -40,44 +40,54 @@ Each one does a single thing. They are listed in the table below, in order of pr

There's a multitude of settings you can use to control the plugin server. Use them as environment variables.

| Name | Description | Default value |
| ------------------------------------ | ----------------------------------------------------------------- | ------------------------------------- |
| DATABASE_URL | Postgres database URL | `'postgres://localhost:5432/posthog'` |
| REDIS_URL | Redis store URL | `'redis://localhost'` |
| BASE_DIR | base path for resolving local plugins | `'.'` |
| WORKER_CONCURRENCY | number of concurrent worker threads | `0` – all cores |
| TASKS_PER_WORKER | number of parallel tasks per worker thread | `10` |
| REDIS_POOL_MIN_SIZE | minimum number of Redis connections to use per thread | `1` |
| REDIS_POOL_MAX_SIZE | maximum number of Redis connections to use per thread | `3` |
| SCHEDULE_LOCK_TTL | how many seconds to hold the lock for the schedule | `60` |
| CELERY_DEFAULT_QUEUE | Celery outgoing queue | `'celery'` |
| PLUGINS_CELERY_QUEUE | Celery incoming queue | `'posthog-plugins'` |
| PLUGINS_RELOAD_PUBSUB_CHANNEL | Redis channel for reload events | `'reload-plugins'` |
| POSTHOG_DB_NAME | Postgres database name | `null` |
| POSTHOG_DB_USER | Postgres database user | `'postgres'` |
| POSTHOG_DB_PASSWORD | Postgres database password | `''` |
| POSTHOG_POSTGRES_HOST | Postgres database host | `'localhost'` |
| POSTHOG_POSTGRES_PORT | Postgres database port | `5432` |
| POSTHOG_POSTGRES_SSL_MODE | Postgres database SSL mode | `'disable'` |
| POSTHOG_POSTGRES_REJECT_UNAUTHORIZED | reject the connection if the SSL cert. cannot be validated by CA  | `true`                                |
| POSTHOG_POSTGRES_CLI_SSL_CA          | absolute path to the CA certificate to use for Postgres           | `null`                                |
| POSTHOG_POSTGRES_CLI_SSL_CRT         | absolute path to the certificate to use for Postgres client auth. | `null`                                |
| POSTHOG_POSTGRES_CLI_SSL_KEY         | absolute path to the private key to use for Postgres client auth. | `null`                                |
| CLICKHOUSE_HOST | ClickHouse host | `'localhost'` |
| CLICKHOUSE_DATABASE | ClickHouse database | `'default'` |
| CLICKHOUSE_USER | ClickHouse username | `'default'` |
| CLICKHOUSE_PASSWORD | ClickHouse password | `null` |
| CLICKHOUSE_CA | ClickHouse CA certs | `null` |
| CLICKHOUSE_SECURE | whether to secure ClickHouse connection | `false` |
| KAFKA_ENABLED | use Kafka instead of Celery to ingest events | `false` |
| KAFKA_HOSTS | comma-delimited Kafka hosts | `null` |
| KAFKA_CONSUMPTION_TOPIC | Kafka incoming events topic | `'events_plugin_ingestion'` |
| KAFKA_CLIENT_CERT_B64 | Kafka certificate in Base64 | `null` |
| KAFKA_CLIENT_CERT_KEY_B64 | Kafka certificate key in Base64 | `null` |
| KAFKA_TRUSTED_CERT_B64 | Kafka trusted CA in Base64 | `null` |
| KAFKA_PRODUCER_MAX_QUEUE_SIZE | Kafka producer batch max size before flushing | `20` |
| KAFKA_FLUSH_FREQUENCY_MS | Kafka producer batch max duration before flushing | `500` |
| KAFKA_MAX_MESSAGE_BATCH_SIZE | Kafka producer batch max size in bytes before flushing | `900000` |
| LOG_LEVEL | minimum log level | `'info'` |
| SENTRY_DSN | Sentry ingestion URL | `null` |
| STATSD_HOST | StatsD host - integration disabled if this is not provided | `null` |
| STATSD_PORT | StatsD port | `8125` |
| STATSD_PREFIX | StatsD prefix | `'plugin-server.'` |
| DISABLE_MMDB | whether to disable MMDB IP location capabilities | `false` |
| INTERNAL_MMDB_SERVER_PORT | port of the internal server used for IP location (0 means random) | `0` |
| DISTINCT_ID_LRU_SIZE | size of persons distinct ID LRU cache | `10000` |
| PLUGIN_SERVER_IDLE | whether to disengage the plugin server, e.g. for development | `false` |
| CAPTURE_INTERNAL_METRICS | whether to capture internal metrics for posthog in posthog | `false` |
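
For context on how the new `POSTHOG_*` variables might be consumed, here is a minimal sketch of wiring them into node-postgres (`pg`). The `buildPgConfig` helper and its exact env-to-option mapping are illustrative assumptions, not code from this PR:

```ts
import { readFileSync } from 'fs'
import { Pool, PoolConfig } from 'pg'

// Hypothetical helper: translate the POSTHOG_* variables documented above
// into a node-postgres pool configuration.
function buildPgConfig(env: NodeJS.ProcessEnv): PoolConfig {
    const config: PoolConfig = {
        host: env.POSTHOG_POSTGRES_HOST ?? 'localhost',
        port: Number(env.POSTHOG_POSTGRES_PORT ?? 5432),
        database: env.POSTHOG_DB_NAME,
        user: env.POSTHOG_DB_USER ?? 'postgres',
        password: env.POSTHOG_DB_PASSWORD ?? '',
    }
    if (env.POSTHOG_POSTGRES_SSL_MODE && env.POSTHOG_POSTGRES_SSL_MODE !== 'disable') {
        config.ssl = {
            // Environment values are strings, so "false" must be compared explicitly.
            rejectUnauthorized: env.POSTHOG_POSTGRES_REJECT_UNAUTHORIZED !== 'false',
            // The CLI_SSL_* variables hold absolute paths, so the files are read in here.
            ca: env.POSTHOG_POSTGRES_CLI_SSL_CA
                ? readFileSync(env.POSTHOG_POSTGRES_CLI_SSL_CA, 'utf8')
                : undefined,
            cert: env.POSTHOG_POSTGRES_CLI_SSL_CRT
                ? readFileSync(env.POSTHOG_POSTGRES_CLI_SSL_CRT, 'utf8')
                : undefined,
            key: env.POSTHOG_POSTGRES_CLI_SSL_KEY
                ? readFileSync(env.POSTHOG_POSTGRES_CLI_SSL_KEY, 'utf8')
                : undefined,
        }
    }
    return config
}

const pool = new Pool(buildPgConfig(process.env))
```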

## Releasing a new version

4 changes: 2 additions & 2 deletions benchmarks/postgres/ingestion.benchmark.ts
@@ -27,11 +27,11 @@ describe('ingestion benchmarks', () => {
             'my_id',
             '127.0.0.1',
             'http://localhost',
-            {
+            ({
                 event: 'default event',
                 timestamp: now.toISO(),
                 properties: { token: team.api_token },
-            } as any as PluginEvent,
+            } as any) as PluginEvent,
             team.id,
             now,
             now,
7 changes: 6 additions & 1 deletion src/config/config.ts
@@ -1,6 +1,6 @@
 import os from 'os'
 
-import { LogLevel, PluginsServerConfig } from '../types'
+import { LogLevel, PluginsServerConfig, PostgresSSLMode } from '../types'
 import { determineNodeEnv, NodeEnv, stringToBoolean } from '../utils/utils'
 import { KAFKA_EVENTS_PLUGIN_INGESTION } from './kafka-topics'
 
@@ -24,6 +24,11 @@ export function getDefaultConfig(): PluginsServerConfig {
         POSTHOG_DB_PASSWORD: '',
         POSTHOG_POSTGRES_HOST: 'localhost',
         POSTHOG_POSTGRES_PORT: 5432,
+        POSTHOG_POSTGRES_SSL_MODE: PostgresSSLMode.Disable,
+        POSTHOG_POSTGRES_REJECT_UNAUTHORIZED: true,
+        POSTHOG_POSTGRES_CLI_SSL_CA: null,
+        POSTHOG_POSTGRES_CLI_SSL_CRT: null,
+        POSTHOG_POSTGRES_CLI_SSL_KEY: null,
         CLICKHOUSE_HOST: 'localhost',
         CLICKHOUSE_DATABASE: isTestEnv ? 'posthog_test' : 'default',
         CLICKHOUSE_USER: 'default',
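
Because the SSL mode arrives from the environment as a plain string, it has to be checked against the enum at some point. A minimal sketch of such a guard follows; the `parseSSLMode` helper is hypothetical and not part of this diff:

```ts
import { PostgresSSLMode } from '../types'

// Illustrative only: validate a raw environment value against the enum,
// falling back to the default of 'disable' when the variable is unset.
function parseSSLMode(raw: string | undefined): PostgresSSLMode {
    if (!raw) {
        return PostgresSSLMode.Disable
    }
    const valid = Object.values(PostgresSSLMode) as string[]
    if (!valid.includes(raw)) {
        throw new Error(`Unknown POSTHOG_POSTGRES_SSL_MODE "${raw}"; expected one of: ${valid.join(', ')}`)
    }
    return raw as PostgresSSLMode
}
```
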
2 changes: 1 addition & 1 deletion src/main/job-queues/concurrent/graphile-queue.ts
@@ -70,7 +70,7 @@ export class GraphileQueue extends JobQueueBase {
         this.consumerPool = await this.createPool()
         this.runner = await run({
             // graphile's types refer to a local node_modules version of Pool
-            pgPool: this.consumerPool as Pool as any,
+            pgPool: (this.consumerPool as Pool) as any,
             schema: this.serverConfig.JOB_QUEUE_GRAPHILE_SCHEMA,
             noPreparedStatements: !this.serverConfig.JOB_QUEUE_GRAPHILE_PREPARED_STATEMENTS,
             concurrency: 1,
14 changes: 8 additions & 6 deletions src/main/job-queues/job-queue-manager.ts
@@ -17,13 +17,15 @@ export class JobQueueManager implements JobQueue {
             .map((q) => q.trim() as JobQueueType)
             .filter((q) => !!q)
 
-        this.jobQueues = this.jobQueueTypes.map((queue): JobQueue => {
-            if (jobQueueMap[queue]) {
-                return jobQueueMap[queue].getQueue(pluginsServer)
-            } else {
-                throw new Error(`Unknown job queue "${queue}"`)
+        this.jobQueues = this.jobQueueTypes.map(
+            (queue): JobQueue => {
+                if (jobQueueMap[queue]) {
+                    return jobQueueMap[queue].getQueue(pluginsServer)
+                } else {
+                    throw new Error(`Unknown job queue "${queue}"`)
+                }
             }
-        })
+        )
     }
 
     async connectProducer(): Promise<void> {
14 changes: 14 additions & 0 deletions src/types.ts
@@ -18,6 +18,15 @@ import { ActionMatcher } from './worker/ingestion/action-matcher'
 import { EventsProcessor } from './worker/ingestion/process-event'
 import { LazyPluginVM } from './worker/vm/lazy'
 
+export enum PostgresSSLMode {
+    Disable = 'disable',
+    Allow = 'allow',
+    Prefer = 'prefer',
+    Require = 'require',
+    VerifyCA = 'verify-ca',
+    VerifyFull = 'verify-full',
+}
+
 export enum LogLevel {
     None = 'none',
     Debug = 'debug',
@@ -47,6 +56,11 @@ export interface PluginsServerConfig extends Record<string, any> {
     POSTHOG_DB_PASSWORD: string
     POSTHOG_POSTGRES_HOST: string
     POSTHOG_POSTGRES_PORT: number
+    POSTHOG_POSTGRES_SSL_MODE: PostgresSSLMode
+    POSTHOG_POSTGRES_REJECT_UNAUTHORIZED: boolean
+    POSTHOG_POSTGRES_CLI_SSL_CA: string | null
+    POSTHOG_POSTGRES_CLI_SSL_CRT: string | null
+    POSTHOG_POSTGRES_CLI_SSL_KEY: string | null
     CLICKHOUSE_HOST: string
     CLICKHOUSE_DATABASE: string
     CLICKHOUSE_USER: string
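
The enum values mirror libpq's `sslmode` options: `disable` skips TLS entirely; `allow`, `prefer`, and `require` encrypt the connection without insisting on a trusted certificate; `verify-ca` additionally validates the certificate chain; and `verify-full` also checks that the certificate matches the hostname. A sketch of how those semantics could map onto Node's TLS options (this mapping is an assumption for illustration, not code from the PR):

```ts
import { ConnectionOptions } from 'tls'

import { PostgresSSLMode } from './types'

// Assumed mapping from libpq sslmode semantics to Node TLS options.
function sslOptionsFor(mode: PostgresSSLMode): ConnectionOptions | false {
    switch (mode) {
        case PostgresSSLMode.Disable:
            return false // no TLS at all
        case PostgresSSLMode.Allow:
        case PostgresSSLMode.Prefer:
        case PostgresSSLMode.Require:
            // Encrypt, but tolerate an untrusted certificate chain.
            return { rejectUnauthorized: false }
        case PostgresSSLMode.VerifyCA:
            // Validate the chain against the CA, but skip the hostname check.
            return { rejectUnauthorized: true, checkServerIdentity: () => undefined }
        case PostgresSSLMode.VerifyFull:
            // Validate both the chain and the hostname (Node's default behaviour).
            return { rejectUnauthorized: true }
    }
}
```
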
11 changes: 4 additions & 7 deletions src/utils/db/db.ts
@@ -264,10 +264,8 @@ export class DB {
                 return rest
             }) as ClickHousePerson[]
         } else if (database === Database.Postgres) {
-            return (
-                (await this.postgresQuery('SELECT * FROM posthog_person', undefined, 'fetchPersons'))
-                    .rows as RawPerson[]
-            ).map(
+            return ((await this.postgresQuery('SELECT * FROM posthog_person', undefined, 'fetchPersons'))
+                .rows as RawPerson[]).map(
                 (rawPerson: RawPerson) =>
                     ({
                         ...rawPerson,
@@ -621,9 +619,8 @@

     public async fetchSessionRecordingEvents(): Promise<PostgresSessionRecordingEvent[] | SessionRecordingEvent[]> {
         if (this.kafkaProducer) {
-            const events = (
-                (await this.clickhouseQuery(`SELECT * FROM session_recording_events`)).data as SessionRecordingEvent[]
-            ).map((event) => {
+            const events = ((await this.clickhouseQuery(`SELECT * FROM session_recording_events`))
+                .data as SessionRecordingEvent[]).map((event) => {
                 return {
                     ...event,
                     snapshot_data: event.snapshot_data ? JSON.parse(event.snapshot_data) : null,