Google Workflows est un outil d'orchestration serverless proposé par Google Cloud Platform (GCP) permettant de séquencer et orchestrer un ensemble d’étapes correspondant à des actions utilisant d’autres services tels que des APIs ou des solutions Google Cloud comme Pub/Sub ou Cloud Run.
Cette solution est puissante et très largement utilisée dans un grand nombre de projets Data. Cependant, une limitation peut rapidement devenir un problème bloquant. En effet, Google Workflows a une limite de temps d’exécution de 30 minutes par étape. Dans certains projets, nous pouvons avoir besoin d'exécuter des jobs qui nécessitent beaucoup plus de temps.
Dans ce tutoriel, on vous détaille comment contourner habilement cette limitation à l'aide de Cloud Run, Cloud Functions et Pub/Sub. Vous y découvrirez une pratique qui permet de déclencher vos jobs longs sans rupture de workflow, tout en gardant le bénéfice de la supervision centralisée.
Les principales étapes à réaliser
Les principales étapes de notre projet de contournement de la limitation de temps sont:
- Générer une URL de callback: Dans le workflow Google Cloud, générer une URL de callback. Cette URL sert de moyen de communication entre le workflow et d'autres composants ou systèmes. L'URL de callback est ensuite passée à votre script en tant que variable d'environnement.
- Déclencher un job : Dans le workflow, exécutez votre job Cloud Run.
- Envoyer une requête POST : À la toute fin de votre script concernant le job en cours (Cloud Run Job), une étape doit être ajoutée pour envoyer une requête POST à l'URL de callback générée par le workflow. Cette requête POST indique que la tâche ou le job en cours est terminé.
- Attendre le rappel : Le workflow entre ensuite dans un état d'attente, pendant lequel il attend une requête POST sur l'URL de callback. La période d'attente de cette étape est modifiable en fonction du job et peut s'étendre jusqu'à un an.
- Continuer sur le rappel : Une fois que le workflow reçoit une requête POST sur l'URL de callback, indiquant l'achèvement du job, il passe aux étapes suivantes.
Si vous avez plusieurs jobs exécutés en parallèle, vous pouvez boucler sur le nombre de jobs pour obtenir ce nombre de requêtes POST sur l'URL de callback.
Le trigger du job cloud run
Cependant, il y a un souci en parallèle : lorsqu'il y a une étape pour déclencher un job Cloud Run dans le workflow en utilisant googleapis.run.v1.namespaces.jobs.run
, par défaut le workflow déclenche le job et attend qu'il soit terminé avant de passer aux étapes suivantes. Nous avons donc besoin d'un moyen pour déclencher le job Cloud Run via le workflow sans utiliser directement googleapis.run.v1.namespaces.jobs.run
.
Pour ce faire, nous devons créer une fonction Google Cloud qui est responsable du déclenchement du job Cloud Run (la fonction cloud ne fait que déclencher et n'attend pas la fin du job pour sortir). Ainsi, tout ce que nous avons à faire dans le workflow est d'envoyer un message Pub/Sub pour déclencher la fonction cloud, qui à son tour déclenche le job Cloud Run.
L'intérêt de faire ce contournement est que Google Workflow a une faille concernant le délai d'expiration des étapes : lorsque l'appel utilisé à l'étape est events.await_callback, vous pouvez définir un délai d'expiration allant jusqu'à un an sans que le workflow ait des problèmes.
Utiliser Cloud Function comme déclencheur du job
Voici les étapes à suivre pour implémenter cela, brièvement :
- Créez un sujet Google Cloud Pub/Sub.
- Créez une cloud function et définissez son déclencheur pour être votre sujet Pub/Sub.
- Implémentez la logique pour déclencher la tâche Cloud Run dans votre fichier main.py de la cloud function.
La fonction cloud utilise l'API Python de Google Cloud pour exécuter la tâche Cloud Run, et toute variable que vous souhaitez passer à votre tâche Cloud Run peut être incluse dans le corps du message Pub/Sub, qui sera traité par la fonction cloud.
Voici un exemple simple de ce à quoi la fonction cloud peut ressembler pour déclencher une tâche Cloud Run :
import json
import base64
import time
from loguru import logger
from google.cloud import run, run_v2
import functions_framework
@functions_framework.http
def trigger_cloudrun_job(request):
client = run.JobsClient()
# parse the body of the sent pub/sub message
pubsub_message = request.get_data(as_text=True)
pubsub_data = json.loads(pubsub_message)['message']['data']
message_dict = base64.b64decode(pubsub_data).decode('utf-8')
json_message = json.loads(message_dict)
job_id = json_message["JOB_ID"] # ID of the cloudrun job to be triggered
project_id = json_message["PROJECT_ID"]
region = json_message["LOCATION"]
env_vars = json_message.get("ENV_VARS", {})
environment_variables = []
for key, value in env_vars.items():
env_var = run_v2.EnvVar(name=str(key), value=str(value))
environment_variables.append(env_var)
job_name = f"projects/{project_id}/locations/{region}/jobs/{job_id}"
# define the environment variables to be passed to the cloudrun job
override_spec = {
'container_overrides': [
{
'env': environment_variables
}
]
}
request = run_v2.RunJobRequest(
name=job_name,
overrides=override_spec
)
response = client.run_job(request=request)
return 'Cloud Run job triggered successfully!'
Il est très important de ne pas ajouter d'étape d'attente pour que la tâche Cloud Run se termine car nous voulons uniquement la déclencher puis passer à l'étape suivante du workflow qui attend la requête POST sur son URL de rappel.
L’envoi du post sur l’URL du callback dans le job
À la toute fin du code source de votre tâche Cloud Run, vous devez ajouter une étape qui envoie une requête POST à l'URL de rappel callback par le workflow afin de signaler à l'étape du workflow qui attend l'appel que la tâche est terminée et de passer à l'étape suivante. L'URL de callback est transmise à votre tâche Cloud Run depuis le workflow et via la cloud function en tant que variable d'environnement.
Voici la fonction à ajouter:
def send_callback(callback_url, access_token):
headers = {
'Authorization': f'Bearer {access_token}'
}
try:
response = requests.post(url=callback_url, headers=headers)
response.raise_for_status()
logger.info("Callback sent successfully.")
except requests.exceptions.RequestException as e:
raise Exception(f"Error sending callback: {e}")
Le workflow
Voici une vue d'ensemble générale de ce à quoi le workflow devrait ressembler :
main:
params: [event]
steps:
- init:
assign:
- project_id: tmf-image-analysis-prod
- location: us-west1
- location_repo: global
- run_date: ${event.RUN_DATE}
- x_start_date: ${event.X_START_DATE}
- sim_threshold: ${event.SIM_THRESHOLD}
- match_score_threshold: ${event.MATCH_SCORE_THRESHOLD}
- matching_frames_threshold: ${event.MATCHING_FRAMES_THRESHOLD}
# create the callback url
- create_first_callback:
call: events.create_callback_endpoint
args:
http_callback_method: "POST"
result: first_callback_details
# build the pub/sub message to be sent with the env vars in its body
# including the previously generated callback url
- build_event_job_compute_embeddings_puma:
call: json.encode
args:
data:
JOB_ID: compute_embeddings
INBOUND_DATE: ${run_date}
SOURCE: "puma"
CALLBACK_URL: ${first_callback_details.url}
result: event_job_compute_embeddings_puma_body
# send the pub/sub message that triggers the cloudfunction
- send_event_job_compute_embeddings_puma:
call: googleapis.pubsub.v1.projects.topics.publish
args:
topic: projects/tmf-image-analysis-prod/topics/cloud-run-trigger
body:
messages:
- data: ${base64.encode(event_job_compute_embeddings_puma_body)}
# wait for post to sent on the previously generated callback url
# ie when the cloud run job is completed
- await_first_callback:
call: events.await_callback
args:
callback: ${first_callback_details}
timeout: 18000 # set the wait time as needed - can be up to a year !
result: first_callback_request
Google Workflows est désormais essentiel pour orchestrer vos processus complexes tout en surmontant les défis de temps d'exécution. En tant qu'experts en gestion des workflows et en intégration de solutions Cloud, nous sommes prêts à vous aider à optimiser vos processus et à garantir une exécution fluide de vos tâches longues. Contactez-nous pour découvrir comment nous pouvons personnaliser une solution adaptée à vos besoins spécifiques !