Source code for oneworldsync.utils
"""
Utility functions for the 1WorldSync API client
"""
import json
import logging
from datetime import datetime, timezone
from typing import Dict, List, Any, Optional, Union
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
[docs]
def format_timestamp(dt=None):
"""
Format a datetime object as a timestamp for the 1WorldSync API
Args:
dt (datetime, optional): Datetime object to format. Defaults to current UTC time.
Returns:
str: Formatted timestamp
"""
if dt is None:
dt = datetime.now(timezone.utc)
return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
[docs]
def parse_timestamp(timestamp_str):
"""
Parse a timestamp string from the 1WorldSync API
Args:
timestamp_str (str): Timestamp string in ISO 8601 format
Returns:
datetime: Parsed datetime object
"""
return datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%SZ')
[docs]
def pretty_print_json(data):
"""
Pretty print JSON data
Args:
data (dict): JSON data to print
"""
print(json.dumps(data, indent=2))
[docs]
def extract_nested_value(data, path, default=None):
"""
Extract a value from a nested dictionary using a path
Args:
data (dict): Dictionary to extract from
path (list): List of keys to traverse
default: Value to return if path doesn't exist
Returns:
The value at the path or the default value
"""
current = data
try:
for key in path:
if isinstance(current, dict):
current = current.get(key)
elif isinstance(current, list) and isinstance(key, int) and 0 <= key < len(current):
current = current[key]
else:
return default
if current is None:
return default
return current
except (KeyError, IndexError, TypeError):
return default
[docs]
def get_nested_dict_value(data: Dict, path: str, default: Any = None) -> Any:
"""
Extract a value from a nested dictionary using a dot-separated path
Args:
data (dict): Dictionary to extract from
path (str): Dot-separated path (e.g., "item.tradeItemInformation.0.tradeItemDescriptionModule")
default: Value to return if path doesn't exist
Returns:
The value at the path or the default value
"""
if not data or not isinstance(data, dict):
return default
parts = path.split('.')
current = data
for part in parts:
# Handle array indices
if part.isdigit() and isinstance(current, list):
index = int(part)
if 0 <= index < len(current):
current = current[index]
else:
return default
elif isinstance(current, dict):
current = current.get(part)
else:
return default
if current is None:
return default
return current
[docs]
def extract_product_data(product_data: Dict) -> Dict:
"""
Extract relevant product data from a 1WorldSync product object based on the Swagger Search schema.
Args:
product_data (dict): A product data dictionary from the 1WorldSync Search REST API
Returns:
dict: A dictionary with structured product data
"""
# Initialize with default values
extracted_data = {
'gtin': '',
'brand_name': '',
'product_name': '',
'description': '',
'manufacturer': '',
'image_url': '',
'category': '',
'subcategory': '',
'gpc_code': '',
'ingredients': '',
'dimensions': {},
'country_of_origin': '',
'allergen_info': [],
'item_id': '',
'images': []
}
try:
item = product_data.get('item', {})
# Extract GTIN (item identifier)
identifiers = get_nested_dict_value(item, 'itemIdentificationInformation.itemIdentifier', [])
for identifier in identifiers:
if identifier.get('itemIdType', {}).get('value') == 'GTIN':
extracted_data['gtin'] = identifier.get('itemId', '')
break
# Extract item reference ID
item_ref_info = get_nested_dict_value(item, 'itemIdentificationInformation.itemReferenceIdInformation', {})
extracted_data['item_id'] = item_ref_info.get('itemReferenceId', '')
# Extract trade item information
trade_item_info = get_nested_dict_value(item, 'tradeItemInformation', [])
if trade_item_info:
# Extract brand name and product name
for info in trade_item_info:
desc_info = get_nested_dict_value(info, 'tradeItemDescriptionModule.tradeItemDescriptionInformation', [])
for desc in desc_info:
# Brand name
brand_info = desc.get('brandNameInformation', {})
if brand_info:
extracted_data['brand_name'] = brand_info.get('brandName', '')
# Product name
reg_names = desc.get('regulatedProductName', [])
for reg_name in reg_names:
values = get_nested_dict_value(reg_name, 'statement.values', [])
for value in values:
if value.get('value'):
extracted_data['product_name'] = value.get('value')
break
# Description
add_desc = get_nested_dict_value(desc, 'additionalTradeItemDescription.values', [])
for desc_val in add_desc:
if desc_val.get('value'):
extracted_data['description'] = desc_val.get('value')
break
# Extract images
file_module = get_nested_dict_value(info, 'referencedFileDetailInformationModule', {})
file_headers = file_module.get('referencedFileHeader', [])
for file_header in file_headers:
file_type = get_nested_dict_value(file_header, 'referencedFileTypeCode.value', '')
if file_type == 'PRODUCT_IMAGE':
uri = file_header.get('uniformResourceIdentifier', '')
is_primary = get_nested_dict_value(file_header, 'isPrimaryFile.value', '') == 'true'
image_data = {
'url': uri,
'is_primary': is_primary
}
extracted_data['images'].append(image_data)
# Set primary image as the main image URL
if is_primary and uri:
extracted_data['image_url'] = uri
# Extract dimensions
measurement_groups = get_nested_dict_value(info, 'tradeItemMeasurementsModuleGroup', [])
for group in measurement_groups:
measurements = get_nested_dict_value(group, 'tradeItemMeasurementsModule.tradeItemMeasurements', {})
if measurements:
# Height
height = measurements.get('height', {})
if height:
extracted_data['dimensions']['height'] = {
'value': height.get('value', ''),
'unit': height.get('qual', '')
}
# Width
width = measurements.get('width', {})
if width:
extracted_data['dimensions']['width'] = {
'value': width.get('value', ''),
'unit': width.get('qual', '')
}
# Depth
depth = measurements.get('depth', {})
if depth:
extracted_data['dimensions']['depth'] = {
'value': depth.get('value', ''),
'unit': depth.get('qual', '')
}
# Extract ingredients
ingredient_modules = get_nested_dict_value(info, 'foodAndBeverageIngredientModule', [])
for module in ingredient_modules:
statements = get_nested_dict_value(module, 'ingredientStatement', [])
for statement in statements:
values = get_nested_dict_value(statement, 'statement.values', [])
for value in values:
if value.get('value'):
extracted_data['ingredients'] = value.get('value')
break
# Extract country of origin
place_module = get_nested_dict_value(info, 'placeOfItemActivityModule', {})
countries = get_nested_dict_value(place_module, 'placeOfProductActivity.countryOfOrigin', [])
for country in countries:
country_code = get_nested_dict_value(country, 'countryCode.value', '')
if country_code:
extracted_data['country_of_origin'] = country_code
break
# Extract GPC code and category
product_categories = get_nested_dict_value(item, 'productCategory', [])
for category in product_categories:
scheme = get_nested_dict_value(category, 'productCategoryScheme.value', '')
if scheme == 'GPC':
category_codes = category.get('productCategoryCodes', [])
for code in category_codes:
gpc_code = get_nested_dict_value(code, 'productCategoryCode.value', '')
if gpc_code:
extracted_data['gpc_code'] = gpc_code
# Try to get category component
component = get_nested_dict_value(code, 'productCategoryComponent.value', '')
if component:
if component == 'BRICK':
extracted_data['category'] = gpc_code
elif component == 'SEGMENT':
extracted_data['subcategory'] = gpc_code
except Exception as e:
logger.error(f"Error extracting product data: {e}")
return extracted_data
[docs]
def extract_search_results(search_results: Dict) -> Dict:
"""
Extract structured data from search results
Args:
search_results (dict): Search results from the API
Returns:
dict: Structured search results with metadata and products
"""
result = {
'metadata': {
'response_code': search_results.get('responseCode'),
'response_message': search_results.get('responseMessage'),
'total_results': int(search_results.get('totalNumOfResults', '0')),
'next_cursor': search_results.get('nextCursorMark')
},
'products': []
}
# Extract product data
for product in search_results.get('results', []):
product_data = extract_product_data(product)
result['products'].append(product_data)
return result
[docs]
def get_primary_image(product_data: Dict) -> str:
"""
Get the primary image URL from product data
Args:
product_data (dict): Product data
Returns:
str: Primary image URL or empty string if not found
"""
# First check if we already extracted the image URL
if product_data.get('image_url'):
return product_data['image_url']
# Otherwise, look through images for a primary one
for image in product_data.get('images', []):
if image.get('is_primary'):
return image.get('url', '')
# If no primary image, return the first image if available
if product_data.get('images'):
return product_data['images'][0].get('url', '')
return ''
[docs]
def format_dimensions(dimensions: Dict) -> str:
"""
Format dimensions as a string
Args:
dimensions (dict): Dimensions dictionary
Returns:
str: Formatted dimensions string
"""
if not dimensions:
return ''
parts = []
for dim_name in ['height', 'width', 'depth']:
dim = dimensions.get(dim_name, {})
if dim and dim.get('value') and dim.get('unit'):
parts.append(f"{dim_name.capitalize()}: {dim['value']} {dim['unit']}")
return ', '.join(parts)