Name | k8s-audit-filter JSON |
Version |
0.3.0
JSON |
| download |
home_page | None |
Summary | A tool to filter k8s audit logs |
upload_time | 2023-04-26 13:13:17 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.8 |
license | None |
keywords |
audit
filter
k8s
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# Filter K8s Audit Logs
## Abstract
This library provides a simple way to filter Kubernetes audit logs, if you, whit some reason, are not able to apply
audit policy directly at your cloud (e. g. in yandex cloud) and have to filter it with python script.
Also you can use this library to analyze audit logs.
The library does not provide any service, it just give you easy way to filter audit logs in your python script
with ```AuditFilter``` class interfaces.
## Instalation
```bash
pip install k8s-audit-filter
```
## Usage
You can easly modify your python script to filter audit logs.
Just import ```AuditFilter``` class, init it with your ```audit-policy.yaml``` file and use it's methods.
See an example of modification
of [this script](<https://github.com/yandex-cloud/yc-solution-library-for-security/blob/master/auditlogs/export-k8s-to-s3/terraform/function/main.py>):
```python
import json
import os
import boto3
import string
import random
from datetime import datetime
from k8s_audit_filter import AuditFilter # import AuditFilter class
def get_random_alphanumeric_string(length):
letters_and_digits = string.ascii_letters + string.digits
result_str = ''.join((random.choice(letters_and_digits) for i in range(length)))
return result_str
client = boto3.client(
service_name='s3',
endpoint_url='https://storage.yandexcloud.net',
region_name='ru-central1'
)
def handler(event, context):
for log_data in event['messages']:
full_log = []
for log_entry in log_data['details']['messages']:
kubernetes_log = json.loads(log_entry['message'])
full_log.append(json.dumps(kubernetes_log))
audit_filter = AuditFilter('path/to/audit_policy.yaml') # init AuditFilter class with path to audit policy file
filtered_log = [line for line in full_log if audit_filter.filter(full_log)] # filter audit logs
bucket_name = os.environ.get('BUCKET_NAME')
object_key = 'AUDIT/' + os.environ.get('CLUSTER_ID') + '/' + datetime.now().strftime(
'%Y-%m-%d-%H:%M:%S') + '-' + get_random_alphanumeric_string(5)
object_value = '\n'.join(filtered_log) # prepare data to load
# load data to cloud storage
client.put_object(Bucket=bucket_name, Key=object_key, Body=object_value, StorageClass='COLD')
```
Also you can update your policy dinamically, just use ```add_rule``` and ```remove_rule``` method:
```python
from k8s_audit_filter import AuditFilter
audit_filter = AuditFilter() # init AuditFilter class with blink audit policy
audit_filter.add_rule({'level': 'Metadata'})
audit_filter.filter({'level': 'Metadata'}) # return True
audit_filter.remove_rule({'level': 'Metadata'})
audit_filter.filter({'level': 'Metadata'}) # return False
```
## Describing Audit Policy
You can use find the way to describe k8s audit policy rules in Official Kubernetes Documentation at <https://kubernetes.io/docs/reference/config-api/apiserver-audit.v1/>
See example of audit policy:
```yaml
apiVersion: audit.k8s.io/v1 # This is required.
kind: Policy
rules:
# Include line in the audit log which contains verb "get" and have level "Metadata"
- level: Metadata
verbs:
- "get"
# Exclude line in the audit log which contains verb "create"
- level: None
verbs:
- "create"
- level: Request
users:
- "admin"
- level: Request
userGroups:
- "system:admins"
```
## Describing Audit Policy in Python
If you want to describe audit policy in python, you can describe it in the same way as in yaml file with dict:
```python
from k8s_audit_filter import AuditFilter
audit_filter = AuditFilter()
rules = [
{'level': 'Metadata', 'verbs': ['get']},
{'level': 'None', 'verbs': ['create']},
{'level': 'Request', 'users': ['admin']},
{'level': 'Request', 'userGroups': ['system:admins']},
{'level': 'Request', 'namespaces': ['kube-system']},
{'level': 'RequestResponse',
'resources': [
{'group': '', 'resources': ['deployments'], 'resourceNames': ['pods']},
{'group': 'apps', 'resources': ['leases'], 'resourceNames': ['test']}
]
}
]
audit_filter.add_rules(rules)
```
## Supported Rules
Please note, that ```level``` is required field for every rule, and should have of one of next values:
- ```None``` - do not log events that match this rule (this is ExcludeRule)
- ```Metadata``` - log line marked as "Metadata"
- ```Request``` - log line marked as "Request"
- ```RequestResponse``` - log line marked as "RequestResponse"
The library supports the following rules k8s audit PolicyRules:
- ```level```
- ```verbs```
- ```users```
- ```userGroups```
- ```namespaces```
- ```resources``` - Partly supported. Please read about limitations below
## Limitations
The library does not support following k8s Audit rules:
- ```resources``` - please note, that unlike original k8s audit policy, empty string "" in "group" field will not return "core" group, but will return all groups
- ```nonResourceURLs``` - notResourceURLs are not declared explicitly in audit logs
- ```omitStages``` - now it does not discern any stages
- ```omitManagedFields``` - omitManagedFields is not declared explicitly in audit logs
Raw data
{
"_id": null,
"home_page": null,
"name": "k8s-audit-filter",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "audit,filter,k8s",
"author": null,
"author_email": "Petr Ishutinr <ishutinpetrdev@gmail.com>",
"download_url": null,
"platform": null,
"description": "# Filter K8s Audit Logs\n\n## Abstract\n\nThis library provides a simple way to filter Kubernetes audit logs, if you, whit some reason, are not able to apply\naudit policy directly at your cloud (e. g. in yandex cloud) and have to filter it with python script. \nAlso you can use this library to analyze audit logs.\nThe library does not provide any service, it just give you easy way to filter audit logs in your python script\nwith ```AuditFilter``` class interfaces.\n\n## Instalation\n\n```bash\npip install k8s-audit-filter\n```\n\n## Usage\n\nYou can easly modify your python script to filter audit logs.\nJust import ```AuditFilter``` class, init it with your ```audit-policy.yaml``` file and use it's methods.\nSee an example of modification\nof [this script](<https://github.com/yandex-cloud/yc-solution-library-for-security/blob/master/auditlogs/export-k8s-to-s3/terraform/function/main.py>):\n\n```python\nimport json\n\nimport os\nimport boto3\nimport string\nimport random\n\nfrom datetime import datetime\n\nfrom k8s_audit_filter import AuditFilter # import AuditFilter class\n\n\ndef get_random_alphanumeric_string(length):\n letters_and_digits = string.ascii_letters + string.digits\n result_str = ''.join((random.choice(letters_and_digits) for i in range(length)))\n return result_str\n\n\nclient = boto3.client(\n service_name='s3',\n endpoint_url='https://storage.yandexcloud.net',\n region_name='ru-central1'\n)\n\n\ndef handler(event, context):\n for log_data in event['messages']:\n\n full_log = []\n for log_entry in log_data['details']['messages']:\n kubernetes_log = json.loads(log_entry['message'])\n full_log.append(json.dumps(kubernetes_log))\n\n audit_filter = AuditFilter('path/to/audit_policy.yaml') # init AuditFilter class with path to audit policy file\n filtered_log = [line for line in full_log if audit_filter.filter(full_log)] # filter audit logs\n\n bucket_name = os.environ.get('BUCKET_NAME')\n object_key = 'AUDIT/' + os.environ.get('CLUSTER_ID') + '/' + datetime.now().strftime(\n '%Y-%m-%d-%H:%M:%S') + '-' + get_random_alphanumeric_string(5)\n object_value = '\\n'.join(filtered_log) # prepare data to load\n # load data to cloud storage\n client.put_object(Bucket=bucket_name, Key=object_key, Body=object_value, StorageClass='COLD')\n```\n\nAlso you can update your policy dinamically, just use ```add_rule``` and ```remove_rule``` method:\n\n```python\nfrom k8s_audit_filter import AuditFilter\n\naudit_filter = AuditFilter() # init AuditFilter class with blink audit policy\naudit_filter.add_rule({'level': 'Metadata'})\naudit_filter.filter({'level': 'Metadata'}) # return True\naudit_filter.remove_rule({'level': 'Metadata'})\naudit_filter.filter({'level': 'Metadata'}) # return False\n```\n\n## Describing Audit Policy\n\nYou can use find the way to describe k8s audit policy rules in Official Kubernetes Documentation at <https://kubernetes.io/docs/reference/config-api/apiserver-audit.v1/>\n\nSee example of audit policy:\n\n```yaml\napiVersion: audit.k8s.io/v1 # This is required.\n\nkind: Policy\n\nrules:\n # Include line in the audit log which contains verb \"get\" and have level \"Metadata\"\n - level: Metadata\n verbs:\n - \"get\"\n\n # Exclude line in the audit log which contains verb \"create\"\n - level: None\n verbs:\n - \"create\"\n\n - level: Request\n users:\n - \"admin\"\n\n - level: Request\n userGroups:\n - \"system:admins\"\n\n```\n\n## Describing Audit Policy in Python\n\nIf you want to describe audit policy in python, you can describe it in the same way as in yaml file with dict:\n\n```python\nfrom k8s_audit_filter import AuditFilter\n\naudit_filter = AuditFilter()\n\nrules = [\n {'level': 'Metadata', 'verbs': ['get']},\n {'level': 'None', 'verbs': ['create']},\n {'level': 'Request', 'users': ['admin']},\n {'level': 'Request', 'userGroups': ['system:admins']},\n {'level': 'Request', 'namespaces': ['kube-system']},\n {'level': 'RequestResponse',\n 'resources': [\n {'group': '', 'resources': ['deployments'], 'resourceNames': ['pods']},\n {'group': 'apps', 'resources': ['leases'], 'resourceNames': ['test']}\n ]\n }\n]\naudit_filter.add_rules(rules)\n\n```\n\n## Supported Rules\n\nPlease note, that ```level``` is required field for every rule, and should have of one of next values:\n\n- ```None``` - do not log events that match this rule (this is ExcludeRule)\n- ```Metadata``` - log line marked as \"Metadata\"\n- ```Request``` - log line marked as \"Request\"\n- ```RequestResponse``` - log line marked as \"RequestResponse\"\n\nThe library supports the following rules k8s audit PolicyRules:\n\n- ```level```\n- ```verbs```\n- ```users```\n- ```userGroups```\n- ```namespaces```\n- ```resources``` - Partly supported. Please read about limitations below\n\n## Limitations\n\nThe library does not support following k8s Audit rules:\n\n- ```resources``` - please note, that unlike original k8s audit policy, empty string \"\" in \"group\" field will not return \"core\" group, but will return all groups\n- ```nonResourceURLs``` - notResourceURLs are not declared explicitly in audit logs\n- ```omitStages``` - now it does not discern any stages\n- ```omitManagedFields``` - omitManagedFields is not declared explicitly in audit logs\n",
"bugtrack_url": null,
"license": null,
"summary": "A tool to filter k8s audit logs",
"version": "0.3.0",
"split_keywords": [
"audit",
"filter",
"k8s"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "5c67fde502161a2b251666b4a46494d65b993869f3b7f210758f8f552867b384",
"md5": "6412862ea857a37d18ef28139fe16e42",
"sha256": "b9caaa3ad7990a36329e6eb298e745a7ac18f0aa0f1706680d1609b4d626a626"
},
"downloads": -1,
"filename": "k8s_audit_filter-0.3.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "6412862ea857a37d18ef28139fe16e42",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 19607,
"upload_time": "2023-04-26T13:13:17",
"upload_time_iso_8601": "2023-04-26T13:13:17.646094Z",
"url": "https://files.pythonhosted.org/packages/5c/67/fde502161a2b251666b4a46494d65b993869f3b7f210758f8f552867b384/k8s_audit_filter-0.3.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-04-26 13:13:17",
"github": false,
"gitlab": false,
"bitbucket": false,
"lcname": "k8s-audit-filter"
}