o
    ?e;                     @   sL  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ daG dd dejZd'ddZG dd deZdd Z G dd dej!Z"			d(ddZ#dd Z$G dd  d ej%Z&G d!d" d"eZ'G d#d$ d$eZ(G d%d& d&eZ)dS ))zCOperations for handling session logging and shutdown notifications.    N)text_format)
config_pb2)	event_pb2)session)dtypes)errors)ops)	array_ops)
tf_logging)tpu_ops)session_run_hook)training_utilc                   @   s   e Zd ZdZdd ZdS )CoordinatorResetErrorz/Raised when the monitored session should reset.c                 C   s   t j| d d d d S )Nz.Resetting session loop due to worker shutdown.)r   AbortedError__init__self r   f/home/www/facesmatcher.com/pyenv/lib/python3.10/site-packages/tensorflow/python/tpu/session_support.pyr   (   s   zCoordinatorResetError.__init__N)__name__
__module____qualname____doc__r   r   r   r   r   r   %   s    r   c                 C   s"   t j| j| j|r|dS | jdS )N)targetconfiggraph)session_libSessionsess_str_configr   )r   r   r   r   r   _clone_session-   s   r    c                   @   sX   e Zd ZdZdd Zedd Zdd Zdd	 ZdddZ	dd Z
dd ZdddZd
S )WorkerHeartbeatManagerz:Manages the status/heartbeat monitor for a set of workers.c                 C   s   || _ || _|| _|| _dS )a  Construct a new WorkerHeartbeatManager.

    (Prefer using `WorkerHeartbeatManager.from_devices` when possible.)

    Args:
      session: `tf.compat.v1.Session`, session to use for heartbeat operations.
      devices: `list[string]` Set of devices to connect to.
      heartbeat_ops: `list[tf.Operation]` Heartbeat operations.
      request_placeholder: `tf.Placeholder[String]` Placeholder used to specify
        the WorkerHeartbeatRequest protocol buffer.
    N)_session_devices_ops_request_placeholder)r   r   devicesheartbeat_opsrequest_placeholderr   r   r   r   7   s   
zWorkerHeartbeatManager.__init__c              	   C   s   |st d t d| tjdtjd}g }|D ]}t| |	t
| W d   n1 s3w   Y  qt| |||S )z4Construct a heartbeat manager for the given devices.z3Trying to create heartbeat manager with no devices?z!Creating heartbeat manager for %sZworker_heartbeat_request)nameZdtypeN)loggingerrorinfor	   placeholderr   stringr   deviceappendr   Zworker_heartbeatr!   )r   r&   r(   r'   r/   r   r   r   from_devicesH   s   
z#WorkerHeartbeatManager.from_devicesc                 C   s
   t | jS N)lenr#   r   r   r   r   num_workersZ   s   
z"WorkerHeartbeatManager.num_workersc                 C   s0   t dt| | j| j| j| i dS )zConfigure heartbeat manager for all devices.

    Args:
      message: `event_pb2.WorkerHeartbeatRequest`
    Returns: `None`
    z Configuring worker heartbeat: %sN)	r*   r,   r   ZMessageToStringr"   runr$   r%   SerializeToString)r   messager   r   r   	configure]   s   
z WorkerHeartbeatManager.configureN`  c                 C   sX   |du rt  }tj|d}| jj| j| j| i|d}dd |D }t	
d| |S )z6Ping all workers, returning the parsed status results.N)timeout_in_ms)Z	feed_dictoptionsc                 S   s   g | ]}t j|qS r   )r   ZWorkerHeartbeatResponseZ
FromString).0Zres_pbr   r   r   
<listcomp>s   s    
z/WorkerHeartbeatManager.ping.<locals>.<listcomp>zPing results: %s)r   WorkerHeartbeatRequestr   Z
RunOptionsr"   r5   r$   r%   r6   r*   debug)r   requestr:   r;   resultsZparsed_resultsr   r   r   pingi   s   zWorkerHeartbeatManager.pingc                 C   sh   |   }g }t|| j| jD ]\}}}|jtjkr |||f q|s%dS t| \}}t| j	||| j
S )zFPing all workers, returning manager containing lame workers (or None).N)rB   zipr#   r$   Zhealth_statusr   OKr0   r!   r"   r%   )r   Zping_resultslame_workersZping_responser/   opZbad_devicesZbad_opsr   r   r   rE   z   s   
z#WorkerHeartbeatManager.lame_workersc                 C   s   dd | j S )NzHeartbeatManager(%s),)joinr#   r   r   r   r   __repr__      zWorkerHeartbeatManager.__repr__r   c                 C   s^   t d|  tjtj|dtjtj|dd}| | d|d  }t d| t	| dS )	z3Shutdown all workers after `shutdown_timeout_secs`.zShutting down %s.
timeout_ms	exit_code)watchdog_configshutdown_moderN         $@  z)Waiting %.2f seconds for worker shutdown.N)
r*   r,   r   r>   WatchdogConfigZSHUTDOWN_AFTER_TIMEOUTZRequestedExitCoder8   timesleep)r   Zwait_time_in_msrN   reqZ	sleep_secr   r   r   shutdown   s   


zWorkerHeartbeatManager.shutdown)Nr9   )r9   r   )r   r   r   r   r   staticmethodr1   r4   r8   rB   rE   rI   rW   r   r   r   r   r!   4   s    

r!   c                 C   sB   |   }g }|D ]}|j}d|v rd|vr||dd q|S )z7Return a list of devices for each worker in the system.z:TPU:0ZcoordinatorZTPUZCPU)Zlist_devicesr)   r0   replace)r   r&   Zdevices_that_support_heartbeatsr/   r)   r   r   r   all_worker_devices   s   rZ   c                   @   sR   e Zd ZdZ			dddZddd	Zd
d Zdd Zdd Zdd Z	dd Z
dS )WatchdogManagera  Configures worker watchdog timer and handles periodic pings.

  Usage:
    # Ping workers every minute, shutting down workers if they haven't received
    # a ping after 1 hour.
    watchdog_manager = WatchdogManager(
      ping_interval=60, shutdown_timeout=3600
    )

    # Use as a context manager, resetting watchdog on context exit:
    with watchdog_manager:
      session.run(...)

    # Or setup globally; watchdog will remain active until program exit.
    watchdog_manager.configure_and_run()
  N<      c                 C   sP   t j|  || _|| _d| _|j| _|j| _d| _	|| _
d| _d| _d| _dS )a  Initialize a watchdog manager.

    Args:
      session: Session connected to worker devices.  A cloned session and graph
        will be created for managing worker pings.
      devices: Set of devices to monitor.  If none, all workers will be
        monitored.
      ping_interval: Time, in seconds, between watchdog pings.
      shutdown_timeout: Time, in seconds, before watchdog timeout.
    TFN)	threadingThreadr   ping_intervalshutdown_timeoutdaemonr   r   _target_runningr#   _graphr"   _worker_manager)r   r   r&   r`   ra   r   r   r   r      s   
zWatchdogManager.__init__Fc                 C   s   t  | _tj| j| j| jd| _| jdu rt	| j| _| j
  t| j| j| _W d   n1 s5w   Y  |rBd}tj}n| jd }tj}| jtjtj|d|d dS )z,Reset the graph, session and worker manager.)r   r   r   NrR   rK   )rO   rP   )r   Graphre   r   r   rc   r   r"   r#   rZ   
as_defaultr!   r1   rf   r   ZNOT_CONFIGUREDra   WAIT_FOR_COORDINATORr8   r>   rS   )r   stoppingrL   rP   r   r   r   _reset_manager   s0   



zWatchdogManager._reset_managerc                 C   s,   t d| j| j |   d| _|   d S )NzKEnabling watchdog timer with %d second timeout and %d second ping interval.T)r*   r,   ra   r`   rl   rd   startr   r   r   r   configure_and_run   s   z!WatchdogManager.configure_and_runc                 C   s(   t d | jdd d| _|   d S )NzStopping worker watchdog.T)rk   F)r*   r,   rl   rd   rH   r   r   r   r   stop  s   
zWatchdogManager.stopc                 C      |    d S r2   )rn   r   r   r   r   	__enter__     zWatchdogManager.__enter__c                 C   rp   r2   )ro   )r   exc_typeexc_valexc_tbr   r   r   __exit__
  rr   zWatchdogManager.__exit__c              
   C   sn   | j r5z| jjd d t| j W n tjy/ } zt	d| | 
  W Y d }~nd }~ww | j sd S d S )N)r@   z(Caught error while sending heartbeat: %s)rd   rf   rB   rT   rU   r`   r   ZOpErrorr*   r?   rl   )r   er   r   r   r5     s   zWatchdogManager.run)Nr\   r]   )F)r   r   r   r   r   rl   rn   ro   rq   rv   r5   r   r   r   r   r[      s    

	r[   r\     c                 C   s4   t du rt|d |}t| |||a t   dS dS )zEStart global worker watchdog to shutdown workers on coordinator exit.NrQ   )	_WATCHDOGminr[   rn   )r   r&   r`   ra   r   r   r   start_worker_watchdog  s   r{   c                   C   s   t durt   da dS dS )zStop global worker watchdog.N)ry   ro   r   r   r   r   stop_worker_watchdog)  s   r|   c                   @   s2   e Zd ZdZdddZdd Zdd Zd	d
 ZdS )GracefulShutdownHooka%  Session hook that watches for shutdown events.

  If a shutdown is indicated, `saver.save(checkpoint_prefix)` is executed, and a
  SystemShutdown exception is raised to terminate the main session.  If `saver`
  is None the `SAVERS` collection will be read to find a saver.

  `on_shutdown_hooks` is an optional list of functions that should be called
  after checkpointing.  The function is called with (`run_context`,
  `all_workers`, `lame_workers`).

  If `heartbeat_group` is not specified, it will default to all CPU workers
  in the system.
  Nc                 C   s:   || _ || _|r
|ng | _t | _d | _d | _d| _d S )NF)	_saver_checkpoint_prefix_on_shutdown_hooksr   rh   re   _workersr"   _heartbeat_supported)r   Zcheckpoint_prefixsaverZon_shutdown_hooksr   r   r   r   @  s   

zGracefulShutdownHook.__init__c              	   C   s   t  d u r|  d urtd| j ` td t|| j| _	t
| j	t| j	| _| j dk| _| jrXz| jtjtjd W qe tjyW   td d| _Y qmw td W d    d S W d    d S W d    d S 1 sxw   Y  d S )NzuSaver defined but no global step.  Run `get_or_create_global_step()` in your model definition to allow checkpointing.z"Installing graceful shutdown hook.r   )rP   zJTPU device does not support heartbeats. Failure handling will be disabled.FzANo workers support heartbeats. Failure handling will be disabled.)r   get_global_stepr   
ValueErrorre   ri   r*   r,   r    r"   r!   r1   rZ   r   r4   r   r8   r   r>   rj   r   ZInvalidArgumentErrorwarn)r   Ztraining_sessionZcoordr   r   r   after_create_sessionK  s@   

"z)GracefulShutdownHook.after_create_sessionc                 C   sR   | j r| j S ttjj}|sd S t|ts|S t|dkr%t	d d S |d S )N   zMultiple savers in the SAVERS collection.  On-demand checkpointing will be disabled. Pass an explicit `saver` to the constructor to override this behavior.r   )
r~   r   Zget_collectionZ	GraphKeysZSAVERS
isinstancelistr3   r*   r+   )r   Zsaversr   r   r   r   g  s   
zGracefulShutdownHook.saverc                 C   s   ~| j sd S | j }|r@td| |  r.td| j |  j|j| jt	
 dd ntd | jD ]}||| j| q6d S d S )Nz$ShutdownHook: lame workers found: %sz%ShutdownHook: saving checkpoint to %sT)Zglobal_stepZwrite_statezShutdownHook: no Saver defined.)r   r   rE   r*   r,   r   r   saver   r   r   r   )r   run_contextZ
run_valuesrE   fnr   r   r   	after_run{  s*   


zGracefulShutdownHook.after_run)NN)r   r   r   r   r   r   r   r   r   r   r   r   r}   1  s    
r}   c                   @       e Zd ZdZdd Zdd ZdS )ResetComputationzHook to reset a TPUEstimator computation loop.

  This hook shuts down all workers and resets the monitored session loop by
  throwing a CoordinatorResetError.
  c                 C      d S r2   r   r   r   r   r   r        zResetComputation.__init__c                 C   s    ~~|j dd td t )N*   rM   zResetting coordinator.)rW   r*   r,   r   r   r   Zall_workersrE   r   r   r   __call__  s   
zResetComputation.__call__Nr   r   r   r   r   r   r   r   r   r   r         r   c                   @   r   )ShutdownLameWorkersz~Shutdown lamed workers.

  Processing will continue normally (typically by waiting for the down
  workers to be restarted).
  c                 C   r   r2   r   r   r   r   r   r     r   zShutdownLameWorkers.__init__c                 C   s   |j dd d S Nr   rM   rW   r   r   r   r   r     rJ   zShutdownLameWorkers.__call__Nr   r   r   r   r   r     r   r   c                   @   r   )ShutdownAllWorkersz|Shutdown all workers.

  Processing will continue normally (typically by waiting for the down
  workers to be restarted).
  c                 C   r   r2   r   r   r   r   r   r     r   zShutdownAllWorkers.__init__c                 C   s   |j dd d S r   r   r   r   r   r   r     rJ   zShutdownAllWorkers.__call__Nr   r   r   r   r   r     r   r   r2   )Nr\   rx   )*r   r^   rT   Zgoogle.protobufr   Ztensorflow.core.protobufr   Ztensorflow.core.utilr   Ztensorflow.python.clientr   r   Ztensorflow.python.frameworkr   r   r   Ztensorflow.python.opsr	   Ztensorflow.python.platformr
   r*   Ztensorflow.python.tpu.opsr   Ztensorflow.python.trainingr   r   ry   r   r   r    objectr!   rZ   r_   r[   r{   r|   ZSessionRunHookr}   r   r   r   r   r   r   r   <module>   s<   
kn
d