# Source code for aws_session_recorder.lib.schema.functions

import json
from typing import TYPE_CHECKING, Iterator

from aws_session_recorder.lib.helpers import AlwaysDoNothing
from aws_session_recorder.lib.schema.group import Group, GroupPolicy
from aws_session_recorder.lib.schema.role import Role, InstanceProfile, RolePolicy
from aws_session_recorder.lib.schema.policy import Policy, PolicyVersion, UserPolicyAttachments, RolePolicyAttachments, \
    GroupPolicyAttachments
from aws_session_recorder.lib.schema.user import User, AccessKey, UserPolicy

# The boto3 IAM type stubs are imported for static analysis only; the handlers
# in this module use them in the annotations of their `resp` parameter.
if TYPE_CHECKING:
    from mypy_boto3_iam import type_defs as t  # type: ignore
else:
    # At runtime the stubs are not imported; placeholder objects are bound
    # to the same names instead.
    # NOTE(review): `client` is only bound in this branch -- confirm whether
    # anything still relies on it.
    t = AlwaysDoNothing()
    client = AlwaysDoNothing()


def GetUser(req_params: dict, resp: 't.GetUserResponseTypeDef'):
    """Build a User from the ``'User'`` entry of a GetUser response.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: GetUser response; only ``resp['User']`` is read.

    Returns:
        The User constructed from ``resp['User']``.
    """
    user_data = resp['User']
    return User(user_data)
def ListUsers(req_params: dict, resp: 't.ListUsersResponseTypeDef'):
    """Yield one User per entry of ``resp['Users']``, in response order.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: ListUsers response; only ``resp['Users']`` is read.

    Yields:
        A User constructed from each entry.
    """
    yield from (User(entry) for entry in resp['Users'])
def GetGroup(req_params: dict, resp: 't.GetGroupResponseTypeDef') -> Iterator[Group]:
    """Yield a single Group built from a GetGroup response, with its users attached.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: GetGroup response; its ``'Group'`` and ``'Users'`` entries are read.

    Yields:
        One Group whose ``users`` attribute is a list holding a User for each
        entry of ``resp['Users']``.
    """
    grp: Group = Group(resp['Group'])
    grp.users = [User(user) for user in resp['Users']]
    yield grp
def ListGroups(req_params: dict, resp: 't.ListGroupsResponseTypeDef'):
    """Yield one Group per entry of ``resp['Groups']``, in response order.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: ListGroups response; only ``resp['Groups']`` is read.

    Yields:
        A Group constructed from each entry.
    """
    yield from (Group(entry) for entry in resp['Groups'])
def ListAccessKeys(req_params: dict, resp: 't.ListAccessKeysResponseTypeDef') -> Iterator[AccessKey]:
    """Yield one AccessKey per entry of ``resp['AccessKeyMetadata']``.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: ListAccessKeys response; only ``resp['AccessKeyMetadata']`` is read.

    Yields:
        An AccessKey constructed from each metadata entry, in response order.
    """
    yield from (AccessKey(metadata) for metadata in resp['AccessKeyMetadata'])
def ListUserPolicies(req_params: dict, resp: 't.ListUserPoliciesResponseTypeDef'):
    """Yield a UserPolicy for each policy name in a ListUserPolicies response.

    The response only lists policy names; the user name is taken from the
    request parameters and paired with each name so the policies can be
    correlated with a user.

    Args:
        req_params: Parameters of the recorded call; ``req_params['UserName']``
            is read once per policy name.
        resp: ListUserPolicies response; only ``resp['PolicyNames']`` is read.

    Yields:
        A UserPolicy built from ``{'UserName': ..., 'PolicyName': ...}``.
    """
    for name in resp['PolicyNames']:
        record = {'UserName': req_params['UserName'], 'PolicyName': name}
        yield UserPolicy(record)
def GetUserPolicy(req_params: dict, resp: 't.GetUserPolicyResponseTypeDef'):
    """Build a UserPolicy from a GetUserPolicy response.

    ``ResponseMetadata`` is removed from ``resp`` in place before the whole
    response dict is handed to UserPolicy, so the caller's dict loses that key.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: GetUserPolicy response; modified in place as described above.

    Returns:
        The UserPolicy constructed from ``resp`` without ``ResponseMetadata``.
    """
    # This key actually does exist at runtime; per the original author's note
    # the tests fail unless it is stripped. pop() with a default removes it
    # whether or not its value is truthy and is a no-op when it is absent.
    resp.pop('ResponseMetadata', None)  # type: ignore[misc]
    return UserPolicy(resp)
def GetRolePolicy(req_params: dict, resp: 't.GetRolePolicyResponseTypeDef'):
    """Build a RolePolicy from a GetRolePolicy response.

    ``ResponseMetadata`` is removed from ``resp`` in place before the whole
    response dict is handed to RolePolicy, so the caller's dict loses that key.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: GetRolePolicy response; modified in place as described above.

    Returns:
        The RolePolicy constructed from ``resp`` without ``ResponseMetadata``.
    """
    # This key actually does exist at runtime; per the original author's note
    # the tests fail unless it is stripped. pop() with a default removes it
    # whether or not its value is truthy and is a no-op when it is absent.
    resp.pop('ResponseMetadata', None)  # type: ignore[misc]
    return RolePolicy(resp)
def ListRolePolicies(req_params: dict, resp: 't.ListRolePoliciesResponseTypeDef'):
    """Yield a RolePolicy for each policy name in a ListRolePolicies response.

    The response only lists policy names; the role name is taken from the
    request parameters and paired with each name so the policies can be
    correlated with a role.

    Args:
        req_params: Parameters of the recorded call; ``req_params['RoleName']``
            is read once per policy name.
        resp: ListRolePolicies response; only ``resp['PolicyNames']`` is read.

    Yields:
        A RolePolicy built from ``{'RoleName': ..., 'PolicyName': ...}``.
    """
    for name in resp['PolicyNames']:
        record = {'RoleName': req_params['RoleName'], 'PolicyName': name}
        yield RolePolicy(record)
def GetGroupPolicy(req_params: dict, resp: 't.GetGroupPolicyResponseTypeDef') -> GroupPolicy:
    """Build a GroupPolicy from a GetGroupPolicy response.

    ``ResponseMetadata`` is removed from ``resp`` in place before the whole
    response dict is handed to GroupPolicy, so the caller's dict loses that key.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: GetGroupPolicy response; modified in place as described above.

    Returns:
        The GroupPolicy constructed from ``resp`` without ``ResponseMetadata``.
    """
    # This key actually does exist at runtime; per the original author's note
    # the tests fail unless it is stripped. pop() with a default removes it
    # whether or not its value is truthy and is a no-op when it is absent.
    resp.pop('ResponseMetadata', None)  # type: ignore[misc]
    return GroupPolicy(resp)
def ListGroupPolicies(req_params: dict, resp: 't.ListGroupPoliciesResponseTypeDef'):
    """Yield a GroupPolicy for each policy name in a ListGroupPolicies response.

    The response only lists policy names; the group name is taken from the
    request parameters and paired with each name so the policies can be
    correlated with a group.

    Args:
        req_params: Parameters of the recorded call; ``req_params['GroupName']``
            is read once per policy name.
        resp: ListGroupPolicies response; only ``resp['PolicyNames']`` is read.

    Yields:
        A GroupPolicy built from ``{'GroupName': ..., 'PolicyName': ...}``.
    """
    for name in resp['PolicyNames']:
        record = {'GroupName': req_params['GroupName'], 'PolicyName': name}
        yield GroupPolicy(record)
def GetPolicy(req_params: dict, resp: 't.GetPolicyResponseTypeDef') -> Policy:
    """Build a Policy from the ``'Policy'`` entry of a GetPolicy response.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: GetPolicy response; only ``resp['Policy']`` is read.

    Returns:
        The Policy constructed from ``resp['Policy']``.
    """
    policy_data = resp['Policy']
    return Policy(policy_data)
def ListPolicies(req_params: dict, resp: 't.ListPoliciesResponseTypeDef') -> Iterator[Policy]:
    """Yield one Policy per entry of ``resp['Policies']``, in response order.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: ListPolicies response; only ``resp['Policies']`` is read.

    Yields:
        A Policy constructed from each entry.
    """
    yield from (Policy(entry) for entry in resp['Policies'])
def GetPolicyVersion(req_params: dict, resp: 't.GetPolicyVersionResponseTypeDef'):
    """Build a PolicyVersion from the ``'PolicyVersion'`` entry of the response.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: GetPolicyVersion response; only ``resp['PolicyVersion']`` is read.

    Returns:
        The PolicyVersion constructed from ``resp['PolicyVersion']``.
    """
    version_data = resp['PolicyVersion']
    return PolicyVersion(version_data)
def ListPolicyVersions(req_params: dict, resp: 't.ListPolicyVersionsResponseTypeDef'):
    """Yield one PolicyVersion per entry of ``resp['Versions']``.

    The ``resp`` annotation is a string, like the other handlers in this
    module, so it is not evaluated when the function is defined.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: ListPolicyVersions response; only ``resp['Versions']`` is read.

    Yields:
        A PolicyVersion constructed from each entry, in response order.
    """
    for version in resp['Versions']:
        yield PolicyVersion(version)
def GetInstanceProfile(req_params: dict, resp: 't.GetInstanceProfileResponseTypeDef'):
    """Build an InstanceProfile from a GetInstanceProfile response.

    The profile's ``'Roles'`` entry is round-tripped through JSON with
    ``default=str``, which turns values the JSON encoder cannot serialize into
    strings. The converted value is written back into ``resp`` in place.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: GetInstanceProfile response; ``resp['InstanceProfile']['Roles']``
            is replaced as described above.

    Returns:
        The InstanceProfile constructed from ``resp['InstanceProfile']``.
    """
    profile = resp['InstanceProfile']
    # TODO: handle datetime in role policy
    roles_as_json = json.dumps(profile['Roles'], default=str)
    profile['Roles'] = json.loads(roles_as_json)
    return InstanceProfile(profile)
def GetRole(req_params: dict, resp: 't.GetRoleResponseTypeDef'):
    """Build a Role from the ``'Role'`` entry of a GetRole response.

    When the role has a ``'RoleLastUsed'`` entry it is round-tripped through
    JSON with ``default=str`` and written back into ``resp`` in place.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: GetRole response; ``resp['Role']['RoleLastUsed']`` is replaced
            when present.

    Returns:
        The Role constructed from ``resp['Role']``.
    """
    role_data = resp["Role"]
    # RoleLastUsed.LastUsedDate comes back as a datetime.datetime object; work
    # around that by serializing and deserializing with default=str.
    # TODO: better way to handle this without making RoleLastUsed a JSONType?
    if 'RoleLastUsed' in role_data:
        last_used_json = json.dumps(role_data['RoleLastUsed'], default=str)
        role_data['RoleLastUsed'] = json.loads(last_used_json)
    return Role(role_data)
def ListRoles(req_params: dict, resp: 't.ListRolesResponseTypeDef'):
    """Yield one Role per entry of ``resp['Roles']``, in response order.

    The ``resp`` annotation is a string, like the other handlers in this
    module, so it is not evaluated when the function is defined.

    Args:
        req_params: Parameters of the recorded call; not used by this handler.
        resp: ListRoles response; only ``resp['Roles']`` is read.

    Yields:
        A Role constructed from each entry.
    """
    for node in resp['Roles']:
        yield Role(node)
def ListAttachedUserPolicies(req_params: dict, resp: 't.ListAttachedUserPoliciesResponseTypeDef'):
    """Yield a Policy and a UserPolicyAttachments for each attached policy.

    Each entry of ``resp['AttachedPolicies']`` is first updated in place with
    every key of ``req_params``. For every entry a Policy is yielded first,
    followed by the attachment built from the merged entry.

    Args:
        req_params: Parameters of the recorded call; merged into each entry.
        resp: ListAttachedUserPolicies response; its entries are modified in
            place.

    Yields:
        Alternately a Policy (from the entry's ``PolicyArn``/``PolicyName``) and
        a UserPolicyAttachments (from the merged entry).
    """
    for attached in resp['AttachedPolicies']:
        attached.update(req_params)
        policy_fields = {'Arn': attached['PolicyArn'], 'PolicyName': attached['PolicyName']}
        yield Policy(policy_fields)
        yield UserPolicyAttachments(attached)
def ListAttachedRolePolicies(req_params: dict, resp: 't.ListAttachedRolePoliciesResponseTypeDef'):
    """Yield a Policy and a RolePolicyAttachments for each attached policy.

    Each entry of ``resp['AttachedPolicies']`` is first updated in place with
    every key of ``req_params``. For every entry a Policy is yielded first,
    followed by the attachment built from the merged entry.

    Args:
        req_params: Parameters of the recorded call; merged into each entry.
        resp: ListAttachedRolePolicies response; its entries are modified in
            place.

    Yields:
        Alternately a Policy (from the entry's ``PolicyArn``/``PolicyName``) and
        a RolePolicyAttachments (from the merged entry).
    """
    for attached in resp['AttachedPolicies']:
        attached.update(req_params)
        policy_fields = {'Arn': attached['PolicyArn'], 'PolicyName': attached['PolicyName']}
        yield Policy(policy_fields)
        yield RolePolicyAttachments(attached)
def ListAttachedGroupPolicies(req_params: dict, resp: 't.ListAttachedGroupPoliciesResponseTypeDef'):
    """Yield a Policy and a GroupPolicyAttachments for each attached policy.

    Each entry of ``resp['AttachedPolicies']`` is first updated in place with
    every key of ``req_params``. For every entry a Policy is yielded first,
    followed by the attachment built from the merged entry.

    Args:
        req_params: Parameters of the recorded call; merged into each entry.
        resp: ListAttachedGroupPolicies response; its entries are modified in
            place.

    Yields:
        Alternately a Policy (from the entry's ``PolicyArn``/``PolicyName``) and
        a GroupPolicyAttachments (from the merged entry).
    """
    for attached in resp['AttachedPolicies']:
        attached.update(req_params)
        policy_fields = {'Arn': attached['PolicyArn'], 'PolicyName': attached['PolicyName']}
        yield Policy(policy_fields)
        yield GroupPolicyAttachments(attached)
# TODO:
#   * get-account-authorization-details
#   * get-ssh-public-key
#   * list-entities-for-policy
#   * list-ssh-public-keys
#   * list-service-specific-credentials
#   * list-virtual-mfa-devices
#   * *-tags
#
#
#   * list-saml-providers
#   * list-server-certificates
#
#   o simulate-custom-policy ?
#   o simulate-principal-policy ?
#   * get-organizations-access-report?
#   * get-account-summary ?
#   * get-access-key-last-used ?
#

# Maps an IAM API operation name to the handler in this module that takes the
# call's (req_params, resp). Every key is identical to its handler's function
# name, so the table is derived from __name__; the order below is the
# insertion order of the resulting dict.
ApiCallMap = {
    handler.__name__: handler
    for handler in (
        GetUser,
        ListUsers,
        GetRole,
        ListRoles,
        ListAttachedUserPolicies,
        ListAttachedRolePolicies,
        ListAttachedGroupPolicies,
        GetUserPolicy,
        ListUserPolicies,
        ListPolicies,
        GetRolePolicy,
        ListRolePolicies,
        GetGroupPolicy,
        ListGroupPolicies,
        GetPolicy,
        GetPolicyVersion,
        ListPolicyVersions,
        GetInstanceProfile,
        ListAccessKeys,
        GetGroup,
        ListGroups,
    )
}