Falconpy scripts

← Back to Categories
Category Description Action
👤 Identity Protection API Print accounts with Duplicate Passwords List accounts that share a password
            from falconpy import IdentityProtection
            import pandas as pd
            from getpass import getpass
            from datetime import datetime, timezone
            import json
            
            
            # Set cloud dict
            cloud_dict = {
                '1': 'https://api.crowdstrike.com',
                '2': 'https://api.us-2.crowdstrike.com',
                '3': 'https://api.eu-1.crowdstrike.com',
                '4': 'https://api.lagger.gcw.crowdstrike.com'
            }
            
            print("Which Falcon Cloud to use?")
            print("1: (US-1)")
            print("2: (US-2)")
            print("3: (EU-1)")
            print("4: (US-GOV-1)")
            
            cloud_select = input("Enter your choice (1-4): ").strip()
            if cloud_select not in cloud_dict:
                print("invalid selection")
                exit(1)
            
            # set your creds and headers
            API_URL = cloud_dict[cloud_select]
            API_ID = getpass('Enter your API KEY ID: ')
            API_KEY = getpass('Enter your API Secret: ')
            
            # authenticate to API
            falcon = IdentityProtection(client_id=API_ID, client_secret=API_KEY)
            
            # initialize variables
            has_next_page = True
            
            # graphQL query from Crowdstrike
            idp_query = """\
            query ($after: Cursor) {
              entities(types: [USER], riskFactorTypes: [DUPLICATE_PASSWORD, STALE_ACCOUNT], enabled:true, domainPattern: "(*mydomain*)", archived:false, after:$after, first: 1000) {
                nodes {
                  accounts {
                    ... on ActiveDirectoryAccountDescriptor {
                      samAccountName
                      domain
                    }
                  }
                  riskFactors {
                    type
                    ... on DuplicatePasswordRiskEntityFactor {
                      groupId
                    }
                  }
                }
                pageInfo {
                  hasNextPage
                  startCursor
                  endCursor
                }
              }
            }
            """
            
            # get first cursor
            try:
                variables = {}
                response = falcon.graphql(query=idp_query, variables=variables)
                entities  = response['body']['data']['entities']
                end_cursor = entities['pageInfo']['startCursor']
            except Exception as err:
                print(f"Unexpected {err=}, {type(err)=}")
            
            dataset = {'samAccountName': [], 'domain': [], 'groupId': [], 'weakPassword': [], 'stale_account': [], 'DUPLICATE_PASSWORD':[]}
            
            # iterate all the pages
            while has_next_page:
                # set the after parameter in the GraphQL variables 
                variables = {"after": end_cursor}
                
                # send GraphQL request
                response = falcon.graphql(query=idp_query, variables=variables)
                users = response
                
                # process the response
                if 'body' in users:
                    data = response['body']['data']
                    entities = data['entities']
                    page_info = entities['pageInfo']
                    nodes = entities['nodes']
                   
                # append records  to the list
                new_nodes = [nodes[i] for i in range(len(nodes))]
                for node in new_nodes:
                    dataset['samAccountName'].append(node['accounts'][0]['samAccountName'])
                    dataset['domain'].append(node['accounts'][0]['domain'])
                    IS_WEAK_PASSWORD = any(factor['type'] == 'WEAK_PASSWORD' for factor in node['riskFactors'])
                    dataset['weakPassword'].append(IS_WEAK_PASSWORD)
                    is_stale_account = any(factor['type'] == 'STALE_ACCOUNT' for factor in node['riskFactors'])
                    dataset['stale_account'].append(is_stale_account)
                    IS_DUPLICATE_PASSWORD = any(factor['type'] == 'DUPLICATE_PASSWORD' for factor in node['riskFactors']) 
                    dataset['DUPLICATE_PASSWORD'].append(IS_DUPLICATE_PASSWORD)
                    duplicate_password_group_ids = [factor['groupId'] for factor in node['riskFactors'] if factor['type'] == 'DUPLICATE_PASSWORD']
                    if duplicate_password_group_ids:
                        dataset['groupId'].extend(duplicate_password_group_ids)
                    else:
                        dataset['groupId'].append(None)
                    
                print(f'Record Count: {len(dataset["samAccountName"])}') 
                
                # check if there's a next page 
                has_next_page = page_info['hasNextPage']
                end_cursor = page_info['endCursor']
                
                # skip records after the endCursor on the current page
                if not has_next_page:
                    break
            
            # create the dataframe, fix group ID, export
            df = pd.DataFrame(dataset)
            df['groupId'] = df['groupId'].str.split('_').str[0]
            # Print to desired location
            df.to_csv("IDP_Data_" + str(datetime.now(timezone.utc).strftime('%Y_%m_%d-%H-%M-%S')) + ".csv", index=False)