import logging
import os
import tempfile
from typing import Optional, Union
from urllib.parse import urlencode

import requests
from flask import current_app
from requests_oauthlib import OAuth2Session


class LinkedInService:
    """Service for LinkedIn API integration.

    Covers the OAuth 2.0 authorization-code flow (authorization URL, token
    exchange, user info lookup) and publishing UGC posts, optionally with a
    single image attached.

    Secrets (client secret, authorization code, access tokens) and user
    profile data are never written to the log; only redacted placeholders or
    field names are logged.
    """

    _AUTHORIZATION_URL = "https://www.linkedin.com/oauth/v2/authorization"
    _TOKEN_URL = "https://www.linkedin.com/oauth/v2/accessToken"
    _USERINFO_URL = "https://api.linkedin.com/v2/userinfo"
    _UGC_POSTS_URL = "https://api.linkedin.com/v2/ugcPosts"
    _REGISTER_UPLOAD_URL = "https://api.linkedin.com/v2/assets?action=registerUpload"
    _UPLOAD_MECHANISM_KEY = "com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"
    _REDACTED = "***"

    # (connect, read) timeout in seconds applied to every outgoing HTTP call;
    # without one, `requests` waits indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = (10, 60)

    _logger = logging.getLogger(__name__)

    def __init__(self):
        """Load the OAuth client settings from the current Flask app config.

        Reads ``CLIENT_ID``, ``CLIENT_SECRET`` and ``REDIRECT_URL`` from
        ``current_app.config``, so it must run where ``current_app`` is bound.

        Raises:
            KeyError: If one of the required config keys is missing.
        """
        config = current_app.config
        self.client_id = config['CLIENT_ID']
        self.client_secret = config['CLIENT_SECRET']
        self.redirect_uri = config['REDIRECT_URL']
        self.scope = ['openid', 'profile', 'email', 'w_member_social']

    def get_authorization_url(self, state: str) -> str:
        """
        Get LinkedIn authorization URL.

        Args:
            state (str): State parameter for security

        Returns:
            str: Authorization URL
        """
        linkedin = OAuth2Session(
            self.client_id,
            redirect_uri=self.redirect_uri,
            scope=self.scope,
            state=state,
        )
        authorization_url, _ = linkedin.authorization_url(self._AUTHORIZATION_URL)
        return authorization_url

    def get_access_token(self, code: str) -> dict:
        """
        Exchange authorization code for access token.

        Args:
            code (str): Authorization code

        Returns:
            dict: Token response

        Raises:
            requests.exceptions.RequestException: If the request fails, times
                out, or LinkedIn answers with an HTTP error status.
        """
        logger = self._logger
        logger.info("π [LinkedIn] Starting token exchange (code length: %d)", len(code))

        url = self._TOKEN_URL
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": self.redirect_uri,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        # Never log the authorization code or the client secret themselves.
        loggable_data = dict(data, code=self._REDACTED, client_secret=self._REDACTED)

        logger.info("π [LinkedIn] Making request to LinkedIn API...")
        logger.info("π [LinkedIn] Request URL: %s", url)
        logger.info("π [LinkedIn] Request data: %s", loggable_data)

        try:
            response = requests.post(
                url, headers=headers, data=data, timeout=self.REQUEST_TIMEOUT
            )
            logger.info("π [LinkedIn] Response status: %s", response.status_code)
            logger.info("π [LinkedIn] Response headers: %s", dict(response.headers))
            response.raise_for_status()
            token_data = response.json()
        except requests.exceptions.RequestException as e:
            logger.error("π [LinkedIn] Token exchange failed: %s", e)
            logger.error("π [LinkedIn] Error type: %s", type(e))
            raise  # bare raise keeps the original traceback

        # The payload carries the access token, so log only which fields came back.
        logger.info(
            "π [LinkedIn] Token response fields: %s", self._field_names(token_data)
        )
        return token_data

    def get_user_info(self, access_token: str) -> dict:
        """
        Get user information from LinkedIn.

        Args:
            access_token (str): LinkedIn access token

        Returns:
            dict: User information

        Raises:
            requests.exceptions.RequestException: If the request fails, times
                out, or LinkedIn answers with an HTTP error status.
        """
        logger = self._logger
        logger.info(
            "π [LinkedIn] Fetching user info with token length: %d", len(access_token)
        )

        url = self._USERINFO_URL
        headers = {"Authorization": f"Bearer {access_token}"}

        logger.info("π [LinkedIn] Making request to LinkedIn user info API...")
        logger.info("π [LinkedIn] Request URL: %s", url)
        # The real header holds the bearer token; log a redacted copy instead.
        logger.info(
            "π [LinkedIn] Request headers: %s",
            {"Authorization": f"Bearer {self._REDACTED}"},
        )

        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            logger.info("π [LinkedIn] Response status: %s", response.status_code)
            logger.info("π [LinkedIn] Response headers: %s", dict(response.headers))
            response.raise_for_status()
            user_data = response.json()
        except requests.exceptions.RequestException as e:
            logger.error("π [LinkedIn] User info fetch failed: %s", e)
            logger.error("π [LinkedIn] Error type: %s", type(e))
            raise  # bare raise keeps the original traceback

        # Profile data is personal information; log field names only.
        logger.info(
            "π [LinkedIn] User info response fields: %s", self._field_names(user_data)
        )
        return user_data

    @staticmethod
    def _field_names(payload) -> list:
        """Return the sorted top-level keys of a JSON object, or [] for non-dicts."""
        return sorted(payload) if isinstance(payload, dict) else []

    def _create_temp_image_file(self, image_bytes: bytes) -> str:
        """
        Create a temporary file from image bytes.

        The file is created with ``delete=False``, so the caller owns it and
        should remove it with ``_cleanup_temp_file``. If writing fails, the
        partially written file is removed before the error propagates.

        Args:
            image_bytes: Image data as bytes

        Returns:
            Path to the temporary file
        """
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.jpg')
        try:
            # Leaving the `with` closes the handle, which flushes the data.
            with temp_file:
                temp_file.write(image_bytes)
        except Exception:
            # Nothing else will delete this file, so do not leak it on failure.
            self._cleanup_temp_file(temp_file.name)
            raise
        return temp_file.name

    def _cleanup_temp_file(self, file_path: str) -> None:
        """
        Safely remove a temporary file.

        Best effort: a missing file or an empty/None path is ignored, and any
        other failure is logged rather than raised.

        Args:
            file_path: Path to the temporary file to remove
        """
        if not file_path:
            return
        try:
            os.unlink(file_path)
        except FileNotFoundError:
            pass  # already gone; nothing to clean up
        except (OSError, TypeError, ValueError) as e:
            self._logger.error(
                "Failed to cleanup temporary file %s: %s", file_path, e
            )

    @staticmethod
    def _api_headers(access_token: str, content_type: str = "application/json") -> dict:
        """Build the headers sent with LinkedIn REST calls for the given content type."""
        return {
            "Authorization": f"Bearer {access_token}",
            "X-Restli-Protocol-Version": "2.0.0",
            "Content-Type": content_type,
        }

    def _load_image_content(self, image: Union[str, bytes]) -> bytes:
        """Return raw image bytes from in-memory bytes, a local path, or a URL."""
        if isinstance(image, bytes):
            return image

        is_url = image.startswith(('http://', 'https://', 'ftp://', 'file://'))
        if not is_url and os.path.exists(image):
            # NOTE(review): this reads an arbitrary local path and the URL
            # branch fetches an arbitrary URL; callers must not pass
            # untrusted input here without validating it first.
            with open(image, 'rb') as f:
                return f.read()

        image_response = requests.get(image, timeout=self.REQUEST_TIMEOUT)
        if image_response.status_code != 200:
            raise RuntimeError(
                f"Failed to download image from URL: {image_response.status_code} {image_response.text}"
            )
        return image_response.content

    def _register_image_upload(self, access_token: str, user_id: str) -> tuple:
        """Register a feed-share image upload; return ``(upload_url, asset_urn)``."""
        register_body = {
            "registerUploadRequest": {
                "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
                "owner": f"urn:li:person:{user_id}",
                "serviceRelationships": [{
                    "relationshipType": "OWNER",
                    "identifier": "urn:li:userGeneratedContent",
                }],
            }
        }
        r = requests.post(
            self._REGISTER_UPLOAD_URL,
            headers=self._api_headers(access_token),
            json=register_body,
            timeout=self.REQUEST_TIMEOUT,
        )
        if r.status_code not in (200, 201):
            raise RuntimeError(f"Failed to register upload: {r.status_code} {r.text}")

        value = r.json()["value"]
        upload_url = value["uploadMechanism"][self._UPLOAD_MECHANISM_KEY]["uploadUrl"]
        return upload_url, value["asset"]

    def _upload_image(self, access_token: str, user_id: str, image_content: bytes) -> str:
        """Register and upload image bytes; return the URN of the new asset."""
        upload_url, asset_urn = self._register_image_upload(access_token, user_id)
        upload_response = requests.put(
            upload_url,
            headers=self._api_headers(access_token, "application/octet-stream"),
            data=image_content,
            timeout=self.REQUEST_TIMEOUT,
        )
        if upload_response.status_code not in (200, 201):
            raise RuntimeError(
                f"Failed to upload image: {upload_response.status_code} {upload_response.text}"
            )
        return asset_urn

    @staticmethod
    def _build_post_body(user_id: str, text_content: str,
                         asset_urn: Optional[str] = None) -> dict:
        """Build the ugcPosts payload; attaches the image asset when one is given."""
        share_content = {
            "shareCommentary": {"text": text_content},
            "shareMediaCategory": "NONE",
        }
        if asset_urn is not None:
            share_content["shareMediaCategory"] = "IMAGE"
            share_content["media"] = [{
                "status": "READY",
                "media": asset_urn,
                "description": {"text": "Post image"},
                "title": {"text": "Post image"},
            }]
        return {
            "author": f"urn:li:person:{user_id}",
            "lifecycleState": "PUBLISHED",
            "specificContent": {"com.linkedin.ugc.ShareContent": share_content},
            "visibility": {"com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"},
        }

    def publish_post(self, access_token: str, user_id: str, text_content: str,
                     image_url: Optional[Union[str, bytes]] = None) -> dict:
        """
        Publish a post to LinkedIn.

        When ``image_url`` is non-empty bytes, those bytes are uploaded as the
        post image. When it is a non-empty string, it is treated as a local
        file path if such a file exists and the string has no URL scheme, and
        as a URL to download otherwise. Any other value (None, empty, or an
        unsupported type) results in a text-only post.

        The image is loaded before the upload is registered, so a bad path or
        URL does not leave an unused registered asset behind.

        Args:
            access_token (str): LinkedIn access token
            user_id (str): LinkedIn user ID
            text_content (str): Post content
            image_url (str or bytes, optional): Image URL, local path, or image bytes

        Returns:
            dict: Publish response

        Raises:
            RuntimeError: If downloading the image, registering the upload, or
                uploading the image returns an unexpected status code.
            requests.exceptions.RequestException: If a request fails, times
                out, or the final publish call returns an HTTP error status.
        """
        asset_urn = None
        if image_url and isinstance(image_url, (bytes, str)):
            image_content = self._load_image_content(image_url)
            asset_urn = self._upload_image(access_token, user_id, image_content)

        response = requests.post(
            self._UGC_POSTS_URL,
            headers=self._api_headers(access_token),
            json=self._build_post_body(user_id, text_content, asset_urn),
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()