UNCLASSIFIED

Commit 85417450 authored by Tim Seagren's avatar Tim Seagren
Browse files

adding a few scripts

parent bb517848
# purge-gitlab-user
Script used to remove a user from Gitlab projects
\ No newline at end of file
#!/usr/bin/env python3
import gitlab
import os
import sys
import argparse
import logging
import requests
from requests.auth import HTTPBasicAuth
def _remove_from_groups(gl, user):
groups = gl.groups.list(all=True)
for group in groups:
try:
membership = group.members.get(user.id)
print(group.name)
except gitlab.exceptions.GitlabGetError as e:
logs.warning(f'{e}: {user.name} is not a member of group {group.name}')
continue
if membership:
logs.info(f"Removing {user.name} from {group.name}")
try:
group.members.delete(user.id)
except gitlab.exceptions.GitlabDeleteError as e:
logs.warning(e)
continue
def _remove_from_projects(gl, user):
logs.info(f"Searching projects for {user.name}")
projects = gl.projects.list(all=True)
for project in projects:
try:
membership = project.members.get(user.id)
print(project.name)
except gitlab.exceptions.GitlabGetError as e:
logs.warning(f'{e}: {user.name} is not a member of project {project.name}')
continue
if membership:
logs.info(f"Removing {user.name} from {project.name}")
try:
project.members.delete(user.id)
except gitlab.exceptions.GitlabDeleteError as e:
logs.warning(e)
continue
# This was done with requests package because gitlab-python offers not obvious way of doing this
def _remove_user_tokens(gitlab_url, user):
try:
r = requests.get(f'{gitlab_url}/api/v4/personal_access_tokens?user_id={user.id}', headers={'PRIVATE_TOKEN': os.environ["PRIVATE_TOKEN"]})
r.raise_for_status()
except requests.exceptions.HTTPError as err:
raise sys.exit(err)
for token in r.json():
if token["active"]:
try:
r = requests.delete(f'{gitlab_url}/api/v4/personal_access_tokens/{token["id"]}?user_id={user.id}', headers={'PRIVATE_TOKEN': os.environ["PRIVATE_TOKEN"]})
r.raise_for_status()
except requests.exceptions.HTTPError as err:
raise logs.exit(err)
continue
def main():
parser= argparse.ArgumentParser(description="Remove project permissions from user")
parser.add_argument(
"users",
help="specify a user to remove",
metavar='N',
type=str,
nargs='+'
)
args = parser.parse_args()
gitlab_url = "https://repo1.dso.mil"
gl = gitlab.Gitlab(
gitlab_url, private_token=os.environ["PRIVATE_TOKEN"]
)
for username in args.users:
print(username)
user = gl.users.list(username=username)[0]
_remove_user_tokens(gitlab_url, user)
_remove_from_groups(gl, user)
_remove_from_projects(gl, user)
if __name__ == "__main__":
logs = logging.getLogger("rotate")
# Get logging level, set manually when running pipeline
loglevel = os.environ.get("LOGLEVEL", "INFO").upper()
if loglevel == "DEBUG":
logging.basicConfig(
level=loglevel,
filename="credential_rotation_log.out",
format="%(levelname)s [%(filename)s:%(lineno)d]: %(message)s",
)
logging.debug("Log level set to debug")
else:
logging.basicConfig(level=loglevel, format="%(levelname)s: %(message)s")
logging.info("Log level set to info")
sys.exit(main())
FROM registry1.dso.mil/ironbank/redhat/python/python36:3.6
RUN pip install boto3 requests
COPY rotate-credentials.py /opt/app-root/src/rotate-crednetials.py
ENTRYPOINT ["python3", "rotate-crednetials.py"]
# cluster-management
This repo is home to various scripts that are used to facilitate ad-hoc changes to cluster and application configuration.
\ No newline at end of file
#!/usr/bin/python
# Please see the checklist that this script automates in https://repo1.dso.mil/platform-one/private/big-bang/ironbank/ironbank-bootstrap/-/issues/353
#TODO update above to point to doc created for it
# NOTE not all of the items in that list can be automated: rotating AWS Access Keys must be done by C1D (or another superuser), SSH keys in bastions should be removed manually (hard to predict the contents of authorized_keys), SSH keys and admin.conf generated by Konvoy to supply access to the cluster and nodes respectively (updated by running konvoy on the bastion - don't want to automate that), and Satellite permissions (if satellite/IPA does have an API I don't want to know about it)
import requests
import argparse
import boto3
import os
import sys
import json
import base64
import logging
import ast
import random
import string
import secrets
from botocore.exceptions import ClientError
from requests.auth import HTTPBasicAuth
def _create_client():
try:
# Create a Secrets Manager client
session = boto3.session.Session(
region_name='us-gov-west-1'
)
client = session.client(
service_name='secretsmanager'
)
except:
logs.info(f"Error creating client: {e}")
sys.exit(1)
return client
def _store_new_password(client, secret_name, username, password, old_password):
secret_string = [
{
username: password
},
{
f'{username}-expired': old_password
},
]
try:
client.update_secret(
SecretId=secret_name,
SecretString=f'{secret_string}'
)
except ClientError as e:
if e.response['Error']['Code'] == 'DecryptionFailureException':
# Secrets Manager can't decrypt the protected secret text using the provided KMS key.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InternalServiceErrorException':
# An error occurred on the server side.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InvalidParameterException':
# You provided an invalid value for a parameter.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InvalidRequestException':
# You provided a parameter value that is not valid for the current state of the resource.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'ResourceNotFoundException':
# We can't find the resource that you asked for.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
def _get_credential(client, secret_name):
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
if e.response['Error']['Code'] == 'DecryptionFailureException':
# Secrets Manager can't decrypt the protected secret text using the provided KMS key.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InternalServiceErrorException':
# An error occurred on the server side.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InvalidParameterException':
# You provided an invalid value for a parameter.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InvalidRequestException':
# You provided a parameter value that is not valid for the current state of the resource.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'ResourceNotFoundException':
# We can't find the resource that you asked for.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
else:
# Decrypts secret using the associated KMS CMK.
# Depending on whether the secret is a string or binary, one of these fields will be populated.
if 'SecretString' in get_secret_value_response:
secret_obj = ast.literal_eval(get_secret_value_response['SecretString'])
if isinstance(secret_obj, list):
for secret in secret_obj:
for key, value in secret.items():
if not "expired" in key:
username, password = key, value
else:
for key, value in secret_obj.items():
username, password = key, value
return username, password
else:
logs.error("secret not of the expected type, exiting")
sys.exit(1)
def _rotate_twistlock(twistlock_url, insecure, secret_name, proxies):
client = _create_client()
twistlock_username, twistlock_password = _get_credential(client, secret_name)
new_twistlock_password = secrets.token_urlsafe(16)
body = {
"oldPassword": twistlock_password,
"newPassword": new_twistlock_password
}
json_body = json.dumps(body)
base_url = f'https://{twistlock_url}/api/v1/users/password'
try:
r = requests.put(f'{base_url}', data=json.dumps(body), proxies=proxies, headers={"content-type": "application/json"}, auth=(twistlock_username, twistlock_password), verify=insecure)
r.raise_for_status()
except requests.exceptions.HTTPError as err:
raise sys.exit(err)
_store_new_password(client, secret_name, twistlock_username, new_twistlock_password, twistlock_password)
def _rotate_anchore(anchore_url, insecure, secret_name, proxies):
client = _create_client()
anchore_username, anchore_password = _get_credential(client, secret_name)
new_anchore_password = secrets.token_urlsafe(16)
body = {
"type": "password",
"value": new_anchore_password,
}
#TODO currently harcoding account and username for `admin` because this is mainly focused initially on rotation of users stored in secrets manager (shared). We should be able to make this more flexible though
base_url = f'https://{anchore_url}/v1/accounts/admin/users/admin/credentials'
try:
r = requests.post(f'{base_url}', data=json.dumps(body), proxies=proxies, headers={"content-type": "application/json"}, auth=(anchore_username, anchore_password), verify=insecure)
r.raise_for_status()
except requests.exceptions.HTTPError as err:
raise sys.exit(err)
_store_new_password(client, secret_name, anchore_username, new_anchore_password, anchore_password)
def _rotate_harbor(harbor_url, insecure, secret_name, proxies):
client = _create_client()
harbor_username, harbor_password = _get_credential(client, secret_name)
new_harbor_password = secrets.token_urlsafe(16)
body = {
"new_password": new_harbor_password,
"old_password": harbor_password,
}
#TODO currently harcoding account and username for `admin` because this is mainly focused initially on rotation of users stored in secrets manager (shared). We should be able to make this more flexible though
base_url = f'https://{harbor_url}/v1/accounts/admin/users/admin/credentials'
try:
r = requests.put(f'{base_url}', data=json.dumps(body), proxies=proxies, headers={"content-type": "application/json"}, auth=(harbor_username, harbor_password), verify=insecure)
r.raise_for_status()
except requests.exceptions.HTTPError as err:
raise sys.exit(err)
_store_new_password(client, secret_name, harbor_username, new_harbor_password, harbor_password)
def _rotate_elastic(elastic_url, insecure, secret_name, proxies):
client = _create_client()
elastic_username, elastic_password = _get_credential(client, secret_name)
new_elastic_password = secrets.token_urlsafe(16)
body = {
"password": new_elastic_password,
}
#TODO currently harcoding account and username for `admin` because this is mainly focused initially on rotation of users stored in secrets manager (shared). We should be able to make this more flexible though
base_url = f'https://{elastic_url}/_security/user/elastic/_password'
try:
r = requests.post(f'{base_url}', data=json.dumps(body), proxies=proxies, headers={"content-type": "application/json"}, auth=(elastic_username, elastic_password), verify=insecure)
r.raise_for_status()
except requests.exceptions.HTTPError as err:
raise sys.exit(err)
_store_new_password(client, secret_name, elastic_username, new_elastic_password, elastic_password)
def main():
parser = argparse.ArgumentParser(description="Rotate credentials arguments")
parser.add_argument(
"targets",
help="Select which service passwords to rotate.",
type=str,
default="all",
nargs=1,
choices=['twistlock', 'anchore', 'all']
)
parser.add_argument(
"--anchore-url",
help="Set anchore console endpoint",
type=str,
default="anchore-api.admin.dso.mil",
dest='anchore_url',
action='store'
)
parser.add_argument(
"--anchore-secret",
help="Set anchore secret name",
type=str,
default="Ironbank-bootstrap-prod-anchore",
dest='anchore_secret',
action='store'
)
parser.add_argument(
"--twistlock-url",
help="Set twistlock console endpoint",
type=str,
default="twistlock.admin.dso.mil",
dest='twistlock_url',
action='store'
)
parser.add_argument(
"--twistlock-secret",
help="Set twistlock secret name",
type=str,
default="ironbank-bootstrap-prod-twistlock",
dest='twistlock_secret',
action='store'
)
parser.add_argument(
'--proxy',
help="Add proxy for staging",
default={}
)
parser.add_argument(
'--insecure',
help="ignore SSL verification",
default=True,
action='store_false'
)
args = parser.parse_args()
proxies = args.proxy
if args.proxy:
proxies = {
'http': f'{args.proxy}',
'https': f'{args.proxy}',
}
print(proxies)
target = args.targets[0]
if target == "all":
logs.info("Rotating all service credentials")
_rotate_anchore(args.anchore_url, args.insecure, args.anchore_secret, proxies)
_rotate_twistlock(args.twistlock_url, args.insecure, args.twistlock_secret, proxies)
#TODO We should update this password via CaC rather than the api. Maybe if we move to robot accounts with API access we can revisit
#_rotate_harbor(args.harbor_url, args.insecure, args.harbor_secret, proxies)
#TODO For elastic, we'll want to configure this password via the elasticsearch-es-elastic-user.
#_rotate_elastic(args.elastic_url, args.insecure, args.elastic_secret, proxies)
#TODO For grafana, we'll want to configure this password via the helm chart. Therefore, we'll need to continue manually changing it until we upgrade to flux.
#_rotate_elastic(args.elastic_url, args.insecure, args.elastic_secret, proxies)
elif target == "twistlock":
_rotate_twistlock(args.twistlock_url, args.insecure, args.twistlock_secret, proxies)
elif target == "anchore":
_rotate_anchore(args.anchore_url, args.insecure, args.anchore_secret, proxies)
elif target == "harbor":
print("not yet")
sys.exit(0)
#_rotate_harbor(args.harbor_url, args.insecure, args.harbor_secret, proxies)
elif target == "elastic":
print("not yet")
sys.exit(0)
#_rotate_elastic(args.elastic_url, args.insecure, args.elastic_secret, proxies)
if __name__ == "__main__":
logs = logging.getLogger("rotate")
# Get logging level, set manually when running pipeline
loglevel = os.environ.get("LOGLEVEL", "INFO").upper()
if loglevel == "DEBUG":
logging.basicConfig(
level=loglevel,
filename="credential_rotation_log.out",
format="%(levelname)s [%(filename)s:%(lineno)d]: %(message)s",
)
logging.debug("Log level set to debug")
else:
logging.basicConfig(level=loglevel, format="%(levelname)s: %(message)s")
logging.info("Log level set to info")
sys.exit(main())
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment