from falconpy import IdentityProtection
import pandas as pd
from getpass import getpass
from datetime import datetime, timezone
import json
# Map each menu choice to the API base URL of the corresponding Falcon cloud.
cloud_dict = {
    '1': 'https://api.crowdstrike.com',
    '2': 'https://api.us-2.crowdstrike.com',
    '3': 'https://api.eu-1.crowdstrike.com',
    # The US-GOV-1 host name is spelled "laggar" (not "lagger").
    '4': 'https://api.laggar.gcw.crowdstrike.com',
}

# Ask the operator which cloud the API client should talk to.
print("Which Falcon Cloud to use?")
print("1: (US-1)")
print("2: (US-2)")
print("3: (EU-1)")
print("4: (US-GOV-1)")
cloud_select = input("Enter your choice (1-4): ").strip()
if cloud_select not in cloud_dict:
    print("invalid selection")
    # SystemExit works even when the interactive-only `exit` helper from the
    # `site` module is unavailable (e.g. `python -S` or frozen builds).
    raise SystemExit(1)
# Resolve the base URL for the selected cloud and prompt for the API
# credentials. getpass keeps both values from being echoed to the terminal.
API_URL = cloud_dict[cloud_select]
API_ID = getpass('Enter your API KEY ID: ')
API_KEY = getpass('Enter your API Secret: ')

# Create the Identity Protection API client. base_url must be passed
# explicitly, otherwise the cloud chosen above is never used by the client.
falcon = IdentityProtection(
    client_id=API_ID,
    client_secret=API_KEY,
    base_url=API_URL,
)
# Pagination flag: the paging loop further down keeps running while this is True.
has_next_page = True
# GraphQL query sent to the Identity Protection API. It selects enabled,
# non-archived USER entities that carry a DUPLICATE_PASSWORD or STALE_ACCOUNT
# risk factor, 1000 entities per page, and is paged through the $after cursor.
# For each entity it requests the Active Directory account name/domain, the
# risk factor types (plus groupId for duplicate-password factors) and pageInfo.
# NOTE(review): domainPattern "(*mydomain*)" looks like a placeholder - replace
# it with the real domain pattern before running.
idp_query = """\
query ($after: Cursor) {
entities(types: [USER], riskFactorTypes: [DUPLICATE_PASSWORD, STALE_ACCOUNT], enabled:true, domainPattern: "(*mydomain*)", archived:false, after:$after, first: 1000) {
nodes {
accounts {
... on ActiveDirectoryAccountDescriptor {
samAccountName
domain
}
}
riskFactors {
type
... on DuplicatePasswordRiskEntityFactor {
groupId
}
}
}
pageInfo {
hasNextPage
startCursor
endCursor
}
}
}
"""
# Column-oriented accumulator for the report. Exactly one value is appended to
# every column per entity so all columns stay the same length, which
# pd.DataFrame requires.
dataset = {
    'samAccountName': [],
    'domain': [],
    'groupId': [],
    'weakPassword': [],
    'stale_account': [],
    'DUPLICATE_PASSWORD': [],
}

# Page through all results. The first request is sent without an "after"
# cursor so it returns the first page; no separate priming request is needed.
# (Seeding the loop with the first page's startCursor would skip the first
# entity, because "after" is exclusive.)
end_cursor = None
has_next_page = True
while has_next_page:
    # Only send the cursor once we have one from a previous page.
    variables = {"after": end_cursor} if end_cursor else {}
    response = falcon.graphql(query=idp_query, variables=variables)

    # Fail loudly on authentication/GraphQL errors instead of looping forever
    # or dying later with an unrelated KeyError/NameError.
    body = response.get('body') or {}
    entities = (body.get('data') or {}).get('entities')
    if entities is None:
        raise SystemExit(
            f"GraphQL request failed (status {response.get('status_code')}): "
            f"{body.get('errors')}"
        )

    for node in entities['nodes']:
        risk_factors = node.get('riskFactors') or []
        risk_types = {factor['type'] for factor in risk_factors}

        # The query only selects fields on ActiveDirectoryAccountDescriptor,
        # so any other account kind presumably comes back without
        # samAccountName; use the first account that actually has one.
        ad_account = next(
            (acct for acct in node.get('accounts') or [] if 'samAccountName' in acct),
            {},
        )
        dataset['samAccountName'].append(ad_account.get('samAccountName'))
        dataset['domain'].append(ad_account.get('domain'))
        dataset['weakPassword'].append('WEAK_PASSWORD' in risk_types)
        dataset['stale_account'].append('STALE_ACCOUNT' in risk_types)
        dataset['DUPLICATE_PASSWORD'].append('DUPLICATE_PASSWORD' in risk_types)

        # Append exactly one groupId per entity (the first one, or None).
        # Extending with several IDs would make this column longer than the
        # others and break DataFrame construction.
        group_ids = [
            factor.get('groupId')
            for factor in risk_factors
            if factor['type'] == 'DUPLICATE_PASSWORD'
        ]
        dataset['groupId'].append(group_ids[0] if group_ids else None)

    print(f'Record Count: {len(dataset["samAccountName"])}')

    # Advance to the next page. Stop if the API reports no further page or
    # gives no cursor to continue from (which would re-fetch the first page).
    page_info = entities['pageInfo']
    end_cursor = page_info.get('endCursor')
    has_next_page = bool(page_info.get('hasNextPage')) and end_cursor is not None
# Build the DataFrame from the collected columns.
df = pd.DataFrame(dataset)

# Keep only the part of each groupId before the first underscore; entities
# without a duplicate-password group (None) remain missing values.
df['groupId'] = df['groupId'].str.split('_').str[0]

# Write the report to the current working directory. The file name carries a
# UTC timestamp so repeated runs do not overwrite each other.
timestamp = datetime.now(timezone.utc).strftime('%Y_%m_%d-%H-%M-%S')
df.to_csv(f"IDP_Data_{timestamp}.csv", index=False)