
Commit 63d1e3d

Fix airflow_db_cleanup script (#7593)

## Description

Fixes b/223364117

The DAG can now perform its cleanup operation on Airflow databases that hold a large amount of data in a single table. More context: http://docs/document/d/1eWydWjGXqqQ4lvACOMs5o7qQtnuuyo_0jzwWXJ3_YfY?resourcekey=0-TBb9afCO5cWD5hhW_sRZgQ (a sketch of the resulting configuration entry follows the checklist below).

Note: It's a good idea to open an issue first for discussion.

## Checklist

- [x] I have followed [Sample Guidelines from AUTHORING_GUIDE.MD](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md)
- [ ] README is updated to include [all relevant information](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#readme-file)
- [ ] **Tests** pass: `nox -s py-3.6` (see [Test Environment Setup](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#test-environment-setup))
- [x] **Lint** pass: `nox -s lint` (see [Test Environment Setup](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#test-environment-setup))
- [ ] These samples need a new **API enabled** in testing projects to pass (let us know which ones)
- [ ] These samples need a new/updated **env vars** in testing projects set to pass (let us know which ones)
- [x] Please **merge** this PR for me once it is approved.
- [ ] This sample adds a new sample directory, and I updated the [CODEOWNERS file](https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/.github/CODEOWNERS) with the codeowners for this sample
1 parent 558844a commit 63d1e3d
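For orientation, here is a sketch of what one `DATABASE_OBJECTS` entry looks like once this change is applied. The keys mirror the diff below; the `try`/`except` wrapper, the import, and the `logging.error` handler are assumptions based on the sample's existing layout, not part of this commit.

```python
# Sketch of one DATABASE_OBJECTS entry after this change. Everything outside
# the diffed keys (the try/except wrapper, the import, the empty list) is
# assumed from the sample's existing structure.
import logging

DATABASE_OBJECTS = []

try:
    from airflow.models import ImportError

    DATABASE_OBJECTS.append({
        "airflow_db_model": ImportError,
        "age_check_column": ImportError.timestamp,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None,
        # New flag: when True, cleanup_function issues a single bulk DELETE
        # for this table instead of deleting per dag_id (this table has no
        # dag_id column to group on).
        "do_not_delete_by_dag_id": True
    })
except Exception as e:
    logging.error(e)
```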

File tree

1 file changed: +15 −7 lines changed


composer/airflow_1_samples/airflow_db_cleanup.py

Lines changed: 15 additions & 7 deletions
```diff
@@ -186,7 +186,8 @@
         "age_check_column": ImportError.timestamp,
         "keep_last": False,
         "keep_last_filters": None,
-        "keep_last_group_by": None
+        "keep_last_group_by": None,
+        "do_not_delete_by_dag_id": True
     })
 
 except Exception as e:
```
```diff
@@ -204,13 +205,15 @@
         "age_check_column": Task.date_done,
         "keep_last": False,
         "keep_last_filters": None,
-        "keep_last_group_by": None
+        "keep_last_group_by": None,
+        "do_not_delete_by_dag_id": True
     }, {
         "airflow_db_model": TaskSet,
         "age_check_column": TaskSet.date_done,
         "keep_last": False,
         "keep_last_filters": None,
-        "keep_last_group_by": None
+        "keep_last_group_by": None,
+        "do_not_delete_by_dag_id": True
     }))
 
 except Exception as e:
```
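The flag set in these config hunks is consumed in `cleanup_function` through `context["params"]` (next hunk). For that lookup to work, each `DATABASE_OBJECTS` dict has to reach its task as `params`. A minimal sketch of that wiring, assuming the sample's one-`PythonOperator`-per-entry pattern; the `task_id` naming and operator arguments here are illustrative, not taken from this commit:

```python
# Hedged sketch: hand each DATABASE_OBJECTS entry to its cleanup task as
# `params`, which is what cleanup_function later reads via context["params"].
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path

for db_object in DATABASE_OBJECTS:
    PythonOperator(
        task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
        python_callable=cleanup_function,
        params=db_object,        # becomes context["params"] at runtime
        provide_context=True,    # Airflow 1.x: needed to receive **context
        dag=dag,
    )
```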
```diff
@@ -358,10 +361,15 @@ def cleanup_function(**context):
 
     if ENABLE_DELETE:
         logging.info("Performing Delete...")
-        # using bulk delete
-        query.delete(synchronize_session=False)
-        session.commit()
-        logging.info("Finished Performing Delete")
+        if context["params"].get("do_not_delete_by_dag_id"):
+            query.filter(age_check_column <= max_date).delete(synchronize_session=False)
+            session.commit()
+        else:
+            dags = session.query(airflow_db_model.dag_id).distinct()
+            list_dags = [str(list(dag)[0]) for dag in dags]
+            for dag in list_dags:
+                query.filter(age_check_column <= max_date).filter(airflow_db_model.dag_id == dag).delete(synchronize_session=False)
+                session.commit()
     else:
         logging.warn("You've opted to skip deleting the db entries. "
                      "Set ENABLE_DELETE to True to delete entries!!!")
```
