Skip to content

Commit 540d0e1

Browse files
authored
Merge pull request #15 from njasm/add_gha_runner_collector
Add github actions runner collector
2 parents ab16068 + a612b79 commit 540d0e1

File tree

7 files changed

+256
-2
lines changed

7 files changed

+256
-2
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ futures = "0.3"
1818
anyhow = "1.0"
1919
log = "0.4"
2020
env_logger = { version = "0.8", features = ["termcolor", "humantime"] }
21+
parse_link_header = "0.2"

src/collectors/github_rate_limit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl ProductMetrics {
184184
let gauge = |name, help| -> IntGauge {
185185
IntGauge::with_opts(
186186
Opts::new(name, help)
187-
.namespace("monitorbot_github_rate_limit")
187+
.namespace("github_rate_limit")
188188
.const_label("username", user)
189189
.const_label("product", product),
190190
)

src/collectors/github_runners.rs

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
use super::default_headers;
2+
use crate::Config;
3+
use anyhow::{Context, Result};
4+
use log::{debug, error};
5+
use prometheus::core::AtomicI64;
6+
use prometheus::core::{Desc, GenericGauge};
7+
use prometheus::proto::MetricFamily;
8+
use prometheus::{core::Collector, IntGauge, Opts};
9+
use reqwest::header::{HeaderValue, LINK};
10+
use reqwest::{Client, Response};
11+
use std::collections::HashMap;
12+
use std::sync::{Arc, RwLock};
13+
use tokio::time::Duration;
14+
15+
const GH_RUNNERS_ENDPOINT: &str =
16+
"https://github.com/api/repos/{owner_repo}/actions/runners?per_page=100";
17+
18+
#[derive(Debug, serde::Deserialize)]
19+
struct ApiResponse {
20+
total_count: usize,
21+
runners: Vec<Runner>,
22+
}
23+
24+
#[derive(Debug, serde::Deserialize)]
25+
struct Runner {
26+
id: usize,
27+
name: String,
28+
os: String,
29+
status: String,
30+
busy: bool,
31+
}
32+
33+
#[derive(Clone)]
34+
pub struct GithubRunners {
35+
//api token to use
36+
token: String,
37+
// repos to track gha runners
38+
repos: Vec<String>,
39+
// actual metrics
40+
metrics: Arc<RwLock<Vec<IntGauge>>>,
41+
// default metric description
42+
desc: Desc,
43+
http: Client,
44+
}
45+
46+
impl GithubRunners {
47+
pub async fn new(config: &Config, http: Client) -> Result<Self> {
48+
let token = config.github_token.to_string();
49+
let repos: Vec<String> = config
50+
.gha_runners_repos
51+
.split(',')
52+
.map(|v| v.trim().to_string())
53+
.collect();
54+
55+
let rv = Self {
56+
token,
57+
repos,
58+
http,
59+
metrics: Arc::new(RwLock::new(Vec::new())),
60+
desc: Desc::new(
61+
String::from("gha_runner"),
62+
String::from("GHA runner's status"),
63+
Vec::new(),
64+
HashMap::new(),
65+
)
66+
.unwrap(),
67+
};
68+
69+
let refresh_rate = config.gha_runners_cache_refresh;
70+
let mut rv2 = rv.clone();
71+
tokio::spawn(async move {
72+
loop {
73+
if let Err(e) = rv2.update_stats().await {
74+
error!("{:#?}", e);
75+
}
76+
77+
tokio::time::delay_for(Duration::from_secs(refresh_rate)).await;
78+
}
79+
});
80+
81+
Ok(rv)
82+
}
83+
84+
async fn update_stats(&mut self) -> Result<()> {
85+
let mut gauges = Vec::with_capacity(self.repos.len() * 2);
86+
for repo in self.repos.iter() {
87+
let mut url: Option<String> = String::from(GH_RUNNERS_ENDPOINT)
88+
.replace("{owner_repo}", repo)
89+
.into();
90+
91+
debug!("Updating runner's stats");
92+
93+
while let Some(endpoint) = url.take() {
94+
let response = self
95+
.http
96+
.get(&endpoint)
97+
.headers(default_headers(&self.token))
98+
.send()
99+
.await?;
100+
101+
url = guard_rate_limited(&response)?
102+
.error_for_status_ref()
103+
.map(|res| next_uri(res.headers().get(LINK)))?;
104+
105+
let resp = response.json::<ApiResponse>().await?;
106+
107+
for runner in resp.runners.iter() {
108+
let online = metric_factory(
109+
"online",
110+
"runner is online",
111+
&self.desc.fq_name,
112+
&repo,
113+
&runner.name,
114+
);
115+
online.set(if runner.status == "online" { 1 } else { 0 });
116+
gauges.push(online);
117+
118+
let busy = metric_factory(
119+
"busy",
120+
"runner is busy",
121+
&self.desc.fq_name,
122+
&repo,
123+
&runner.name,
124+
);
125+
busy.set(if runner.busy { 1 } else { 0 });
126+
gauges.push(busy);
127+
}
128+
}
129+
}
130+
131+
// lock and replace old data
132+
let mut guard = self.metrics.write().unwrap();
133+
*guard = gauges;
134+
135+
Ok(())
136+
}
137+
}
138+
139+
impl Collector for GithubRunners {
140+
fn desc(&self) -> Vec<&Desc> {
141+
vec![&self.desc]
142+
}
143+
144+
fn collect(&self) -> Vec<MetricFamily> {
145+
self.metrics.read().map_or_else(
146+
|e| {
147+
error!("Unable to collect: {:#?}", e);
148+
Vec::with_capacity(0)
149+
},
150+
|guard| {
151+
guard.iter().fold(Vec::new(), |mut acc, item| {
152+
acc.extend(item.collect());
153+
acc
154+
})
155+
},
156+
)
157+
}
158+
}
159+
160+
fn guard_rate_limited(response: &Response) -> Result<&Response> {
161+
let rate_limited = match response.headers().get("x-ratelimit-remaining") {
162+
Some(rl) => rl.to_str()?.parse::<usize>()? == 0,
163+
None => unreachable!(),
164+
};
165+
166+
if rate_limited {
167+
return response
168+
.error_for_status_ref()
169+
.context("We've hit the rate limit");
170+
}
171+
172+
Ok(response)
173+
}
174+
175+
fn next_uri(header: Option<&HeaderValue>) -> Option<String> {
176+
if let Some(header) = header {
177+
return match header.to_str() {
178+
Ok(header_str) => match parse_link_header::parse(header_str) {
179+
Ok(links) => links
180+
.get(&Some("next".to_string()))
181+
.map(|next| next.uri.to_string()),
182+
_ => None,
183+
},
184+
_ => None,
185+
};
186+
}
187+
188+
None
189+
}
190+
191+
fn metric_factory<S: Into<String>>(
192+
name: S,
193+
help: S,
194+
ns: S,
195+
repo: S,
196+
runner: S,
197+
) -> GenericGauge<AtomicI64> {
198+
IntGauge::with_opts(
199+
Opts::new(name, help)
200+
.namespace(ns)
201+
.const_label("repo", repo)
202+
.const_label("runner", runner),
203+
)
204+
.unwrap()
205+
}

src/collectors/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,43 @@
11
mod github_rate_limit;
2+
mod github_runners;
23

34
pub use crate::collectors::github_rate_limit::GitHubRateLimit;
5+
pub use crate::collectors::github_runners::GithubRunners;
46

57
use crate::MetricProvider;
68
use anyhow::{Error, Result};
79
use futures::TryFutureExt;
810
use log::info;
11+
use reqwest::header::{HeaderMap, ACCEPT, AUTHORIZATION};
12+
use reqwest::ClientBuilder;
913

1014
// register collectors for metrics gathering
1115
pub async fn register_collectors(p: &MetricProvider) -> Result<(), Error> {
16+
let http = ClientBuilder::new()
17+
.user_agent("https://github.com/rust-lang/monitorbot ([email protected])")
18+
.build()?;
19+
1220
GitHubRateLimit::new(&p.config)
1321
.and_then(|rl| async {
1422
info!("Registering GitHubRateLimit collector");
1523
p.register_collector(rl)
1624
})
25+
.await?;
26+
27+
GithubRunners::new(&p.config, http)
28+
.and_then(|gr| async {
29+
info!("Registering GitHubActionsRunners collector");
30+
p.register_collector(gr)
31+
})
1732
.await
1833
}
34+
35+
fn default_headers(token: &str) -> HeaderMap {
36+
let mut headers = HeaderMap::new();
37+
headers.insert(
38+
AUTHORIZATION,
39+
format!("{} {}", "token", token).parse().unwrap(),
40+
);
41+
headers.insert(ACCEPT, "application/vnd.github.v3+json".parse().unwrap());
42+
headers
43+
}

src/config.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ pub struct Config {
1414
pub gh_rate_limit_tokens: String,
1515
// github rate limit stats data cache refresh rate frequency (in seconds)
1616
pub gh_rate_limit_stats_cache_refresh: u64,
17+
// github api token to be used when querying for gha runner's status
18+
// note: token must have (repo scope) authorization
19+
pub github_token: String,
20+
// gh runner's repos to track they status. multiple repos are allowed
21+
// ex. "rust,cargo,docs.rs"
22+
pub gha_runners_repos: String,
23+
// gha runner's status refresh rate frequency (in seconds)
24+
pub gha_runners_cache_refresh: u64,
1725
}
1826

1927
impl Config {
@@ -23,6 +31,9 @@ impl Config {
2331
port: default_env("PORT", 3001)?,
2432
gh_rate_limit_tokens: require_env("RATE_LIMIT_TOKENS")?,
2533
gh_rate_limit_stats_cache_refresh: default_env("GH_RATE_LIMIT_STATS_REFRESH", 120)?,
34+
github_token: require_env("GITHUB_TOKEN")?,
35+
gha_runners_repos: require_env("RUNNERS_REPOS")?,
36+
gha_runners_cache_refresh: default_env("GHA_RUNNERS_REFRESH", 120)?,
2637
})
2738
}
2839
}

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ pub struct MetricProvider {
2525

2626
impl MetricProvider {
2727
pub fn new(config: Config) -> Self {
28-
let register = Registry::new_custom(None, None).expect("Unable to build Registry");
28+
let register = Registry::new_custom(Some("monitorbot".to_string()), None)
29+
.expect("Unable to build Registry");
2930
Self { register, config }
3031
}
3132

0 commit comments

Comments
 (0)