Name: secops
Version: 0.8.1
Summary: Python SDK for wrapping the Google SecOps API for common use cases
Upload time: 2025-07-24 08:52:41
Requires Python: >=3.7
Keywords: chronicle, google, secops, security
Requirements: pytest, pytest-cov, build, black, packaging, pathspec, protobuf, pylint, twine, python-dotenv

# Google SecOps SDK for Python

[![PyPI version](https://img.shields.io/pypi/v/secops.svg)](https://pypi.org/project/secops/)


A Python SDK for interacting with Google Security Operations products, currently supporting Chronicle/SecOps SIEM.
This wraps the API for common use cases, including UDM searches, entity lookups, IoCs, alert management, case management, and detection rule management.

## Installation

```bash
pip install secops
```

## Command Line Interface

The SDK also provides a comprehensive command-line interface (CLI) that makes it easy to interact with Google Security Operations products from your terminal:

```bash
# Save your credentials
secops config set --customer-id "your-instance-id" --project-id "your-project-id" --region "us"

# Now use commands without specifying credentials each time
secops search --query "metadata.event_type = \"NETWORK_CONNECTION\""
```

For detailed CLI documentation and examples, see the [CLI Documentation](https://github.com/google/secops-wrapper/blob/main/CLI.md).


## Authentication

The SDK supports two main authentication methods:

### 1. Application Default Credentials (ADC)

This is the simplest and recommended way to authenticate the SDK. Application Default Credentials provide a consistent authentication method that works across different Google Cloud environments and local development.

There are several ways to use ADC:

#### a. Using `gcloud` CLI (Recommended for Local Development)

```bash
# Login and set up application-default credentials
gcloud auth application-default login
```

Then in your code:
```python
from secops import SecOpsClient

# Initialize with default credentials - no explicit configuration needed
client = SecOpsClient()
```

#### b. Using Environment Variable

Set the environment variable pointing to your service account key:
```bash
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"
```

Then in your code:
```python
from secops import SecOpsClient

# Initialize with default credentials - will automatically use the credentials file
client = SecOpsClient()
```

#### c. Google Cloud Environment (Automatic)

When running on Google Cloud services (Compute Engine, Cloud Functions, Cloud Run, etc.), ADC works automatically without any configuration:

```python
from secops import SecOpsClient

# Initialize with default credentials - will automatically use the service account 
# assigned to your Google Cloud resource
client = SecOpsClient()
```

ADC will automatically try these authentication methods in order:
1. Environment variable `GOOGLE_APPLICATION_CREDENTIALS`
2. Google Cloud SDK credentials (set by `gcloud auth application-default login`)
3. Google Cloud-provided service account credentials
4. Local service account impersonation credentials

### 2. Service Account Authentication

For more explicit control, you can authenticate using a service account that is created in the Google Cloud project associated with Google SecOps.

**Important Note on Permissions:**
* This service account needs to be granted the appropriate Identity and Access Management (IAM) role to interact with the Google SecOps (Chronicle) API. The recommended predefined role is **Chronicle API Admin** (`roles/chronicle.admin`). Alternatively, if your security policies require more granular control, you can create a custom IAM role with the specific permissions needed for the operations you intend to use (e.g., `chronicle.instances.get`, `chronicle.events.create`, `chronicle.rules.list`, etc.).

Once the service account is properly permissioned, you can authenticate using it in two ways: 

#### a. Using a Service Account JSON File

```python
from secops import SecOpsClient

# Initialize with service account JSON file
client = SecOpsClient(service_account_path="/path/to/service-account.json")
```

#### b. Using Service Account Info Dictionary

If you prefer to manage credentials programmatically without a file, you can create a dictionary containing the service account key's contents.

```python
from secops import SecOpsClient

# Service account details as a dictionary
service_account_info = {
    "type": "service_account",
    "project_id": "your-project-id",
    "private_key_id": "key-id",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...",
    "client_email": "service-account@project.iam.gserviceaccount.com",
    "client_id": "client-id",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/..."
}

# Initialize with service account info
client = SecOpsClient(service_account_info=service_account_info)
```

### Impersonate Service Account

Both [Application Default Credentials](#1-application-default-credentials-adc) and [Service Account Authentication](#2-service-account-authentication) support impersonating a service account via the `impersonate_service_account` parameter:

```python
from secops import SecOpsClient

# Initialize with default credentials and impersonate service account
client = SecOpsClient(impersonate_service_account="secops@test-project.iam.gserviceaccount.com")
```

## Using the Chronicle API

### Initializing the Chronicle Client

After creating a SecOpsClient, you need to initialize the Chronicle-specific client:

```python
# Initialize Chronicle client
chronicle = client.chronicle(
    customer_id="your-chronicle-instance-id",  # Your Chronicle instance ID
    project_id="your-project-id",             # Your GCP project ID
    region="us"                               # Chronicle API region 
)
```
[See available regions](https://github.com/google/secops-wrapper/blob/main/regions.md)

### Log Ingestion

Ingest raw logs directly into Chronicle:

```python
from datetime import datetime, timezone
import json

# Create a sample log (this is an OKTA log)
current_time = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
okta_log = {
    "actor": {
        "alternateId": "mark.taylor@cymbal-investments.org",
        "displayName": "Mark Taylor",
        "id": "00u4j7xcb5N6zfiRP5d8",
        "type": "User"
    },
    "client": {
        "userAgent": {
            "rawUserAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
            "os": "Windows 10",
            "browser": "CHROME"
        },
        "ipAddress": "96.6.127.53",
        "geographicalContext": {
            "city": "New York",
            "state": "New York",
            "country": "United States",
            "postalCode": "10118",
            "geolocation": {"lat": 40.7123, "lon": -74.0068}
        }
    },
    "displayMessage": "Max sign in attempts exceeded",
    "eventType": "user.account.lock",
    "outcome": {"result": "FAILURE", "reason": "LOCKED_OUT"},
    "published": "2025-06-19T21:51:50.116Z",
    "securityContext": {
        "asNumber": 20940,
        "asOrg": "akamai technologies inc.",
        "isp": "akamai international b.v.",
        "domain": "akamaitechnologies.com",
        "isProxy": false
    },
    "severity": "DEBUG",
    "legacyEventType": "core.user_auth.account_locked",
    "uuid": "5b90a94a-d7ba-11ea-834a-85c24a1b2121",
    "version": "0"
    # ... additional OKTA log fields may be included
}

# Ingest a single log using the default forwarder
result = chronicle.ingest_log(
    log_type="OKTA",  # Chronicle log type
    log_message=json.dumps(okta_log)  # JSON string of the log
)

print(f"Operation: {result.get('operation')}")

# Batch ingestion: Ingest multiple logs in a single request
batch_logs = [
    json.dumps({"actor": {"displayName": "User 1"}, "eventType": "user.session.start"}),
    json.dumps({"actor": {"displayName": "User 2"}, "eventType": "user.session.start"}),
    json.dumps({"actor": {"displayName": "User 3"}, "eventType": "user.session.start"})
]

# Ingest multiple logs in a single API call
batch_result = chronicle.ingest_log(
    log_type="OKTA",
    log_message=batch_logs  # List of log message strings
)

print(f"Batch operation: {batch_result.get('operation')}")

# Add custom labels to your logs
labeled_result = chronicle.ingest_log(
    log_type="OKTA",
    log_message=json.dumps(okta_log),
    labels={"environment": "production", "app": "web-portal", "team": "security"}
)
```
The SDK also supports non-JSON log formats. Here's an example with XML for Windows Event logs:

```python
# Create a Windows Event XML log
xml_content = """<Event xmlns='http://schemas.microsoft.com/win/2004/08/events/event'>
  <System>
    <Provider Name='Microsoft-Windows-Security-Auditing' Guid='{54849625-5478-4994-A5BA-3E3B0328C30D}'/>
    <EventID>4624</EventID>
    <Version>1</Version>
    <Level>0</Level>
    <Task>12544</Task>
    <Opcode>0</Opcode>
    <Keywords>0x8020000000000000</Keywords>
    <TimeCreated SystemTime='2024-05-10T14:30:00Z'/>
    <EventRecordID>202117513</EventRecordID>
    <Correlation/>
    <Execution ProcessID='656' ThreadID='700'/>
    <Channel>Security</Channel>
    <Computer>WIN-SERVER.xyz.net</Computer>
    <Security/>
  </System>
  <EventData>
    <Data Name='SubjectUserSid'>S-1-0-0</Data>
    <Data Name='SubjectUserName'>-</Data>
    <Data Name='TargetUserName'>svcUser</Data>
    <Data Name='WorkstationName'>CLIENT-PC</Data>
    <Data Name='LogonType'>3</Data>
  </EventData>
</Event>"""

# Ingest the XML log - no json.dumps() needed for XML
result = chronicle.ingest_log(
    log_type="WINEVTLOG_XML",  # Windows Event Log XML format
    log_message=xml_content    # Raw XML content
)

print(f"Operation: {result.get('operation')}")
```
The SDK supports all log types available in Chronicle. You can:

1. View available log types:
```python
# Get all available log types
log_types = chronicle.get_all_log_types()
for lt in log_types[:5]:  # Show first 5
    print(f"{lt.id}: {lt.description}")
```

2. Search for specific log types:
```python
# Search for log types related to firewalls
firewall_types = chronicle.search_log_types("firewall")
for lt in firewall_types:
    print(f"{lt.id}: {lt.description}")
```

3. Validate log types:
```python
# Check if a log type is valid
if chronicle.is_valid_log_type("OKTA"):
    print("Valid log type")
else:
    print("Invalid log type")
```

4. Use custom forwarders:
```python
# Create or get a custom forwarder
forwarder = chronicle.get_or_create_forwarder(display_name="MyCustomForwarder")
forwarder_id = forwarder["name"].split("/")[-1]

# Use the custom forwarder for log ingestion
result = chronicle.ingest_log(
    log_type="WINDOWS",
    log_message=json.dumps(windows_log),
    forwarder_id=forwarder_id
)
```

5. Use custom timestamps:
```python
from datetime import datetime, timedelta, timezone

# Define custom timestamps
log_entry_time = datetime.now(timezone.utc) - timedelta(hours=1)
collection_time = datetime.now(timezone.utc)

result = chronicle.ingest_log(
    log_type="OKTA",
    log_message=json.dumps(okta_log),
    log_entry_time=log_entry_time,  # When the log was generated
    collection_time=collection_time  # When the log was collected
)
```

Ingest UDM events directly into Chronicle:

```python
import uuid
from datetime import datetime, timezone

# Generate a unique ID
event_id = str(uuid.uuid4())

# Get current time in ISO 8601 format
current_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

# Create a UDM event for a network connection
network_event = {
    "metadata": {
        "id": event_id,
        "event_timestamp": current_time,
        "event_type": "NETWORK_CONNECTION",
        "product_name": "My Security Product", 
        "vendor_name": "My Company"
    },
    "principal": {
        "hostname": "workstation-1",
        "ip": "192.168.1.100",
        "port": 12345
    },
    "target": {
        "ip": "203.0.113.10",
        "port": 443
    },
    "network": {
        "application_protocol": "HTTPS",
        "direction": "OUTBOUND"
    }
}

# Ingest a single UDM event
result = chronicle.ingest_udm(udm_events=network_event)
print(f"Ingested event with ID: {event_id}")

# Create a second event
process_event = {
    "metadata": {
        # No ID - one will be auto-generated
        "event_timestamp": current_time,
        "event_type": "PROCESS_LAUNCH",
        "product_name": "My Security Product", 
        "vendor_name": "My Company"
    },
    "principal": {
        "hostname": "workstation-1",
        "process": {
            "command_line": "ping 8.8.8.8",
            "pid": 1234
        },
        "user": {
            "userid": "user123"
        }
    }
}

# Ingest multiple UDM events in a single call
result = chronicle.ingest_udm(udm_events=[network_event, process_event])
print("Multiple events ingested successfully")
```

### Data Export

> **Note**: The Data Export API features are currently under test and review. We welcome your feedback and encourage you to submit any issues or unexpected behavior to the issue tracker so we can improve this functionality.

You can export Chronicle logs to Google Cloud Storage using the Data Export API:

```python
from datetime import datetime, timedelta, timezone

# Set time range for export
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=1)  # Last 24 hours

# Get available log types for export
available_log_types = chronicle.fetch_available_log_types(
    start_time=start_time,
    end_time=end_time
)

# Print available log types
for log_type in available_log_types["available_log_types"]:
    print(f"{log_type.display_name} ({log_type.log_type.split('/')[-1]})")
    print(f"  Available from {log_type.start_time} to {log_type.end_time}")

# Create a data export for a specific log type
export = chronicle.create_data_export(
    gcs_bucket="projects/my-project/buckets/my-export-bucket",
    start_time=start_time,
    end_time=end_time,
    log_type="GCP_DNS"  # Specify log type to export
)

# Get the export ID
export_id = export["name"].split("/")[-1]
print(f"Created export with ID: {export_id}")
print(f"Status: {export['data_export_status']['stage']}")

# Check export status
status = chronicle.get_data_export(export_id)
print(f"Export status: {status['data_export_status']['stage']}")
print(f"Progress: {status['data_export_status'].get('progress_percentage', 0)}%")

# Cancel an export if needed
if status['data_export_status']['stage'] in ['IN_QUEUE', 'PROCESSING']:
    cancelled = chronicle.cancel_data_export(export_id)
    print(f"Export has been cancelled. New status: {cancelled['data_export_status']['stage']}")

# Export all log types at once
export_all = chronicle.create_data_export(
    gcs_bucket="projects/my-project/buckets/my-export-bucket",
    start_time=start_time,
    end_time=end_time,
    export_all_logs=True
)

print(f"Created export for all logs. Status: {export_all['data_export_status']['stage']}")
```

The Data Export API supports:
- Exporting one or all log types to Google Cloud Storage
- Checking export status and progress
- Cancelling exports in progress
- Fetching available log types for a specific time range

If you encounter any issues with the Data Export functionality, please submit them to our issue tracker with detailed information about the problem and steps to reproduce.

### Basic UDM Search

Search for network connection events:

```python
from datetime import datetime, timedelta, timezone

# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)  # Last 24 hours

# Perform UDM search
results = chronicle.search_udm(
    query="""
    metadata.event_type = "NETWORK_CONNECTION"
    ip != ""
    """,
    start_time=start_time,
    end_time=end_time,
    max_events=5
)

# Example response:
{
    "events": [
        {
            "name": "projects/my-project/locations/us/instances/my-instance/events/encoded-event-id",
            "udm": {
                "metadata": {
                    "eventTimestamp": "2024-02-09T10:30:00Z",
                    "eventType": "NETWORK_CONNECTION"
                },
                "target": {
                    "ip": ["192.168.1.100"],
                    "port": 443
                },
                "principal": {
                    "hostname": "workstation-1"
                }
            }
        }
    ],
    "total_events": 1,
    "more_data_available": false
}
```

### Statistics Queries

Get statistics about network connections grouped by hostname:

```python
stats = chronicle.get_stats(
    query="""metadata.event_type = "NETWORK_CONNECTION"
match:
    target.hostname
outcome:
    $count = count(metadata.id)
order:
    $count desc""",
    start_time=start_time,
    end_time=end_time,
    max_events=1000,
    max_values=10,
    timeout=180
)

# Example response:
{
    "columns": ["hostname", "count"],
    "rows": [
        {"hostname": "server-1", "count": 1500},
        {"hostname": "server-2", "count": 1200}
    ],
    "total_rows": 2
}
```

### CSV Export

Export specific fields to CSV format:

```python
csv_data = chronicle.fetch_udm_search_csv(
    query='metadata.event_type = "NETWORK_CONNECTION"',
    start_time=start_time,
    end_time=end_time,
    fields=["timestamp", "user", "hostname", "process name"]
)

# Example response:
"""
metadata.eventTimestamp,principal.hostname,target.ip,target.port
2024-02-09T10:30:00Z,workstation-1,192.168.1.100,443
2024-02-09T10:31:00Z,workstation-2,192.168.1.101,80
"""
```

### Query Validation

Validate a UDM query before execution:

```python
query = 'target.ip != "" and principal.hostname = "test-host"'
validation = chronicle.validate_query(query)

# Example response:
{
    "isValid": true,
    "queryType": "QUERY_TYPE_UDM_QUERY",
    "suggestedFields": [
        "target.ip",
        "principal.hostname"
    ]
}
```

### Natural Language Search

Search for events using natural language instead of UDM query syntax:

```python
from datetime import datetime, timedelta, timezone

# Set time range for queries
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=24)  # Last 24 hours

# Option 1: Translate natural language to UDM query
udm_query = chronicle.translate_nl_to_udm("show me network connections")
print(f"Translated query: {udm_query}")
# Example output: 'metadata.event_type="NETWORK_CONNECTION"'

# Then run the query manually if needed
results = chronicle.search_udm(
    query=udm_query,
    start_time=start_time,
    end_time=end_time
)

# Option 2: Perform complete search with natural language
results = chronicle.nl_search(
    text="show me failed login attempts",
    start_time=start_time,
    end_time=end_time,
    max_events=100
)

# Example response (same format as search_udm):
{
    "events": [
        {
            "event": {
                "metadata": {
                    "eventTimestamp": "2024-02-09T10:30:00Z",
                    "eventType": "USER_LOGIN"
                },
                "principal": {
                    "user": {
                        "userid": "jdoe"
                    }
                },
                "securityResult": {
                    "action": "BLOCK",
                    "summary": "Failed login attempt"
                }
            }
        }
    ],
    "total_events": 1
}
```

The natural language search feature supports various query patterns:
- "Show me network connections"
- "Find suspicious processes"
- "Show login failures in the last hour"
- "Display connections to IP address 192.168.1.100"

If the natural language query cannot be translated to a valid UDM query, an `APIError` is raised with a message indicating that no valid query could be generated.
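
Since translation failures surface as `APIError`, you can wrap the call accordingly. A minimal sketch (the query text is illustrative):

```python
from secops.exceptions import APIError

try:
    udm_query = chronicle.translate_nl_to_udm("find anomalous DNS traffic")
except APIError as e:
    # No valid UDM query could be generated for the natural language input
    print(f"Translation failed: {e}")
```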

### Entity Summary

Get detailed information about specific entities like IP addresses, domains, or file hashes. The function automatically detects the entity type based on the provided value and fetches a comprehensive summary including related entities, alerts, timeline, prevalence, and more.

```python
# IP address summary
ip_summary = chronicle.summarize_entity(
    value="8.8.8.8",
    start_time=start_time,
    end_time=end_time
)

# Domain summary
domain_summary = chronicle.summarize_entity(
    value="google.com",
    start_time=start_time,
    end_time=end_time
)

# File hash summary (SHA256)
file_hash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" 
file_summary = chronicle.summarize_entity(
    value=file_hash,
    start_time=start_time,
    end_time=end_time
)

# Optionally hint the preferred type if auto-detection might be ambiguous
user_summary = chronicle.summarize_entity(
    value="jdoe",
    start_time=start_time,
    end_time=end_time,
    preferred_entity_type="USER"
)


# Example response structure (EntitySummary object):
# Access attributes like: ip_summary.primary_entity, ip_summary.related_entities,
# ip_summary.alert_counts, ip_summary.timeline, ip_summary.prevalence, etc.

# Example fields within the EntitySummary object:
# primary_entity: {
#     "name": "entities/...",
#     "metadata": {
#         "entityType": "ASSET",  # Or FILE, DOMAIN_NAME, USER, etc.
#         "interval": { "startTime": "...", "endTime": "..." }
#     },
#     "metric": { "firstSeen": "...", "lastSeen": "..." },
#     "entity": {  # Contains specific details like 'asset', 'file', 'domain'
#         "asset": { "ip": ["8.8.8.8"] }
#     }
# }
# related_entities: [ { ... similar to primary_entity ... } ]
# alert_counts: [ { "rule": "Rule Name", "count": 5 } ]
# timeline: { "buckets": [ { "alertCount": 1, "eventCount": 10 } ], "bucketSize": "3600s" }
# prevalence: [ { "prevalenceTime": "...", "count": 100 } ]
# file_metadata_and_properties: {  # Only for FILE entities
#     "metadata": [ { "key": "...", "value": "..." } ],
#     "properties": [ { "title": "...", "properties": [ { "key": "...", "value": "..." } ] } ]
# }
```

### List IoCs (Indicators of Compromise)

Retrieve IoC matches against ingested events:

```python
iocs = chronicle.list_iocs(
    start_time=start_time,
    end_time=end_time,
    max_matches=1000,
    add_mandiant_attributes=True,
    prioritized_only=False
)

# Process the results
for ioc in iocs['matches']:
    ioc_type = next(iter(ioc['artifactIndicator'].keys()))
    ioc_value = next(iter(ioc['artifactIndicator'].values()))
    print(f"IoC Type: {ioc_type}, Value: {ioc_value}")
    print(f"Sources: {', '.join(ioc['sources'])}")
```

The IoC response includes:
- The indicator itself (domain, IP, hash, etc.)
- Sources and categories
- Affected assets in your environment
- First and last seen timestamps
- Confidence scores and severity ratings
- Associated threat actors and malware families (with Mandiant attributes)

### Alerts and Case Management

Retrieve alerts and their associated cases:

```python
# Get non-closed alerts
alerts = chronicle.get_alerts(
    start_time=start_time,
    end_time=end_time,
    snapshot_query='feedback_summary.status != "CLOSED"',
    max_alerts=100
)

# Get alerts from the response
alert_list = alerts.get('alerts', {}).get('alerts', [])

# Extract case IDs from alerts
case_ids = {alert.get('caseName') for alert in alert_list if alert.get('caseName')}

# Get case details using the batch API
if case_ids:
    cases = chronicle.get_cases(list(case_ids))
    
    # Process cases
    for case in cases.cases:
        print(f"Case: {case.display_name}")
        print(f"Priority: {case.priority}")
        print(f"Status: {case.status}")
        print(f"Stage: {case.stage}")
        
        # Access SOAR platform information if available
        if case.soar_platform_info:
            print(f"SOAR Case ID: {case.soar_platform_info.case_id}")
            print(f"SOAR Platform: {case.soar_platform_info.platform_type}")
```

The alerts response includes:
- Progress status and completion status
- Alert counts (baseline and filtered)
- Alert details (rule information, detection details, etc.)
- Case associations

You can filter alerts using the snapshot query parameter with fields like the following (see the example after the list):
- `detection.rule_name`
- `detection.alert_state`
- `feedback_summary.verdict`
- `feedback_summary.priority`
- `feedback_summary.status`
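
For example, to retrieve only high-priority, non-closed alerts (a sketch that assumes filters can be combined with `AND`, and reuses the `PRIORITY_HIGH` value shown in the case helpers below):

```python
high_priority_alerts = chronicle.get_alerts(
    start_time=start_time,
    end_time=end_time,
    snapshot_query='feedback_summary.priority = "PRIORITY_HIGH" AND feedback_summary.status != "CLOSED"',
    max_alerts=50
)
```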

### Case Management Helpers

The `CaseList` class provides helper methods for working with cases:

```python
# Get details for specific cases (uses the batch API)
cases = chronicle.get_cases(["case-id-1", "case-id-2"])

# Filter cases by priority
high_priority = cases.filter_by_priority("PRIORITY_HIGH")

# Filter cases by status
open_cases = cases.filter_by_status("STATUS_OPEN")

# Look up a specific case
case = cases.get_case("case-id-1")
```

> **Note**: The case management API uses the `legacy:legacyBatchGetCases` endpoint to retrieve multiple cases in a single request. You can retrieve up to 1000 cases in a single batch.
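
If you need details for more than 1000 cases, you can split the IDs into batches yourself. A small sketch (the `get_cases_batched` helper is hypothetical, not part of the SDK):

```python
def get_cases_batched(chronicle, case_ids, batch_size=1000):
    """Fetch case details in batches to respect the 1000-case-per-request limit."""
    all_cases = []
    for i in range(0, len(case_ids), batch_size):
        batch = chronicle.get_cases(case_ids[i:i + batch_size])
        all_cases.extend(batch.cases)
    return all_cases
```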

## Parser Management

Chronicle parsers are used to process and normalize raw log data into Chronicle's Unified Data Model (UDM) format. Parsers transform various log formats (JSON, XML, CEF, etc.) into a standardized structure that enables consistent querying and analysis across different data sources.

The SDK provides comprehensive support for managing Chronicle parsers:

### Creating Parsers

Create new parser:

```python
parser_text = """
filter {
    mutate {
      replace => {
        "event1.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
        "event1.idm.read_only_udm.metadata.vendor_name" =>  "ACME Labs"
      }
    }
    grok {
      match => {
        "message" => ["^(?P<_firstWord>[^\s]+)\s.*$"]
      }
      on_error => "_grok_message_failed"
    }
    if ![_grok_message_failed] {
      mutate {
        replace => {
          "event1.idm.read_only_udm.metadata.description" => "%{_firstWord}"
        }
      }
    }
    mutate {
      merge => {
        "@output" => "event1"
      }
    }
}
"""

log_type = "WINDOWS_AD"

# Create the parser
parser = chronicle.create_parser(
    log_type=log_type, 
    parser_code=parser_text,
    validated_on_empty_logs=True  # Whether to validate parser on empty logs
)
parser_id = parser.get("name", "").split("/")[-1]
print(f"Parser ID: {parser_id}")
```

### Managing Parsers

Retrieve, list, copy, activate/deactivate, and delete parsers:

```python
# List all parsers
parsers = chronicle.list_parsers()
for parser in parsers:
    parser_id = parser.get("name", "").split("/")[-1]
    state = parser.get("state")
    print(f"Parser ID: {parser_id}, State: {state}")

log_type = "WINDOWS_AD"
    
# Get specific parser
parser = chronicle.get_parser(log_type=log_type, id=parser_id)
print(f"Parser content: {parser.get('text')}")

# Activate/Deactivate parser
chronicle.activate_parser(log_type=log_type, id=parser_id)
chronicle.deactivate_parser(log_type=log_type, id=parser_id)

# Copy an existing parser as a starting point
copied_parser = chronicle.copy_parser(log_type=log_type, id="pa_existing_parser")

# Delete parser
chronicle.delete_parser(log_type=log_type, id=parser_id)

# Force delete an active parser
chronicle.delete_parser(log_type=log_type, id=parser_id, force=True)

# Activate a release candidate parser
chronicle.activate_release_candidate_parser(log_type=log_type, id="pa_release_candidate")
```

> **Note:** Parsers work in conjunction with log ingestion. When you ingest logs using `chronicle.ingest_log()`, Chronicle automatically applies the appropriate parser based on the log type to transform your raw logs into UDM format. If you're working with custom log formats, you may need to create or configure custom parsers first.

### Run Parser against sample logs

Run the parser on one or more sample logs:

```python
# Sample parser code that extracts fields from logs
parser_text = """
filter {
    mutate {
      replace => {
        "event1.idm.read_only_udm.metadata.event_type" => "GENERIC_EVENT"
        "event1.idm.read_only_udm.metadata.vendor_name" =>  "ACME Labs"
      }
    }
    grok {
      match => {
        "message" => ["^(?P<_firstWord>[^\s]+)\s.*$"]
      }
      on_error => "_grok_message_failed"
    }
    if ![_grok_message_failed] {
      mutate {
        replace => {
          "event1.idm.read_only_udm.metadata.description" => "%{_firstWord}"
        }
      }
    }
    mutate {
      merge => {
        "@output" => "event1"
      }
    }
}
"""

log_type = "WINDOWS_AD"

# Sample log entries to test
sample_logs = [
    '{"message": "ERROR: Failed authentication attempt"}',
    '{"message": "WARNING: Suspicious activity detected"}',
    '{"message": "INFO: User logged in successfully"}'
]

# Run parser evaluation
result = chronicle.run_parser(
    log_type=log_type, 
    parser_code=parser_text,
    parser_extension_code=None,  # Optional parser extension
    logs=sample_logs,
    statedump_allowed=False  # Enable if using statedump filters
)

# Check the results
if "runParserResults" in result:
    for i, parser_result in enumerate(result["runParserResults"]):
        print(f"\nLog {i+1} parsing result:")
        if "parsedEvents" in parser_result:
            print(f"  Parsed events: {parser_result['parsedEvents']}")
        if "errors" in parser_result:
            print(f"  Errors: {parser_result['errors']}")
```

The `run_parser` function includes comprehensive validation (a client-side pre-check sketch follows this list):
- Validates log type and parser code are provided
- Ensures logs are provided as a list of strings
- Enforces size limits (10MB per log, 50MB total, max 1000 logs)
- Provides detailed error messages for different failure scenarios
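
If you want to fail fast before calling the API, a hypothetical client-side pre-check against those documented limits might look like this (not part of the SDK):

```python
MAX_LOG_BYTES = 10 * 1024 * 1024    # 10MB per log
MAX_TOTAL_BYTES = 50 * 1024 * 1024  # 50MB total
MAX_LOG_COUNT = 1000                # max 1000 logs per request

def precheck_parser_logs(logs):
    """Raise ValueError if sample logs exceed the documented run_parser limits."""
    if len(logs) > MAX_LOG_COUNT:
        raise ValueError(f"Too many logs: {len(logs)} > {MAX_LOG_COUNT}")
    total = 0
    for log in logs:
        size = len(log.encode("utf-8"))
        if size > MAX_LOG_BYTES:
            raise ValueError(f"Single log exceeds {MAX_LOG_BYTES} bytes")
        total += size
    if total > MAX_TOTAL_BYTES:
        raise ValueError(f"Total log size exceeds {MAX_TOTAL_BYTES} bytes")

precheck_parser_logs(sample_logs)  # e.g., the sample_logs defined above
```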

### Complete Parser Workflow Example

Here's a complete example that demonstrates retrieving a parser, running it against a log, and ingesting the parsed UDM event:

```python
# Step 1: List and retrieve an OKTA parser
parsers = chronicle.list_parsers(log_type="OKTA")
parser_id = parsers[0]["name"].split("/")[-1]
parser_details = chronicle.get_parser(log_type="OKTA", id=parser_id)

# Extract and decode parser code
import base64
parser_code = base64.b64decode(parser_details["cbn"]).decode('utf-8')

# Step 2: Run the parser against a sample log
okta_log = {
    "actor": {"alternateId": "user@example.com", "displayName": "Test User"},
    "eventType": "user.account.lock",
    "outcome": {"result": "FAILURE", "reason": "LOCKED_OUT"},
    "published": "2025-06-19T21:51:50.116Z"
    # ... other OKTA log fields
}

result = chronicle.run_parser(
    log_type="OKTA",
    parser_code=parser_code,
    parser_extension_code=None,
    logs=[json.dumps(okta_log)]
)

# Step 3: Extract and ingest the parsed UDM event
if result["runParserResults"][0]["parsedEvents"]:
    # parsedEvents is a dict with 'events' key containing the actual events list
    parsed_events_data = result["runParserResults"][0]["parsedEvents"]
    if isinstance(parsed_events_data, dict) and "events" in parsed_events_data:
        events = parsed_events_data["events"]
        if events and len(events) > 0:
            # Extract the first event
            if "event" in events[0]:
                udm_event = events[0]["event"]
            else:
                udm_event = events[0]
            
            # Ingest the parsed UDM event back into Chronicle
            ingest_result = chronicle.ingest_udm(udm_events=udm_event)
            print(f"UDM event ingested: {ingest_result}")
```

This workflow is useful for:
- Testing parsers before deployment
- Understanding how logs are transformed to UDM format
- Re-processing logs with updated parsers
- Debugging parsing issues

## Rule Management

The SDK provides comprehensive support for managing Chronicle detection rules:

### Creating Rules

Create new detection rules using YARA-L 2.0 syntax:

```python
rule_text = """
rule simple_network_rule {
    meta:
        description = "Example rule to detect network connections"
        author = "SecOps SDK Example"
        severity = "Medium"
        priority = "Medium"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
        $e.principal.hostname != ""
    condition:
        $e
}
"""

# Create the rule
rule = chronicle.create_rule(rule_text)
rule_id = rule.get("name", "").split("/")[-1]
print(f"Rule ID: {rule_id}")
```

### Managing Rules

Retrieve, list, update, enable/disable, and delete rules:

```python
# List all rules
rules = chronicle.list_rules()
for rule in rules.get("rules", []):
    rule_id = rule.get("name", "").split("/")[-1]
    enabled = rule.get("deployment", {}).get("enabled", False)
    print(f"Rule ID: {rule_id}, Enabled: {enabled}")

# List rules with pagination and the `REVISION_METADATA_ONLY` view
rules = chronicle.list_rules(view="REVISION_METADATA_ONLY", page_size=50)
print(f"Fetched {len(rules.get('rules', []))} rules")

# Get specific rule
rule = chronicle.get_rule(rule_id)
print(f"Rule content: {rule.get('text')}")

# Update rule
updated_rule = chronicle.update_rule(rule_id, updated_rule_text)

# Enable/disable rule
deployment = chronicle.enable_rule(rule_id, enabled=True)  # Enable
deployment = chronicle.enable_rule(rule_id, enabled=False) # Disable

# Delete rule
chronicle.delete_rule(rule_id)
```

### Searching Rules

Search for rules using regular expressions:

```python
# Search for rules containing specific patterns
results = chronicle.search_rules("suspicious process")
for rule in results.get("rules", []):
    rule_id = rule.get("name", "").split("/")[-1]
    print(f"Rule ID: {rule_id}, contains: 'suspicious process'")
    
# Find rules mentioning a specific MITRE technique
mitre_rules = chronicle.search_rules("T1055")
print(f"Found {len(mitre_rules.get('rules', []))} rules mentioning T1055 technique")
```

### Testing Rules

Test rules against historical data to validate their effectiveness before deployment:

```python
from datetime import datetime, timedelta, timezone

# Define time range for testing
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)  # Test against last 7 days

# Rule to test
rule_text = """
rule test_rule {
    meta:
        description = "Test rule for validation"
        author = "Test Author"
        severity = "Low"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
    condition:
        $e
}
"""

# Test the rule
test_results = chronicle.run_rule_test(
    rule_text=rule_text,
    start_time=start_time,
    end_time=end_time,
    max_results=100
)

# Process streaming results
detection_count = 0
for result in test_results:
    result_type = result.get("type")
    
    if result_type == "progress":
        # Progress update
        percent_done = result.get("percentDone", 0)
        print(f"Progress: {percent_done}%")
    
    elif result_type == "detection":
        # Detection result
        detection_count += 1
        detection = result.get("detection", {})
        print(f"Detection {detection_count}:")
        
        # Process detection details
        if "rule_id" in detection:
            print(f"  Rule ID: {detection['rule_id']}")
        if "data" in detection:
            print(f"  Data: {detection['data']}")
            
    elif result_type == "error":
        # Error information
        print(f"Error: {result.get('message', 'Unknown error')}")

print(f"Finished testing. Found {detection_count} detection(s).")
```

You can also extract just the UDM events from the test results for programmatic processing:
```python
udm_events = []
for result in chronicle.run_rule_test(rule_text, start_time, end_time, max_results=100):
    if result.get("type") == "detection":
        detection = result.get("detection", {})
        result_events = detection.get("resultEvents", {})
        
        for var_name, var_data in result_events.items():
            event_samples = var_data.get("eventSamples", [])
            for sample in event_samples:
                event = sample.get("event")
                if event:
                    udm_events.append(event)

# Process the UDM events
for event in udm_events:
    # Process each UDM event
    metadata = event.get("metadata", {})
    print(f"Event type: {metadata.get('eventType')}")
```

### Retrohunts

Run rules against historical data to find past matches:

```python
from datetime import datetime, timedelta, timezone

# Set time range for retrohunt
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)  # Search past 7 days

# Create retrohunt
retrohunt = chronicle.create_retrohunt(rule_id, start_time, end_time)
operation_id = retrohunt.get("name", "").split("/")[-1]

# Check retrohunt status
retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
is_complete = retrohunt_status.get("metadata", {}).get("done", False)
```
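
Retrohunts run asynchronously, so you may want to poll until the operation completes. A simple sketch (the 30-second interval is an arbitrary choice):

```python
import time

while True:
    retrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)
    if retrohunt_status.get("metadata", {}).get("done", False):
        print("Retrohunt complete")
        break
    time.sleep(30)  # wait before polling again
```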

### Detections and Errors

Monitor rule detections and execution errors:

```python
# List detections for a rule
detections = chronicle.list_detections(rule_id)
for detection in detections.get("detections", []):
    detection_id = detection.get("id", "")
    event_time = detection.get("eventTime", "")
    alerting = detection.get("alertState", "") == "ALERTING"
    print(f"Detection: {detection_id}, Time: {event_time}, Alerting: {alerting}")

# List execution errors for a rule
errors = chronicle.list_errors(rule_id)
for error in errors.get("ruleExecutionErrors", []):
    error_message = error.get("error_message", "")
    create_time = error.get("create_time", "")
    print(f"Error: {error_message}, Time: {create_time}")
```

### Rule Alerts

Search for alerts generated by rules:

```python
# Set time range for alert search
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=7)  # Search past 7 days

# Search for rule alerts
alerts_response = chronicle.search_rule_alerts(
    start_time=start_time,
    end_time=end_time,
    page_size=10
)

# The API returns a nested structure where alerts are grouped by rule
# Extract and process all alerts from this structure
all_alerts = []
too_many_alerts = alerts_response.get('tooManyAlerts', False)

# Process the nested response structure - alerts are grouped by rule
for rule_alert in alerts_response.get('ruleAlerts', []):
    # Extract rule metadata
    rule_metadata = rule_alert.get('ruleMetadata', {})
    rule_id = rule_metadata.get('properties', {}).get('ruleId', 'Unknown')
    rule_name = rule_metadata.get('properties', {}).get('name', 'Unknown')
    
    # Get alerts for this rule
    rule_alerts = rule_alert.get('alerts', [])
    
    # Process each alert
    for alert in rule_alerts:
        # Extract important fields
        alert_id = alert.get("id", "")
        detection_time = alert.get("detectionTimestamp", "")
        commit_time = alert.get("commitTimestamp", "")
        alerting_type = alert.get("alertingType", "")
        
        print(f"Alert ID: {alert_id}")
        print(f"Rule ID: {rule_id}")
        print(f"Rule Name: {rule_name}")
        print(f"Detection Time: {detection_time}")
        
        # Extract events from the alert
        if 'resultEvents' in alert:
            for var_name, event_data in alert.get('resultEvents', {}).items():
                if 'eventSamples' in event_data:
                    for sample in event_data.get('eventSamples', []):
                        if 'event' in sample:
                            event = sample['event']
                            # Process event data
                            event_type = event.get('metadata', {}).get('eventType', 'Unknown')
                            print(f"Event Type: {event_type}")
```

If `tooManyAlerts` is True in the response, consider narrowing your search criteria using a smaller time window or more specific filters.
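
A simple sketch of that retry pattern, shrinking the window from 7 days to 1 day (the narrower window is an arbitrary choice):

```python
if alerts_response.get('tooManyAlerts', False):
    # Retry with a narrower time window
    alerts_response = chronicle.search_rule_alerts(
        start_time=end_time - timedelta(days=1),
        end_time=end_time,
        page_size=10
    )
```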

### Rule Sets

Manage curated rule sets:

```python
# Define deployments for rule sets
deployments = [
    {
        "category_id": "category-uuid",
        "rule_set_id": "ruleset-uuid",
        "precision": "broad",
        "enabled": True,
        "alerting": False
    }
]

# Update rule set deployments
chronicle.batch_update_curated_rule_set_deployments(deployments)
```

### Rule Validation

Validate a YARA-L2 rule before creating or updating it:

```python
# Example rule
rule_text = """
rule test_rule {
    meta:
        description = "Test rule for validation"
        author = "Test Author"
        severity = "Low"
        yara_version = "YL2.0"
        rule_version = "1.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
    condition:
        $e
}
"""

# Validate the rule
result = chronicle.validate_rule(rule_text)

if result.success:
    print("Rule is valid")
else:
    print(f"Rule is invalid: {result.message}")
    if result.position:
        print(f"Error at line {result.position['startLine']}, column {result.position['startColumn']}")
```

## Data Tables and Reference Lists

Chronicle provides two ways to manage and reference structured data in detection rules: Data Tables and Reference Lists. These can be used to maintain lists of trusted/suspicious entities, mappings of contextual information, or any other structured data useful for detection.

### Data Tables

Data Tables are collections of structured data with defined columns and data types. They can be referenced in detection rules to enhance your detections with additional context.

#### Creating Data Tables

```python
from secops.chronicle.data_table import DataTableColumnType

# Create a data table with different column types
data_table = chronicle.create_data_table(
    name="suspicious_ips",
    description="Known suspicious IP addresses with context",
    header={
        "ip_address": DataTableColumnType.CIDR,
        "severity": DataTableColumnType.STRING,
        "description": DataTableColumnType.STRING
    },
    # Optional: Add initial rows
    rows=[
        ["192.168.1.100", "High", "Scanning activity"],
        ["10.0.0.5", "Medium", "Suspicious login attempts"]
    ]
)

print(f"Created table: {data_table['name']}")
```

#### Managing Data Tables

```python
# List all data tables
tables = chronicle.list_data_tables()
for table in tables:
    table_id = table["name"].split("/")[-1]
    print(f"Table: {table_id}, Created: {table.get('createTime')}")

# Get a specific data table's details
table_details = chronicle.get_data_table("suspicious_ips")
print(f"Column count: {len(table_details.get('columnInfo', []))}")

# Add rows to a data table
chronicle.create_data_table_rows(
    "suspicious_ips",
    [
        ["172.16.0.1", "Low", "Unusual outbound connection"],
        ["192.168.2.200", "Critical", "Data exfiltration attempt"]
    ]
)

# List rows in a data table
rows = chronicle.list_data_table_rows("suspicious_ips")
for row in rows:
    row_id = row["name"].split("/")[-1]
    values = row.get("values", [])
    print(f"Row {row_id}: {values}")

# Delete specific rows by ID
row_ids = [rows[0]["name"].split("/")[-1], rows[1]["name"].split("/")[-1]]
chronicle.delete_data_table_rows("suspicious_ips", row_ids)

# Delete a data table
chronicle.delete_data_table("suspicious_ips", force=True)  # force=True deletes even if it has rows
```

### Reference Lists

Reference Lists are simple lists of values (strings, CIDR blocks, or regex patterns) that can be referenced in detection rules. They are useful for maintaining whitelists, blacklists, or any other categorized sets of values.

#### Creating Reference Lists

```python
from secops.chronicle.reference_list import ReferenceListSyntaxType, ReferenceListView

# Create a reference list with string entries
string_list = chronicle.create_reference_list(
    name="admin_accounts",
    description="Administrative user accounts",
    entries=["admin", "administrator", "root", "system"],
    syntax_type=ReferenceListSyntaxType.STRING
)

print(f"Created reference list: {string_list['name']}")

# Create a reference list with CIDR entries
cidr_list = chronicle.create_reference_list(
    name="trusted_networks",
    description="Internal network ranges",
    entries=["10.0.0.0/8", "192.168.0.0/16", "172.16.0.0/12"],
    syntax_type=ReferenceListSyntaxType.CIDR
)

# Create a reference list with regex patterns
regex_list = chronicle.create_reference_list(
    name="email_patterns",
    description="Email patterns to watch for",
    entries=[".*@suspicious\\.com", "malicious_.*@.*\\.org"],
    syntax_type=ReferenceListSyntaxType.REGEX
)
```

#### Managing Reference Lists

```python
# List all reference lists (basic view without entries)
lists = chronicle.list_reference_lists(view=ReferenceListView.BASIC)
for ref_list in lists:
    list_id = ref_list["name"].split("/")[-1]
    print(f"List: {list_id}, Description: {ref_list.get('description')}")

# Get a specific reference list including all entries
admin_list = chronicle.get_reference_list("admin_accounts", view=ReferenceListView.FULL)
entries = [entry.get("value") for entry in admin_list.get("entries", [])]
print(f"Admin accounts: {entries}")

# Update reference list entries
chronicle.update_reference_list(
    name="admin_accounts",
    entries=["admin", "administrator", "root", "system", "superuser"]
)

# Update reference list description
chronicle.update_reference_list(
    name="admin_accounts",
    description="Updated administrative user accounts list"
)

```

### Using in YARA-L Rules

Both Data Tables and Reference Lists can be referenced in YARA-L detection rules.

#### Using Data Tables in Rules

```
rule detect_with_data_table {
    meta:
        description = "Detect connections to suspicious IPs"
        author = "SecOps SDK Example"
        severity = "Medium"
        yara_version = "YL2.0"
    events:
        $e.metadata.event_type = "NETWORK_CONNECTION"
        $e.target.ip != ""
        $lookup in data_table.suspicious_ips
        $lookup.ip_address = $e.target.ip
        $severity = $lookup.severity
        
    condition:
        $e and $lookup and $severity = "High"
}
```

#### Using Reference Lists in Rules

```
rule detect_with_reference_list {
    meta:
        description = "Detect admin account usage from untrusted networks"
        author = "SecOps SDK Example" 
        severity = "High"
        yara_version = "YL2.0"
    events:
        $login.metadata.event_type = "USER_LOGIN"
        $login.principal.user.userid in reference_list.admin_accounts
        not $login.principal.ip in reference_list.trusted_networks
        
    condition:
        $login
}
```

## Gemini AI

You can use Chronicle's Gemini AI to get security insights, generate detection rules, explain security concepts, and more:

> **Note:** Only enterprise tier users have access to Advanced Gemini features. Users must opt-in to use Gemini in Chronicle before accessing this functionality. 
The SDK will automatically attempt to opt you in when you first use the Gemini functionality. If the automatic opt-in fails due to permission issues, 
you'll see an error message that includes "users must opt-in before using Gemini."

```python
# Query Gemini with a security question
response = chronicle.gemini("What is Windows event ID 4625?")

# Get text content (combines TEXT blocks and stripped HTML content)
text_explanation = response.get_text_content()
print("Explanation:", text_explanation)

# Work with different content blocks
for block in response.blocks:
    print(f"Block type: {block.block_type}")
    if block.block_type == "TEXT":
        print("Text content:", block.content)
    elif block.block_type == "CODE":
        print(f"Code ({block.title}):", block.content)
    elif block.block_type == "HTML":
        print("HTML content (with tags):", block.content)

# Get all code blocks
code_blocks = response.get_code_blocks()
for code_block in code_blocks:
    print(f"Code block ({code_block.title}):", code_block.content)

# Get all HTML blocks (with HTML tags preserved)
html_blocks = response.get_html_blocks()
for html_block in html_blocks:
    print(f"HTML block (with tags):", html_block.content)

# Check for references
if response.references:
    print(f"Found {len(response.references)} references")

# Check for suggested actions
for action in response.suggested_actions:
    print(f"Suggested action: {action.display_text} ({action.action_type})")
    if action.navigation:
        print(f"Action URI: {action.navigation.target_uri}")
```

### Response Content Methods

The `GeminiResponse` class provides several methods to work with response content:

- `get_text_content()`: Returns a combined string of all TEXT blocks plus the text content from HTML blocks with HTML tags removed
- `get_code_blocks()`: Returns a list of blocks with `block_type == "CODE"`
- `get_html_blocks()`: Returns a list of blocks with `block_type == "HTML"` (HTML tags preserved)
- `get_raw_response()`: Returns the complete, unprocessed API response as a dictionary

These methods help you work with different types of content in a structured way.

### Accessing Raw API Response

For advanced use cases or debugging, you can access the raw API response:

```python
# Get the complete raw API response
response = chronicle.gemini("What is Windows event ID 4625?")
raw_response = response.get_raw_response()

# Now you can access any part of the original JSON structure
print(json.dumps(raw_response, indent=2))

# Example of navigating the raw response structure
if "responses" in raw_response:
    for resp in raw_response["responses"]:
        if "blocks" in resp:
            print(f"Found {len(resp['blocks'])} blocks in raw response")
```

This gives you direct access to the original API response format, which can be useful for accessing advanced features or troubleshooting.

### Manual Opt-In

If your account has sufficient permissions, you can manually opt-in to Gemini before using it:

```python
# Manually opt-in to Gemini
opt_success = chronicle.opt_in_to_gemini()
if opt_success:
    print("Successfully opted in to Gemini")
else:
    print("Unable to opt-in due to permission issues")

# Then use Gemini as normal
response = chronicle.gemini("What is Windows event ID 4625?")
```

This can be useful in environments where you want to explicitly control when the opt-in happens.

### Generate Detection Rules

Chronicle Gemini can generate YARA-L rules for detection:

```python
# Generate a rule to detect potential security issues
rule_response = chronicle.gemini("Write a rule to detect powershell downloading a file called gdp.zip")

# Extract the generated rule(s)
code_blocks = rule_response.get_code_blocks()
if code_blocks:
    rule = code_blocks[0].content
    print("Generated rule:", rule)
    
    # Check for rule editor action
    for action in rule_response.suggested_actions:
        if action.display_text == "Open in Rule Editor" and action.action_type == "NAVIGATION":
            rule_editor_url = action.navigation.target_uri
            print("Rule can be opened in editor:", rule_editor_url)
```

### Get Intel Information

Get detailed information about malware, threat actors, files, vulnerabilities:

```python
# Ask about a CVE
cve_response = chronicle.gemini("tell me about CVE-2021-44228")

# Get the explanation
cve_explanation = cve_response.get_text_content()
print("CVE explanation:", cve_explanation)
```

### Maintain Conversation Context

You can maintain conversation context by reusing the same conversation ID:

```python
# Start a conversation
initial_response = chronicle.gemini("What is a DDoS attack?")

# Get the conversation ID from the response
conversation_id = initial_response.name.split('/')[-3]  # Extract from format: .../conversations/{id}/messages/{id}

# Ask a follow-up question in the same conversation context
followup_response = chronicle.gemini(
    "What are the most common mitigation techniques?",
    conversation_id=conversation_id
)

# Gemini will remember the context of the previous question about DDoS
```

### Feed Management

Feeds are used to ingest data into Chronicle. The SDK provides methods to manage feeds.

```python
import json

# List existing feeds
feeds = chronicle.list_feeds()
print(f"Found {len(feeds)} feeds")

# Create a new feed
feed_details = {
    "logType": f"projects/your-project-id/locations/us/instances/your-chronicle-instance-id/logTypes/WINEVTLOG",
    "feedSourceType": "HTTP",
    "httpSettings": {
        "uri": "https://example.com/example_feed",
        "sourceType": "FILES",
    },
    "labels": {"environment": "production", "created_by": "secops_sdk"}
}

created_feed = chronicle.create_feed(
    display_name="My New Feed",
    details=feed_details
)

# Get feed ID from name
feed_id = created_feed["name"].split("/")[-1]
print(f"Feed created with ID: {feed_id}")

# Get feed details
feed_details = chronicle.get_feed(feed_id)
print(f"Feed state: {feed_details.get('state')}")

# Update feed
updated_details = {
    "httpSettings": {
        "uri": "https://example.com/updated_feed_url",
        "sourceType": "FILES"
    },
    "labels": {"environment": "production", "updated": "true"}
}

updated_feed = chronicle.update_feed(
    feed_id=feed_id,
    display_name="Updated Feed Name",
    details=updated_details
)

# Disable feed
disabled_feed = chronicle.disable_feed(feed_id)
print(f"Feed disabled. State: {disabled_feed.get('state')}")

# Enable feed
enabled_feed = chronicle.enable_feed(feed_id)
print(f"Feed enabled. State: {enabled_feed.get('state')}")

# Generate secret for feed (for supported feed types)
try:
    secret_result = chronicle.generate_secret(feed_id)
    print(f"Generated secret: {secret_result.get('secret')}")
except Exception as e:
    print(f"Error generating secret for feed: {e}")

# Delete feed
chronicle.delete_feed(feed_id)
print("Feed deleted successfully")
```

The Feed API supports a variety of feed types, such as HTTP, HTTPS Push, and S3 bucket data sources. Each feed type has specific configuration options that can be specified in the `details` dictionary.

> **Note**: Secret generation is only available for certain feed types that require authentication.

## Error Handling

The SDK defines several custom exceptions:

```python
from secops.exceptions import SecOpsError, AuthenticationError, APIError

try:
    results = chronicle.search_udm(...)
except AuthenticationError as e:
    print(f"Authentication failed: {e}")
except APIError as e:
    print(f"API request failed: {e}")
except SecOpsError as e:
    print(f"General error: {e}")
```

## Value Type Detection

The SDK automatically detects the most common entity types when using the `summarize_entity` function:
- IP addresses (IPv4 and IPv6)
- MD5/SHA1/SHA256 hashes
- Domain names
- Email addresses
- MAC addresses
- Hostnames

This detection happens internally within `summarize_entity`, simplifying its usage. You only need to provide the `value` argument.

```python
# The SDK automatically determines how to query for these values
ip_summary = chronicle.summarize_entity(value="192.168.1.100", ...)
domain_summary = chronicle.summarize_entity(value="example.com", ...)
hash_summary = chronicle.summarize_entity(value="e17dd4eef8b4978673791ef4672f4f6a", ...)
```

You can optionally provide a `preferred_entity_type` hint to `summarize_entity` if the automatic detection might be ambiguous (e.g., a string could be a username or a hostname).

## License

This project is licensed under the Apache License 2.0 - [see the LICENSE file for details.](https://github.com/google/secops-wrapper/blob/main/LICENSE)

            

View available log types:\n```python\n# Get all available log types\nlog_types = chronicle.get_all_log_types()\nfor lt in log_types[:5]:  # Show first 5\n    print(f\"{lt.id}: {lt.description}\")\n```\n\n2. Search for specific log types:\n```python\n# Search for log types related to firewalls\nfirewall_types = chronicle.search_log_types(\"firewall\")\nfor lt in firewall_types:\n    print(f\"{lt.id}: {lt.description}\")\n```\n\n3. Validate log types:\n```python\n# Check if a log type is valid\nif chronicle.is_valid_log_type(\"OKTA\"):\n    print(\"Valid log type\")\nelse:\n    print(\"Invalid log type\")\n```\n\n4. Use custom forwarders:\n```python\n# Create or get a custom forwarder\nforwarder = chronicle.get_or_create_forwarder(display_name=\"MyCustomForwarder\")\nforwarder_id = forwarder[\"name\"].split(\"/\")[-1]\n\n# Use the custom forwarder for log ingestion\nresult = chronicle.ingest_log(\n    log_type=\"WINDOWS\",\n    log_message=json.dumps(windows_log),\n    forwarder_id=forwarder_id\n)\n```\n\n5. Use custom timestamps:\n```python\nfrom datetime import datetime, timedelta, timezone\n\n# Define custom timestamps\nlog_entry_time = datetime.now(timezone.utc) - timedelta(hours=1)\ncollection_time = datetime.now(timezone.utc)\n\nresult = chronicle.ingest_log(\n    log_type=\"OKTA\",\n    log_message=json.dumps(okta_log),\n    log_entry_time=log_entry_time,  # When the log was generated\n    collection_time=collection_time  # When the log was collected\n)\n```\n\nIngest UDM events directly into Chronicle:\n\n```python\nimport uuid\nfrom datetime import datetime, timezone\n\n# Generate a unique ID\nevent_id = str(uuid.uuid4())\n\n# Get current time in ISO 8601 format\ncurrent_time = datetime.now(timezone.utc).isoformat().replace(\"+00:00\", \"Z\")\n\n# Create a UDM event for a network connection\nnetwork_event = {\n    \"metadata\": {\n        \"id\": event_id,\n        \"event_timestamp\": current_time,\n        \"event_type\": \"NETWORK_CONNECTION\",\n        \"product_name\": \"My Security Product\", \n        \"vendor_name\": \"My Company\"\n    },\n    \"principal\": {\n        \"hostname\": \"workstation-1\",\n        \"ip\": \"192.168.1.100\",\n        \"port\": 12345\n    },\n    \"target\": {\n        \"ip\": \"203.0.113.10\",\n        \"port\": 443\n    },\n    \"network\": {\n        \"application_protocol\": \"HTTPS\",\n        \"direction\": \"OUTBOUND\"\n    }\n}\n\n# Ingest a single UDM event\nresult = chronicle.ingest_udm(udm_events=network_event)\nprint(f\"Ingested event with ID: {event_id}\")\n\n# Create a second event\nprocess_event = {\n    \"metadata\": {\n        # No ID - one will be auto-generated\n        \"event_timestamp\": current_time,\n        \"event_type\": \"PROCESS_LAUNCH\",\n        \"product_name\": \"My Security Product\", \n        \"vendor_name\": \"My Company\"\n    },\n    \"principal\": {\n        \"hostname\": \"workstation-1\",\n        \"process\": {\n            \"command_line\": \"ping 8.8.8.8\",\n            \"pid\": 1234\n        },\n        \"user\": {\n            \"userid\": \"user123\"\n        }\n    }\n}\n\n# Ingest multiple UDM events in a single call\nresult = chronicle.ingest_udm(udm_events=[network_event, process_event])\nprint(\"Multiple events ingested successfully\")\n```\n\n### Data Export\n\n> **Note**: The Data Export API features are currently under test and review. 
We welcome your feedback and encourage you to submit any issues or unexpected behavior to the issue tracker so we can improve this functionality.\n\nYou can export Chronicle logs to Google Cloud Storage using the Data Export API:\n\n```python\nfrom datetime import datetime, timedelta, timezone\n\n# Set time range for export\nend_time = datetime.now(timezone.utc)\nstart_time = end_time - timedelta(days=1)  # Last 24 hours\n\n# Get available log types for export\navailable_log_types = chronicle.fetch_available_log_types(\n    start_time=start_time,\n    end_time=end_time\n)\n\n# Print available log types\nfor log_type in available_log_types[\"available_log_types\"]:\n    print(f\"{log_type.display_name} ({log_type.log_type.split('/')[-1]})\")\n    print(f\"  Available from {log_type.start_time} to {log_type.end_time}\")\n\n# Create a data export for a specific log type\nexport = chronicle.create_data_export(\n    gcs_bucket=\"projects/my-project/buckets/my-export-bucket\",\n    start_time=start_time,\n    end_time=end_time,\n    log_type=\"GCP_DNS\"  # Specify log type to export\n)\n\n# Get the export ID\nexport_id = export[\"name\"].split(\"/\")[-1]\nprint(f\"Created export with ID: {export_id}\")\nprint(f\"Status: {export['data_export_status']['stage']}\")\n\n# Check export status\nstatus = chronicle.get_data_export(export_id)\nprint(f\"Export status: {status['data_export_status']['stage']}\")\nprint(f\"Progress: {status['data_export_status'].get('progress_percentage', 0)}%\")\n\n# Cancel an export if needed\nif status['data_export_status']['stage'] in ['IN_QUEUE', 'PROCESSING']:\n    cancelled = chronicle.cancel_data_export(export_id)\n    print(f\"Export has been cancelled. New status: {cancelled['data_export_status']['stage']}\")\n\n# Export all log types at once\nexport_all = chronicle.create_data_export(\n    gcs_bucket=\"projects/my-project/buckets/my-export-bucket\",\n    start_time=start_time,\n    end_time=end_time,\n    export_all_logs=True\n)\n\nprint(f\"Created export for all logs. 
Status: {export_all['data_export_status']['stage']}\")\n```\n\nThe Data Export API supports:\n- Exporting one or all log types to Google Cloud Storage\n- Checking export status and progress\n- Cancelling exports in progress\n- Fetching available log types for a specific time range\n\nIf you encounter any issues with the Data Export functionality, please submit them to our issue tracker with detailed information about the problem and steps to reproduce.\n\n### Basic UDM Search\n\nSearch for network connection events:\n\n```python\nfrom datetime import datetime, timedelta, timezone\n\n# Set time range for queries\nend_time = datetime.now(timezone.utc)\nstart_time = end_time - timedelta(hours=24)  # Last 24 hours\n\n# Perform UDM search\nresults = chronicle.search_udm(\n    query=\"\"\"\n    metadata.event_type = \"NETWORK_CONNECTION\"\n    ip != \"\"\n    \"\"\",\n    start_time=start_time,\n    end_time=end_time,\n    max_events=5\n)\n\n# Example response:\n{\n    \"events\": [\n        {\n            \"name\": \"projects/my-project/locations/us/instances/my-instance/events/encoded-event-id\",\n            \"udm\": {\n                \"metadata\": {\n                    \"eventTimestamp\": \"2024-02-09T10:30:00Z\",\n                    \"eventType\": \"NETWORK_CONNECTION\"\n                },\n                \"target\": {\n                    \"ip\": [\"192.168.1.100\"],\n                    \"port\": 443\n                },\n                \"principal\": {\n                    \"hostname\": \"workstation-1\"\n                }\n            }\n        }\n    ],\n    \"total_events\": 1,\n    \"more_data_available\": false\n}\n```\n\n### Statistics Queries\n\nGet statistics about network connections grouped by hostname:\n\n```python\nstats = chronicle.get_stats(\n    query=\"\"\"metadata.event_type = \"NETWORK_CONNECTION\"\nmatch:\n    target.hostname\noutcome:\n    $count = count(metadata.id)\norder:\n    $count desc\"\"\",\n    start_time=start_time,\n    end_time=end_time,\n    max_events=1000,\n    max_values=10,\n    timeout=180\n)\n\n# Example response:\n{\n    \"columns\": [\"hostname\", \"count\"],\n    \"rows\": [\n        {\"hostname\": \"server-1\", \"count\": 1500},\n        {\"hostname\": \"server-2\", \"count\": 1200}\n    ],\n    \"total_rows\": 2\n}\n```\n\n### CSV Export\n\nExport specific fields to CSV format:\n\n```python\ncsv_data = chronicle.fetch_udm_search_csv(\n    query='metadata.event_type = \"NETWORK_CONNECTION\"',\n    start_time=start_time,\n    end_time=end_time,\n    fields=[\"timestamp\", \"user\", \"hostname\", \"process name\"]\n)\n\n# Example response:\n\"\"\"\nmetadata.eventTimestamp,principal.hostname,target.ip,target.port\n2024-02-09T10:30:00Z,workstation-1,192.168.1.100,443\n2024-02-09T10:31:00Z,workstation-2,192.168.1.101,80\n\"\"\"\n```\n\n### Query Validation\n\nValidate a UDM query before execution:\n\n```python\nquery = 'target.ip != \"\" and principal.hostname = \"test-host\"'\nvalidation = chronicle.validate_query(query)\n\n# Example response:\n{\n    \"isValid\": true,\n    \"queryType\": \"QUERY_TYPE_UDM_QUERY\",\n    \"suggestedFields\": [\n        \"target.ip\",\n        \"principal.hostname\"\n    ]\n}\n```\n\n### Natural Language Search\n\nSearch for events using natural language instead of UDM query syntax:\n\n```python\nfrom datetime import datetime, timedelta, timezone\n\n# Set time range for queries\nend_time = datetime.now(timezone.utc)\nstart_time = end_time - timedelta(hours=24)  # Last 24 hours\n\n# Option 1: Translate natural 
language to UDM query\nudm_query = chronicle.translate_nl_to_udm(\"show me network connections\")\nprint(f\"Translated query: {udm_query}\")\n# Example output: 'metadata.event_type=\"NETWORK_CONNECTION\"'\n\n# Then run the query manually if needed\nresults = chronicle.search_udm(\n    query=udm_query,\n    start_time=start_time,\n    end_time=end_time\n)\n\n# Option 2: Perform complete search with natural language\nresults = chronicle.nl_search(\n    text=\"show me failed login attempts\",\n    start_time=start_time,\n    end_time=end_time,\n    max_events=100\n)\n\n# Example response (same format as search_udm):\n{\n    \"events\": [\n        {\n            \"event\": {\n                \"metadata\": {\n                    \"eventTimestamp\": \"2024-02-09T10:30:00Z\",\n                    \"eventType\": \"USER_LOGIN\"\n                },\n                \"principal\": {\n                    \"user\": {\n                        \"userid\": \"jdoe\"\n                    }\n                },\n                \"securityResult\": {\n                    \"action\": \"BLOCK\",\n                    \"summary\": \"Failed login attempt\"\n                }\n            }\n        }\n    ],\n    \"total_events\": 1\n}\n```\n\nThe natural language search feature supports various query patterns:\n- \"Show me network connections\"\n- \"Find suspicious processes\"\n- \"Show login failures in the last hour\"\n- \"Display connections to IP address 192.168.1.100\"\n\nIf the natural language cannot be translated to a valid UDM query, an `APIError` will be raised with a message indicating that no valid query could be generated.\n\n### Entity Summary\n\nGet detailed information about specific entities like IP addresses, domains, or file hashes. The function automatically detects the entity type based on the provided value and fetches a comprehensive summary including related entities, alerts, timeline, prevalence, and more.\n\n```python\n# IP address summary\nip_summary = chronicle.summarize_entity(\n    value=\"8.8.8.8\",\n    start_time=start_time,\n    end_time=end_time\n)\n\n# Domain summary\ndomain_summary = chronicle.summarize_entity(\n    value=\"google.com\",\n    start_time=start_time,\n    end_time=end_time\n)\n\n# File hash summary (SHA256)\nfile_hash = \"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\" \nfile_summary = chronicle.summarize_entity(\n    value=file_hash,\n    start_time=start_time,\n    end_time=end_time\n)\n\n# Optionally hint the preferred type if auto-detection might be ambiguous\nuser_summary = chronicle.summarize_entity(\n    value=\"jdoe\",\n    start_time=start_time,\n    end_time=end_time,\n    preferred_entity_type=\"USER\"\n)\n\n\n# Example response structure (EntitySummary object):\n# Access attributes like: ip_summary.primary_entity, ip_summary.related_entities,\n# ip_summary.alert_counts, ip_summary.timeline, ip_summary.prevalence, etc.\n\n# Example fields within the EntitySummary object:\n# primary_entity: {\n#     \"name\": \"entities/...\",\n#     \"metadata\": {\n#         \"entityType\": \"ASSET\",  # Or FILE, DOMAIN_NAME, USER, etc.\n#         \"interval\": { \"startTime\": \"...\", \"endTime\": \"...\" }\n#     },\n#     \"metric\": { \"firstSeen\": \"...\", \"lastSeen\": \"...\" },\n#     \"entity\": {  # Contains specific details like 'asset', 'file', 'domain'\n#         \"asset\": { \"ip\": [\"8.8.8.8\"] }\n#     }\n# }\n# related_entities: [ { ... similar to primary_entity ... 
} ]\n# alert_counts: [ { \"rule\": \"Rule Name\", \"count\": 5 } ]\n# timeline: { \"buckets\": [ { \"alertCount\": 1, \"eventCount\": 10 } ], \"bucketSize\": \"3600s\" }\n# prevalence: [ { \"prevalenceTime\": \"...\", \"count\": 100 } ]\n# file_metadata_and_properties: {  # Only for FILE entities\n#     \"metadata\": [ { \"key\": \"...\", \"value\": \"...\" } ],\n#     \"properties\": [ { \"title\": \"...\", \"properties\": [ { \"key\": \"...\", \"value\": \"...\" } ] } ]\n# }\n```\n\n### List IoCs (Indicators of Compromise)\n\nRetrieve IoC matches against ingested events:\n\n```python\niocs = chronicle.list_iocs(\n    start_time=start_time,\n    end_time=end_time,\n    max_matches=1000,\n    add_mandiant_attributes=True,\n    prioritized_only=False\n)\n\n# Process the results\nfor ioc in iocs['matches']:\n    ioc_type = next(iter(ioc['artifactIndicator'].keys()))\n    ioc_value = next(iter(ioc['artifactIndicator'].values()))\n    print(f\"IoC Type: {ioc_type}, Value: {ioc_value}\")\n    print(f\"Sources: {', '.join(ioc['sources'])}\")\n```\n\nThe IoC response includes:\n- The indicator itself (domain, IP, hash, etc.)\n- Sources and categories\n- Affected assets in your environment\n- First and last seen timestamps\n- Confidence scores and severity ratings\n- Associated threat actors and malware families (with Mandiant attributes)\n\n### Alerts and Case Management\n\nRetrieve alerts and their associated cases:\n\n```python\n# Get non-closed alerts\nalerts = chronicle.get_alerts(\n    start_time=start_time,\n    end_time=end_time,\n    snapshot_query='feedback_summary.status != \"CLOSED\"',\n    max_alerts=100\n)\n\n# Get alerts from the response\nalert_list = alerts.get('alerts', {}).get('alerts', [])\n\n# Extract case IDs from alerts\ncase_ids = {alert.get('caseName') for alert in alert_list if alert.get('caseName')}\n\n# Get case details using the batch API\nif case_ids:\n    cases = chronicle.get_cases(list(case_ids))\n    \n    # Process cases\n    for case in cases.cases:\n        print(f\"Case: {case.display_name}\")\n        print(f\"Priority: {case.priority}\")\n        print(f\"Status: {case.status}\")\n        print(f\"Stage: {case.stage}\")\n        \n        # Access SOAR platform information if available\n        if case.soar_platform_info:\n            print(f\"SOAR Case ID: {case.soar_platform_info.case_id}\")\n            print(f\"SOAR Platform: {case.soar_platform_info.platform_type}\")\n```\n\nThe alerts response includes:\n- Progress status and completion status\n- Alert counts (baseline and filtered)\n- Alert details (rule information, detection details, etc.)\n- Case associations\n\nYou can filter alerts using the snapshot query parameter with fields like:\n- `detection.rule_name`\n- `detection.alert_state`\n- `feedback_summary.verdict`\n- `feedback_summary.priority`\n- `feedback_summary.status`\n\n### Case Management Helpers\n\nThe `CaseList` class provides helper methods for working with cases:\n\n```python\n# Get details for specific cases (uses the batch API)\ncases = chronicle.get_cases([\"case-id-1\", \"case-id-2\"])\n\n# Filter cases by priority\nhigh_priority = cases.filter_by_priority(\"PRIORITY_HIGH\")\n\n# Filter cases by status\nopen_cases = cases.filter_by_status(\"STATUS_OPEN\")\n\n# Look up a specific case\ncase = cases.get_case(\"case-id-1\")\n```\n\n> **Note**: The case management API uses the `legacy:legacyBatchGetCases` endpoint to retrieve multiple cases in a single request. 
You can retrieve up to 1000 cases in a single batch.\n\n## Parser Management\n\nChronicle parsers are used to process and normalize raw log data into Chronicle's Unified Data Model (UDM) format. Parsers transform various log formats (JSON, XML, CEF, etc.) into a standardized structure that enables consistent querying and analysis across different data sources.\n\nThe SDK provides comprehensive support for managing Chronicle parsers:\n\n### Creating Parsers\n\nCreate new parser:\n\n```python\nparser_text = \"\"\"\nfilter {\n    mutate {\n      replace => {\n        \"event1.idm.read_only_udm.metadata.event_type\" => \"GENERIC_EVENT\"\n        \"event1.idm.read_only_udm.metadata.vendor_name\" =>  \"ACME Labs\"\n      }\n    }\n    grok {\n      match => {\n        \"message\" => [\"^(?P<_firstWord>[^\\s]+)\\s.*$\"]\n      }\n      on_error => \"_grok_message_failed\"\n    }\n    if ![_grok_message_failed] {\n      mutate {\n        replace => {\n          \"event1.idm.read_only_udm.metadata.description\" => \"%{_firstWord}\"\n        }\n      }\n    }\n    mutate {\n      merge => {\n        \"@output\" => \"event1\"\n      }\n    }\n}\n\"\"\"\n\nlog_type = \"WINDOWS_AD\"\n\n# Create the parser\nparser = chronicle.create_parser(\n    log_type=log_type, \n    parser_code=parser_text,\n    validated_on_empty_logs=True  # Whether to validate parser on empty logs\n)\nparser_id = parser.get(\"name\", \"\").split(\"/\")[-1]\nprint(f\"Parser ID: {parser_id}\")\n```\n\n### Managing Parsers\n\nRetrieve, list, copy, activate/deactivate, and delete parsers:\n\n```python\n# List all parsers\nparsers = chronicle.list_parsers()\nfor parser in parsers:\n    parser_id = parser.get(\"name\", \"\").split(\"/\")[-1]\n    state = parser.get(\"state\")\n    print(f\"Parser ID: {parser_id}, State: {state}\")\n\nlog_type = \"WINDOWS_AD\"\n    \n# Get specific parser\nparser = chronicle.get_parser(log_type=log_type, id=parser_id)\nprint(f\"Parser content: {parser.get('text')}\")\n\n# Activate/Deactivate parser\nchronicle.activate_parser(log_type=log_type, id=parser_id)\nchronicle.deactivate_parser(log_type=log_type, id=parser_id)\n\n# Copy an existing parser as a starting point\ncopied_parser = chronicle.copy_parser(log_type=log_type, id=\"pa_existing_parser\")\n\n# Delete parser\nchronicle.delete_parser(log_type=log_type, id=parser_id)\n\n# Force delete an active parser\nchronicle.delete_parser(log_type=log_type, id=parser_id, force=True)\n\n# Activate a release candidate parser\nchronicle.activate_release_candidate_parser(log_type=log_type, id=\"pa_release_candidate\")\n```\n\n> **Note:** Parsers work in conjunction with log ingestion. When you ingest logs using `chronicle.ingest_log()`, Chronicle automatically applies the appropriate parser based on the log type to transform your raw logs into UDM format. 
If you're working with custom log formats, you may need to create or configure custom parsers first.\n\n### Run Parser against sample logs\n\nRun the parser on one or more sample logs:\n\n```python\n# Sample parser code that extracts fields from logs\nparser_text = \"\"\"\nfilter {\n    mutate {\n      replace => {\n        \"event1.idm.read_only_udm.metadata.event_type\" => \"GENERIC_EVENT\"\n        \"event1.idm.read_only_udm.metadata.vendor_name\" =>  \"ACME Labs\"\n      }\n    }\n    grok {\n      match => {\n        \"message\" => [\"^(?P<_firstWord>[^\\s]+)\\s.*$\"]\n      }\n      on_error => \"_grok_message_failed\"\n    }\n    if ![_grok_message_failed] {\n      mutate {\n        replace => {\n          \"event1.idm.read_only_udm.metadata.description\" => \"%{_firstWord}\"\n        }\n      }\n    }\n    mutate {\n      merge => {\n        \"@output\" => \"event1\"\n      }\n    }\n}\n\"\"\"\n\nlog_type = \"WINDOWS_AD\"\n\n# Sample log entries to test\nsample_logs = [\n    '{\"message\": \"ERROR: Failed authentication attempt\"}',\n    '{\"message\": \"WARNING: Suspicious activity detected\"}',\n    '{\"message\": \"INFO: User logged in successfully\"}'\n]\n\n# Run parser evaluation\nresult = chronicle.run_parser(\n    log_type=log_type, \n    parser_code=parser_text,\n    parser_extension_code=None,  # Optional parser extension\n    logs=sample_logs,\n    statedump_allowed=False  # Enable if using statedump filters\n)\n\n# Check the results\nif \"runParserResults\" in result:\n    for i, parser_result in enumerate(result[\"runParserResults\"]):\n        print(f\"\\nLog {i+1} parsing result:\")\n        if \"parsedEvents\" in parser_result:\n            print(f\"  Parsed events: {parser_result['parsedEvents']}\")\n        if \"errors\" in parser_result:\n            print(f\"  Errors: {parser_result['errors']}\")\n```\n\nThe `run_parser` function includes comprehensive validation:\n- Validates log type and parser code are provided\n- Ensures logs are provided as a list of strings\n- Enforces size limits (10MB per log, 50MB total, max 1000 logs)\n- Provides detailed error messages for different failure scenarios\n\n### Complete Parser Workflow Example\n\nHere's a complete example that demonstrates retrieving a parser, running it against a log, and ingesting the parsed UDM event:\n\n```python\n# Step 1: List and retrieve an OKTA parser\nparsers = chronicle.list_parsers(log_type=\"OKTA\")\nparser_id = parsers[0][\"name\"].split(\"/\")[-1]\nparser_details = chronicle.get_parser(log_type=\"OKTA\", id=parser_id)\n\n# Extract and decode parser code\nimport base64\nparser_code = base64.b64decode(parser_details[\"cbn\"]).decode('utf-8')\n\n# Step 2: Run the parser against a sample log\nokta_log = {\n    \"actor\": {\"alternateId\": \"user@example.com\", \"displayName\": \"Test User\"},\n    \"eventType\": \"user.account.lock\",\n    \"outcome\": {\"result\": \"FAILURE\", \"reason\": \"LOCKED_OUT\"},\n    \"published\": \"2025-06-19T21:51:50.116Z\"\n    # ... 
other OKTA log fields\n}\n\nresult = chronicle.run_parser(\n    log_type=\"OKTA\",\n    parser_code=parser_code,\n    parser_extension_code=None,\n    logs=[json.dumps(okta_log)]\n)\n\n# Step 3: Extract and ingest the parsed UDM event\nif result[\"runParserResults\"][0][\"parsedEvents\"]:\n    # parsedEvents is a dict with 'events' key containing the actual events list\n    parsed_events_data = result[\"runParserResults\"][0][\"parsedEvents\"]\n    if isinstance(parsed_events_data, dict) and \"events\" in parsed_events_data:\n        events = parsed_events_data[\"events\"]\n        if events and len(events) > 0:\n            # Extract the first event\n            if \"event\" in events[0]:\n                udm_event = events[0][\"event\"]\n            else:\n                udm_event = events[0]\n            \n            # Ingest the parsed UDM event back into Chronicle\n            ingest_result = chronicle.ingest_udm(udm_events=udm_event)\n            print(f\"UDM event ingested: {ingest_result}\")\n```\n\nThis workflow is useful for:\n- Testing parsers before deployment\n- Understanding how logs are transformed to UDM format\n- Re-processing logs with updated parsers\n- Debugging parsing issues\n\n## Rule Management\n\nThe SDK provides comprehensive support for managing Chronicle detection rules:\n\n### Creating Rules\n\nCreate new detection rules using YARA-L 2.0 syntax:\n\n```python\nrule_text = \"\"\"\nrule simple_network_rule {\n    meta:\n        description = \"Example rule to detect network connections\"\n        author = \"SecOps SDK Example\"\n        severity = \"Medium\"\n        priority = \"Medium\"\n        yara_version = \"YL2.0\"\n        rule_version = \"1.0\"\n    events:\n        $e.metadata.event_type = \"NETWORK_CONNECTION\"\n        $e.principal.hostname != \"\"\n    condition:\n        $e\n}\n\"\"\"\n\n# Create the rule\nrule = chronicle.create_rule(rule_text)\nrule_id = rule.get(\"name\", \"\").split(\"/\")[-1]\nprint(f\"Rule ID: {rule_id}\")\n```\n\n### Managing Rules\n\nRetrieve, list, update, enable/disable, and delete rules:\n\n```python\n# List all rules\nrules = chronicle.list_rules()\nfor rule in rules.get(\"rules\", []):\n    rule_id = rule.get(\"name\", \"\").split(\"/\")[-1]\n    enabled = rule.get(\"deployment\", {}).get(\"enabled\", False)\n    print(f\"Rule ID: {rule_id}, Enabled: {enabled}\")\n\n# List paginated rules and `REVISION_METADATA_ONLY` view\nrules = chronicle.list_rules(view=\"REVISION_METADATA_ONLY\",page_size=50)\nprint(f\"Fetched {len(rules.get(\"rules\"))} rules\")\n\n# Get specific rule\nrule = chronicle.get_rule(rule_id)\nprint(f\"Rule content: {rule.get('text')}\")\n\n# Update rule\nupdated_rule = chronicle.update_rule(rule_id, updated_rule_text)\n\n# Enable/disable rule\ndeployment = chronicle.enable_rule(rule_id, enabled=True)  # Enable\ndeployment = chronicle.enable_rule(rule_id, enabled=False) # Disable\n\n# Delete rule\nchronicle.delete_rule(rule_id)\n```\n\n### Searching Rules\n\nSearch for rules using regular expressions:\n\n```python\n# Search for rules containing specific patterns\nresults = chronicle.search_rules(\"suspicious process\")\nfor rule in results.get(\"rules\", []):\n    rule_id = rule.get(\"name\", \"\").split(\"/\")[-1]\n    print(f\"Rule ID: {rule_id}, contains: 'suspicious process'\")\n    \n# Find rules mentioning a specific MITRE technique\nmitre_rules = chronicle.search_rules(\"T1055\")\nprint(f\"Found {len(mitre_rules.get('rules', []))} rules mentioning T1055 technique\")\n```\n\n### Testing 
Rules\n\nTest rules against historical data to validate their effectiveness before deployment:\n\n```python\nfrom datetime import datetime, timedelta, timezone\n\n# Define time range for testing\nend_time = datetime.now(timezone.utc)\nstart_time = end_time - timedelta(days=7)  # Test against last 7 days\n\n# Rule to test\nrule_text = \"\"\"\nrule test_rule {\n    meta:\n        description = \"Test rule for validation\"\n        author = \"Test Author\"\n        severity = \"Low\"\n        yara_version = \"YL2.0\"\n        rule_version = \"1.0\"\n    events:\n        $e.metadata.event_type = \"NETWORK_CONNECTION\"\n    condition:\n        $e\n}\n\"\"\"\n\n# Test the rule\ntest_results = chronicle.run_rule_test(\n    rule_text=rule_text,\n    start_time=start_time,\n    end_time=end_time,\n    max_results=100\n)\n\n# Process streaming results\ndetection_count = 0\nfor result in test_results:\n    result_type = result.get(\"type\")\n    \n    if result_type == \"progress\":\n        # Progress update\n        percent_done = result.get(\"percentDone\", 0)\n        print(f\"Progress: {percent_done}%\")\n    \n    elif result_type == \"detection\":\n        # Detection result\n        detection_count += 1\n        detection = result.get(\"detection\", {})\n        print(f\"Detection {detection_count}:\")\n        \n        # Process detection details\n        if \"rule_id\" in detection:\n            print(f\"  Rule ID: {detection['rule_id']}\")\n        if \"data\" in detection:\n            print(f\"  Data: {detection['data']}\")\n            \n    elif result_type == \"error\":\n        # Error information\n        print(f\"Error: {result.get('message', 'Unknown error')}\")\n\nprint(f\"Finished testing. Found {detection_count} detection(s).\")\n```\n\n# Extract just the UDM events for programmatic processing\n```python\nudm_events = []\nfor result in chronicle.run_rule_test(rule_text, start_time, end_time, max_results=100):\n    if result.get(\"type\") == \"detection\":\n        detection = result.get(\"detection\", {})\n        result_events = detection.get(\"resultEvents\", {})\n        \n        for var_name, var_data in result_events.items():\n            event_samples = var_data.get(\"eventSamples\", [])\n            for sample in event_samples:\n                event = sample.get(\"event\")\n                if event:\n                    udm_events.append(event)\n\n# Process the UDM events\nfor event in udm_events:\n    # Process each UDM event\n    metadata = event.get(\"metadata\", {})\n    print(f\"Event type: {metadata.get('eventType')}\")\n```\n\n### Retrohunts\n\nRun rules against historical data to find past matches:\n\n```python\nfrom datetime import datetime, timedelta, timezone\n\n# Set time range for retrohunt\nend_time = datetime.now(timezone.utc)\nstart_time = end_time - timedelta(days=7)  # Search past 7 days\n\n# Create retrohunt\nretrohunt = chronicle.create_retrohunt(rule_id, start_time, end_time)\noperation_id = retrohunt.get(\"name\", \"\").split(\"/\")[-1]\n\n# Check retrohunt status\nretrohunt_status = chronicle.get_retrohunt(rule_id, operation_id)\nis_complete = retrohunt_status.get(\"metadata\", {}).get(\"done\", False)\n```\n\n### Detections and Errors\n\nMonitor rule detections and execution errors:\n\n```python\n# List detections for a rule\ndetections = chronicle.list_detections(rule_id)\nfor detection in detections.get(\"detections\", []):\n    detection_id = detection.get(\"id\", \"\")\n    event_time = detection.get(\"eventTime\", \"\")\n    alerting = 
detection.get(\"alertState\", \"\") == \"ALERTING\"\n    print(f\"Detection: {detection_id}, Time: {event_time}, Alerting: {alerting}\")\n\n# List execution errors for a rule\nerrors = chronicle.list_errors(rule_id)\nfor error in errors.get(\"ruleExecutionErrors\", []):\n    error_message = error.get(\"error_message\", \"\")\n    create_time = error.get(\"create_time\", \"\")\n    print(f\"Error: {error_message}, Time: {create_time}\")\n```\n\n### Rule Alerts\n\nSearch for alerts generated by rules:\n\n```python\n# Set time range for alert search\nend_time = datetime.now(timezone.utc)\nstart_time = end_time - timedelta(days=7)  # Search past 7 days\n\n# Search for rule alerts\nalerts_response = chronicle.search_rule_alerts(\n    start_time=start_time,\n    end_time=end_time,\n    page_size=10\n)\n\n# The API returns a nested structure where alerts are grouped by rule\n# Extract and process all alerts from this structure\nall_alerts = []\ntoo_many_alerts = alerts_response.get('tooManyAlerts', False)\n\n# Process the nested response structure - alerts are grouped by rule\nfor rule_alert in alerts_response.get('ruleAlerts', []):\n    # Extract rule metadata\n    rule_metadata = rule_alert.get('ruleMetadata', {})\n    rule_id = rule_metadata.get('properties', {}).get('ruleId', 'Unknown')\n    rule_name = rule_metadata.get('properties', {}).get('name', 'Unknown')\n    \n    # Get alerts for this rule\n    rule_alerts = rule_alert.get('alerts', [])\n    \n    # Process each alert\n    for alert in rule_alerts:\n        # Extract important fields\n        alert_id = alert.get(\"id\", \"\")\n        detection_time = alert.get(\"detectionTimestamp\", \"\")\n        commit_time = alert.get(\"commitTimestamp\", \"\")\n        alerting_type = alert.get(\"alertingType\", \"\")\n        \n        print(f\"Alert ID: {alert_id}\")\n        print(f\"Rule ID: {rule_id}\")\n        print(f\"Rule Name: {rule_name}\")\n        print(f\"Detection Time: {detection_time}\")\n        \n        # Extract events from the alert\n        if 'resultEvents' in alert:\n            for var_name, event_data in alert.get('resultEvents', {}).items():\n                if 'eventSamples' in event_data:\n                    for sample in event_data.get('eventSamples', []):\n                        if 'event' in sample:\n                            event = sample['event']\n                            # Process event data\n                            event_type = event.get('metadata', {}).get('eventType', 'Unknown')\n                            print(f\"Event Type: {event_type}\")\n```\n\nIf `tooManyAlerts` is True in the response, consider narrowing your search criteria using a smaller time window or more specific filters.\n\n### Rule Sets\n\nManage curated rule sets:\n\n```python\n# Define deployments for rule sets\ndeployments = [\n    {\n        \"category_id\": \"category-uuid\",\n        \"rule_set_id\": \"ruleset-uuid\",\n        \"precision\": \"broad\",\n        \"enabled\": True,\n        \"alerting\": False\n    }\n]\n\n# Update rule set deployments\nchronicle.batch_update_curated_rule_set_deployments(deployments)\n```\n\n### Rule Validation\n\nValidate a YARA-L2 rule before creating or updating it:\n\n```python\n# Example rule\nrule_text = \"\"\"\nrule test_rule {\n    meta:\n        description = \"Test rule for validation\"\n        author = \"Test Author\"\n        severity = \"Low\"\n        yara_version = \"YL2.0\"\n        rule_version = \"1.0\"\n    events:\n        $e.metadata.event_type = 
\"NETWORK_CONNECTION\"\n    condition:\n        $e\n}\n\"\"\"\n\n# Validate the rule\nresult = chronicle.validate_rule(rule_text)\n\nif result.success:\n    print(\"Rule is valid\")\nelse:\n    print(f\"Rule is invalid: {result.message}\")\n    if result.position:\n        print(f\"Error at line {result.position['startLine']}, column {result.position['startColumn']}\")\n```\n\n## Data Tables and Reference Lists\n\nChronicle provides two ways to manage and reference structured data in detection rules: Data Tables and Reference Lists. These can be used to maintain lists of trusted/suspicious entities, mappings of contextual information, or any other structured data useful for detection.\n\n### Data Tables\n\nData Tables are collections of structured data with defined columns and data types. They can be referenced in detection rules to enhance your detections with additional context.\n\n#### Creating Data Tables\n\n```python\nfrom secops.chronicle.data_table import DataTableColumnType\n\n# Create a data table with different column types\ndata_table = chronicle.create_data_table(\n    name=\"suspicious_ips\",\n    description=\"Known suspicious IP addresses with context\",\n    header={\n        \"ip_address\": DataTableColumnType.CIDR,\n        \"severity\": DataTableColumnType.STRING,\n        \"description\": DataTableColumnType.STRING\n    },\n    # Optional: Add initial rows\n    rows=[\n        [\"192.168.1.100\", \"High\", \"Scanning activity\"],\n        [\"10.0.0.5\", \"Medium\", \"Suspicious login attempts\"]\n    ]\n)\n\nprint(f\"Created table: {data_table['name']}\")\n```\n\n#### Managing Data Tables\n\n```python\n# List all data tables\ntables = chronicle.list_data_tables()\nfor table in tables:\n    table_id = table[\"name\"].split(\"/\")[-1]\n    print(f\"Table: {table_id}, Created: {table.get('createTime')}\")\n\n# Get a specific data table's details\ntable_details = chronicle.get_data_table(\"suspicious_ips\")\nprint(f\"Column count: {len(table_details.get('columnInfo', []))}\")\n\n# Add rows to a data table\nchronicle.create_data_table_rows(\n    \"suspicious_ips\",\n    [\n        [\"172.16.0.1\", \"Low\", \"Unusual outbound connection\"],\n        [\"192.168.2.200\", \"Critical\", \"Data exfiltration attempt\"]\n    ]\n)\n\n# List rows in a data table\nrows = chronicle.list_data_table_rows(\"suspicious_ips\")\nfor row in rows:\n    row_id = row[\"name\"].split(\"/\")[-1]\n    values = row.get(\"values\", [])\n    print(f\"Row {row_id}: {values}\")\n\n# Delete specific rows by ID\nrow_ids = [rows[0][\"name\"].split(\"/\")[-1], rows[1][\"name\"].split(\"/\")[-1]]\nchronicle.delete_data_table_rows(\"suspicious_ips\", row_ids)\n\n# Delete a data table\nchronicle.delete_data_table(\"suspicious_ips\", force=True)  # force=True deletes even if it has rows\n```\n\n### Reference Lists\n\nReference Lists are simple lists of values (strings, CIDR blocks, or regex patterns) that can be referenced in detection rules. 
They are useful for maintaining whitelists, blacklists, or any other categorized sets of values.\n\n#### Creating Reference Lists\n\n```python\nfrom secops.chronicle.reference_list import ReferenceListSyntaxType, ReferenceListView\n\n# Create a reference list with string entries\nstring_list = chronicle.create_reference_list(\n    name=\"admin_accounts\",\n    description=\"Administrative user accounts\",\n    entries=[\"admin\", \"administrator\", \"root\", \"system\"],\n    syntax_type=ReferenceListSyntaxType.STRING\n)\n\nprint(f\"Created reference list: {string_list['name']}\")\n\n# Create a reference list with CIDR entries\ncidr_list = chronicle.create_reference_list(\n    name=\"trusted_networks\",\n    description=\"Internal network ranges\",\n    entries=[\"10.0.0.0/8\", \"192.168.0.0/16\", \"172.16.0.0/12\"],\n    syntax_type=ReferenceListSyntaxType.CIDR\n)\n\n# Create a reference list with regex patterns\nregex_list = chronicle.create_reference_list(\n    name=\"email_patterns\",\n    description=\"Email patterns to watch for\",\n    entries=[\".*@suspicious\\\\.com\", \"malicious_.*@.*\\\\.org\"],\n    syntax_type=ReferenceListSyntaxType.REGEX\n)\n```\n\n#### Managing Reference Lists\n\n```python\n# List all reference lists (basic view without entries)\nlists = chronicle.list_reference_lists(view=ReferenceListView.BASIC)\nfor ref_list in lists:\n    list_id = ref_list[\"name\"].split(\"/\")[-1]\n    print(f\"List: {list_id}, Description: {ref_list.get('description')}\")\n\n# Get a specific reference list including all entries\nadmin_list = chronicle.get_reference_list(\"admin_accounts\", view=ReferenceListView.FULL)\nentries = [entry.get(\"value\") for entry in admin_list.get(\"entries\", [])]\nprint(f\"Admin accounts: {entries}\")\n\n# Update reference list entries\nchronicle.update_reference_list(\n    name=\"admin_accounts\",\n    entries=[\"admin\", \"administrator\", \"root\", \"system\", \"superuser\"]\n)\n\n# Update reference list description\nchronicle.update_reference_list(\n    name=\"admin_accounts\",\n    description=\"Updated administrative user accounts list\"\n)\n\n```\n\n### Using in YARA-L Rules\n\nBoth Data Tables and Reference Lists can be referenced in YARA-L detection rules.\n\n#### Using Data Tables in Rules\n\n```\nrule detect_with_data_table {\n    meta:\n        description = \"Detect connections to suspicious IPs\"\n        author = \"SecOps SDK Example\"\n        severity = \"Medium\"\n        yara_version = \"YL2.0\"\n    events:\n        $e.metadata.event_type = \"NETWORK_CONNECTION\"\n        $e.target.ip != \"\"\n        $lookup in data_table.suspicious_ips\n        $lookup.ip_address = $e.target.ip\n        $severity = $lookup.severity\n        \n    condition:\n        $e and $lookup and $severity = \"High\"\n}\n```\n\n#### Using Reference Lists in Rules\n\n```\nrule detect_with_reference_list {\n    meta:\n        description = \"Detect admin account usage from untrusted networks\"\n        author = \"SecOps SDK Example\" \n        severity = \"High\"\n        yara_version = \"YL2.0\"\n    events:\n        $login.metadata.event_type = \"USER_LOGIN\"\n        $login.principal.user.userid in reference_list.admin_accounts\n        not $login.principal.ip in reference_list.trusted_networks\n        \n    condition:\n        $login\n}\n```\n\n## Gemini AI\n\nYou can use Chronicle's Gemini AI to get security insights, generate detection rules, explain security concepts, and more:\n\n> **Note:** Only enterprise tier users have access to Advanced Gemini 
features. Users must opt-in to use Gemini in Chronicle before accessing this functionality. \nThe SDK will automatically attempt to opt you in when you first use the Gemini functionality. If the automatic opt-in fails due to permission issues, \nyou'll see an error message that includes \"users must opt-in before using Gemini.\"\n\n```python\n# Query Gemini with a security question\nresponse = chronicle.gemini(\"What is Windows event ID 4625?\")\n\n# Get text content (combines TEXT blocks and stripped HTML content)\ntext_explanation = response.get_text_content()\nprint(\"Explanation:\", text_explanation)\n\n# Work with different content blocks\nfor block in response.blocks:\n    print(f\"Block type: {block.block_type}\")\n    if block.block_type == \"TEXT\":\n        print(\"Text content:\", block.content)\n    elif block.block_type == \"CODE\":\n        print(f\"Code ({block.title}):\", block.content)\n    elif block.block_type == \"HTML\":\n        print(\"HTML content (with tags):\", block.content)\n\n# Get all code blocks\ncode_blocks = response.get_code_blocks()\nfor code_block in code_blocks:\n    print(f\"Code block ({code_block.title}):\", code_block.content)\n\n# Get all HTML blocks (with HTML tags preserved)\nhtml_blocks = response.get_html_blocks()\nfor html_block in html_blocks:\n    print(f\"HTML block (with tags):\", html_block.content)\n\n# Check for references\nif response.references:\n    print(f\"Found {len(response.references)} references\")\n\n# Check for suggested actions\nfor action in response.suggested_actions:\n    print(f\"Suggested action: {action.display_text} ({action.action_type})\")\n    if action.navigation:\n        print(f\"Action URI: {action.navigation.target_uri}\")\n```\n\n### Response Content Methods\n\nThe `GeminiResponse` class provides several methods to work with response content:\n\n- `get_text_content()`: Returns a combined string of all TEXT blocks plus the text content from HTML blocks with HTML tags removed\n- `get_code_blocks()`: Returns a list of blocks with `block_type == \"CODE\"`\n- `get_html_blocks()`: Returns a list of blocks with `block_type == \"HTML\"` (HTML tags preserved)\n- `get_raw_response()`: Returns the complete, unprocessed API response as a dictionary\n\nThese methods help you work with different types of content in a structured way.\n\n### Accessing Raw API Response\n\nFor advanced use cases or debugging, you can access the raw API response:\n\n```python\n# Get the complete raw API response\nresponse = chronicle.gemini(\"What is Windows event ID 4625?\")\nraw_response = response.get_raw_response()\n\n# Now you can access any part of the original JSON structure\nprint(json.dumps(raw_response, indent=2))\n\n# Example of navigating the raw response structure\nif \"responses\" in raw_response:\n    for resp in raw_response[\"responses\"]:\n        if \"blocks\" in resp:\n            print(f\"Found {len(resp['blocks'])} blocks in raw response\")\n```\n\nThis gives you direct access to the original API response format, which can be useful for accessing advanced features or troubleshooting.\n\n### Manual Opt-In\n\nIf your account has sufficient permissions, you can manually opt-in to Gemini before using it:\n\n```python\n# Manually opt-in to Gemini\nopt_success = chronicle.opt_in_to_gemini()\nif opt_success:\n    print(\"Successfully opted in to Gemini\")\nelse:\n    print(\"Unable to opt-in due to permission issues\")\n\n# Then use Gemini as normal\nresponse = chronicle.gemini(\"What is Windows event ID 4625?\")\n```\n\nThis can 
be useful in environments where you want to explicitly control when the opt-in happens.\n\n### Generate Detection Rules\n\nChronicle Gemini can generate YARA-L rules for detection:\n\n```python\n# Generate a rule to detect potential security issues\nrule_response = chronicle.gemini(\"Write a rule to detect powershell downloading a file called gdp.zip\")\n\n# Extract the generated rule(s)\ncode_blocks = rule_response.get_code_blocks()\nif code_blocks:\n    rule = code_blocks[0].content\n    print(\"Generated rule:\", rule)\n    \n    # Check for rule editor action\n    for action in rule_response.suggested_actions:\n        if action.display_text == \"Open in Rule Editor\" and action.action_type == \"NAVIGATION\":\n            rule_editor_url = action.navigation.target_uri\n            print(\"Rule can be opened in editor:\", rule_editor_url)\n```\n\n### Get Intel Information\n\nGet detailed information about malware, threat actors, files, vulnerabilities:\n\n```python\n# Ask about a CVE\ncve_response = chronicle.gemini(\"tell me about CVE-2021-44228\")\n\n# Get the explanation\ncve_explanation = cve_response.get_text_content()\nprint(\"CVE explanation:\", cve_explanation)\n```\n\n### Maintain Conversation Context\n\nYou can maintain conversation context by reusing the same conversation ID:\n\n```python\n# Start a conversation\ninitial_response = chronicle.gemini(\"What is a DDoS attack?\")\n\n# Get the conversation ID from the response\nconversation_id = initial_response.name.split('/')[-3]  # Extract from format: .../conversations/{id}/messages/{id}\n\n# Ask a follow-up question in the same conversation context\nfollowup_response = chronicle.gemini(\n    \"What are the most common mitigation techniques?\",\n    conversation_id=conversation_id\n)\n\n# Gemini will remember the context of the previous question about DDoS\n```\n\n### Feed Management\n\nFeeds are used to ingest data into Chronicle. The SDK provides methods to manage feeds.\n\n```python\nimport json\n\n# List existing feeds\nfeeds = chronicle.list_feeds()\nprint(f\"Found {len(feeds)} feeds\")\n\n# Create a new feed\nfeed_details = {\n    \"logType\": f\"projects/your-project-id/locations/us/instances/your-chronicle-instance-id/logTypes/WINEVTLOG\",\n    \"feedSourceType\": \"HTTP\",\n    \"httpSettings\": {\n        \"uri\": \"https://example.com/example_feed\",\n        \"sourceType\": \"FILES\",\n    },\n    \"labels\": {\"environment\": \"production\", \"created_by\": \"secops_sdk\"}\n}\n\ncreated_feed = chronicle.create_feed(\n    display_name=\"My New Feed\",\n    details=feed_details\n)\n\n# Get feed ID from name\nfeed_id = created_feed[\"name\"].split(\"/\")[-1]\nprint(f\"Feed created with ID: {feed_id}\")\n\n# Get feed details\nfeed_details = chronicle.get_feed(feed_id)\nprint(f\"Feed state: {feed_details.get('state')}\")\n\n# Update feed\nupdated_details = {\n    \"httpSettings\": {\n        \"uri\": \"https://example.com/updated_feed_url\",\n        \"sourceType\": \"FILES\"\n    },\n    \"labels\": {\"environment\": \"production\", \"updated\": \"true\"}\n}\n\nupdated_feed = chronicle.update_feed(\n    feed_id=feed_id,\n    display_name=\"Updated Feed Name\",\n    details=updated_details\n)\n\n# Disable feed\ndisabled_feed = chronicle.disable_feed(feed_id)\nprint(f\"Feed disabled. State: {disabled_feed.get('state')}\")\n\n# Enable feed\nenabled_feed = chronicle.enable_feed(feed_id)\nprint(f\"Feed enabled. 
State: {enabled_feed.get('state')}\")\n\n# Generate secret for feed (for supported feed types)\ntry:\n    secret_result = chronicle.generate_secret(feed_id)\n    print(f\"Generated secret: {secret_result.get('secret')}\")\nexcept Exception as e:\n    print(f\"Error generating secret for feed: {e}\")\n\n# Delete feed\nchronicle.delete_feed(feed_id)\nprint(\"Feed deleted successfully\")\n```\n\nThe Feed API supports different feed types such as HTTP, HTTPS Push, and S3 bucket data sources etc. Each feed type has specific configuration options that can be specified in the `details` dictionary.\n\n> **Note**: Secret generation is only available for certain feed types that require authentication.\n\n## Error Handling\n\nThe SDK defines several custom exceptions:\n\n```python\nfrom secops.exceptions import SecOpsError, AuthenticationError, APIError\n\ntry:\n    results = chronicle.search_udm(...)\nexcept AuthenticationError as e:\n    print(f\"Authentication failed: {e}\")\nexcept APIError as e:\n    print(f\"API request failed: {e}\")\nexcept SecOpsError as e:\n    print(f\"General error: {e}\")\n```\n\n## Value Type Detection\n\nThe SDK automatically detects the most common entity types when using the `summarize_entity` function:\n- IP addresses (IPv4 and IPv6)\n- MD5/SHA1/SHA256 hashes\n- Domain names\n- Email addresses\n- MAC addresses\n- Hostnames\n\nThis detection happens internally within `summarize_entity`, simplifying its usage. You only need to provide the `value` argument.\n\n```python\n# The SDK automatically determines how to query for these values\nip_summary = chronicle.summarize_entity(value=\"192.168.1.100\", ...)\ndomain_summary = chronicle.summarize_entity(value=\"example.com\", ...)\nhash_summary = chronicle.summarize_entity(value=\"e17dd4eef8b4978673791ef4672f4f6a\", ...)\n```\n\nYou can optionally provide a `preferred_entity_type` hint to `summarize_entity` if the automatic detection might be ambiguous (e.g., a string could be a username or a hostname).\n\n## License\n\nThis project is licensed under the Apache License 2.0 - [see the LICENSE file for details.](https://github.com/google/secops-wrapper/blob/main/LICENSE)\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Python SDK for wrapping the Google SecOps API for common use cases",
    "version": "0.8.1",
    "project_urls": {
        "Documentation": "https://github.com/google/secops-wrapper#readme",
        "Homepage": "https://github.com/google/secops-wrapper",
        "Issues": "https://github.com/google/secops-wrapper/issues",
        "Repository": "https://github.com/google/secops-wrapper.git"
    },
    "split_keywords": [
        "chronicle",
        " google",
        " secops",
        " security"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "52d5264cb5399e755aad5c4c7b19581288f9b342d6a9f67d6287c9a5bc47232a",
                "md5": "a498937aed9bba6e0db4cecc6cce1f77",
                "sha256": "0c45f53a318ffae80deb26636c792b4dba4aa5ba61bec501f59c65fc635a636f"
            },
            "downloads": -1,
            "filename": "secops-0.8.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a498937aed9bba6e0db4cecc6cce1f77",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.7",
            "size": 145350,
            "upload_time": "2025-07-24T08:52:39",
            "upload_time_iso_8601": "2025-07-24T08:52:39.626732Z",
            "url": "https://files.pythonhosted.org/packages/52/d5/264cb5399e755aad5c4c7b19581288f9b342d6a9f67d6287c9a5bc47232a/secops-0.8.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "713102a7179d8c52c7a6c0c9f346e2642101351162ed60adee0c099667da7a66",
                "md5": "77ecd662ac12c3f8693c12d4efc6f1db",
                "sha256": "ae54d0441df3d051adcceb071d10ff24cc0472946ca92a52f469a4080a47920d"
            },
            "downloads": -1,
            "filename": "secops-0.8.1.tar.gz",
            "has_sig": false,
            "md5_digest": "77ecd662ac12c3f8693c12d4efc6f1db",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.7",
            "size": 243554,
            "upload_time": "2025-07-24T08:52:41",
            "upload_time_iso_8601": "2025-07-24T08:52:41.939488Z",
            "url": "https://files.pythonhosted.org/packages/71/31/02a7179d8c52c7a6c0c9f346e2642101351162ed60adee0c099667da7a66/secops-0.8.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-07-24 08:52:41",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "google",
    "github_project": "secops-wrapper#readme",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "requirements": [
        {
            "name": "pytest",
            "specs": []
        },
        {
            "name": "pytest-cov",
            "specs": []
        },
        {
            "name": "build",
            "specs": []
        },
        {
            "name": "black",
            "specs": []
        },
        {
            "name": "packaging",
            "specs": []
        },
        {
            "name": "pathspec",
            "specs": []
        },
        {
            "name": "protobuf",
            "specs": []
        },
        {
            "name": "pylint",
            "specs": []
        },
        {
            "name": "twine",
            "specs": []
        },
        {
            "name": "python-dotenv",
            "specs": []
        }
    ],
    "tox": true,
    "lcname": "secops"
}
        