o
    ?eb                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 ddl
mZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ da e! Z"dZ#dZ$dd Z%G dd dZ&dS )z>Utilities for saving/loading Trackable objects asynchronously.    N)logging)checkpoint_context)device_util)ShardedVariable)context)def_function)executor)device)ops)UninitializedVariable)Variable)metrics)object_identityZasync_checkpoint!_create_copy_for_async_checkpointc                 C   s   || k rdS t ||  d S )a  Calculate the duration between start and end time.

  Args:
    start_time_seconds: The start time in seconds.
    end_time_seconds: The end time in seconds.

  Returns:
    The duration between the start and the end time. Return 0 if
    end_time_seconds < start_time_seconds.
  r   i@B )round)Zstart_time_secondsZend_time_seconds r   u/home/www/facesmatcher.com/pyenv/lib/python3.10/site-packages/tensorflow/python/checkpoint/async_checkpoint_helper.py_get_duration_microseconds5   s   r   c                   @   s   e Zd ZdZd+ddZejdd Zejdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zedd Zd+dd Zd,d!d"Zd+d#d$Zd+d%d&Zd+d'd(Zd)d* ZdS )-AsyncCheckpointHelperz"Helper class for async checkpoint.Nc                 K   s  |rt |tjr| n|}||d< |  |du rtd|| _|| _d| _|   d| _	d| _
d| _d| _d| _d| _t pAd| _t| j| _d| _d| _d| _tjdd| _t| j d| _t tdu rvt aW d   dS W d   dS 1 sw   Y  dS )a  Initialize AsyncCheckpoint.

    Args:
      checkpointer_impl: The Checkpoint class to power the AsyncCheckpoint.
      root: The root object to checkpoint. `root` may be a trackable object or
        `WeakRef` of a trackable object.
      **kwargs: The keyword arguments representing the checkpointed variables.

    Raises:
      AttributeError: when checkpointer_impl is None.
    rootNz;checkpointer_impl cannot be None for AsyncCheckpointHelper.FzCPU:0   )maxsize) 
isinstanceweakrefrefZ_maybe_initialize_trackableAttributeError_checkpointer_impl_checkpoint_items_checkpointcheckpointer_checkpoint_options_initialized_async_write_done_callback_original_nodes_object_map_tpu_embedding_objectsr   current_default_deviceZcanonicalize_save_file_prefix_use_checkpoint_save_async_save_threadqueueQueue_queueatexitregister_join_async_save_thread_async_error"_END_TIME_OF_LAST_ASYNC_WRITE_LOCK_END_TIME_OF_LAST_ASYNC_WRITEtime)selfZcheckpointer_implr   kwargsZtrackable_rootr   r   r   __init__I   s@   	
"zAsyncCheckpointHelper.__init__c              	   C   sh   | j  D ],\}}t|tst|trqt|j ||	  W d   n1 s,w   Y  qdS )zCopy the checkpointed variables from the host CPU to the accelerator.

    TODO(chienchunh): Get the concrete function before firstly called to avoid
                      hangining the accelerators idle during function tracing.
    N)
r$   itemsr   r   hasattr_TPU_EMBEDDING_ATTRr
   r	   assign
read_value)r5   accelerator_varcpu_varr   r   r   _copy_from_cpu   s   z$AsyncCheckpointHelper._copy_from_cpuc              	   C   s|   | j  D ],\}}t|tst|trqt|j ||	  W d   n1 s,w   Y  q| j
D ]}|  q5dS )zCopy the checkpointed variables from the accelerator to the host CPU.

    TODO(chienchunh): Get the concrete function before firstly called to avoid
                      hangining the accelerators idle during function tracing.
    N)r$   r8   r   r   r9   r:   r
   r	   r;   r<   r%   Z_retrieve_variables)r5   r=   r>   tpu_embeddingr   r   r   _copy_to_cpu   s   

z"AsyncCheckpointHelper._copy_to_cpuc                 C   s   |r@|  }| j| t|ttfr| | t|tr"| 	| |j
dd D ]}||v r1q*|| || q*|sdS dS )a  Create the copied nodes and variables while traversing the nodes.

    This method performs a BFS to traverse the nodes while avoiding duplicated
    visits. Throughout the process, self._mapping, self._original_nodes, and
    self._var_pairs are populated.

    Args:
      to_traverse: A deque that stores the nodes to be traversed.
      visited: A list of nodes that have been visited.
    
checkpoint)	save_typeN)popleftr#   appendr   r   r   _copy_trackabler9   r:   _handle_tpu_embeddingZ_trackable_childrenvaluesadd)r5   to_traversevisitedcurrent_trackablechildr   r   r   _traverse_variables   s"   



z)AsyncCheckpointHelper._traverse_variablesc                 C   s$   | j du r| jdi | j| _ | j S )z3Gets or creates the underlying Checkpoint instance.Nr   )r   r   r   r5   r   r   r   r      s   
z"AsyncCheckpointHelper.checkpointerc           
      C   s`  | j rdS g | _t | _g | _tg }t }| j	
 D ]#}t|ttfr-| | n
t|tr7| | || || q| || | jD ]=}dt|v r| }|D ].}| jD ](}t|tsfq^z|||}W n ttfyy   Y q^w t|ttfr| | q^qYqK|  j }	td|	 | j|  j_t j!| j"dd| _#| j#$  d| _ dS )z/Initialize the async checkpoint internal state.Nget_slot_namesz0Initializing async checkpoint's save_counter: %dT)targetdaemon)%r!   r#   r   ZObjectIdentityDictionaryr$   r%   collectionsdequeZObjectIdentitySetr   rH   r   r   r   rF   r9   r:   rG   rE   rI   rN   dirrP   Zget_slotr   KeyErrorr   save_counternumpyr   infoZ_saver	threadingThread_async_saver*   start)
r5   rJ   rK   vrL   Z
slot_namesZ	slot_nameZoriginal_variableZoriginal_slot_variablerW   r   r   r   _ensure_initialized   sX   










z)AsyncCheckpointHelper._ensure_initializedc                 C   s*   | j r| j }d| _ tdt| |dS )zMExpose the most recent error from the async saving thread to the caller.
    NzJPropagating the most recent error from the async thread before joining: %s)r1   r   errorstr)r5   er   r   r   _check_async_thread_error  s   z/AsyncCheckpointHelper._check_async_thread_errorc                 C   sz   z7z| j jddd td | jdur| j  W n tjy)   td Y nw W | 	  dS W | 	  dS | 	  w )a  Join the async save thread.

    The steps for terminating the async save thread:
    1). Put will succeed when the last async save event is done. Putting a false
        triggers the async save thread's while loop to end. We use put instead
        of sync because sync does not have a timeout argument.
    2). Join the async save thread. (The thread may finish before joining.)
    Fi,  )timeoutzJoining the async save thread.NzrTimeout waiting for the async save thread; terminating the thread instead. The last checkpoint may be incomeplete.)
r-   putr   rY   r*   joinr+   Fullr`   rc   rO   r   r   r   r0     s   	


z-AsyncCheckpointHelper._join_async_save_threadc                 C   s  t tjddd | j rtd| j t		 }zdzGt
| j7 t # | jr7|  | j| j n|  j| j| j| jd W d   n1 sNw   Y  W d   n1 s]w   Y  W n tyx } z	|| _W Y d}~nd}~ww W | j  n| j  w t		 }tjtt||d t tjttt|d |aW d   n1 sw   Y  | j sW d   n1 sw   Y  td dS )z2The thread function for the async checkpoint save.F)Zenable_asyncZenable_streaming_enqueuez0Starting async checkpoint save on the device: %s)optionswrite_done_callbackNZ	api_labelmicrosecondsz3Async save thread reached the end of the execution.)r   Zexecutor_scoper   Znew_executorr-   getr   rY   r'   r4   r
   r	   r   Zasync_metrics_contextr)   r   saver(   r    _writer"   	Exceptionr1   	task_doner   ZAddAsyncCheckpointWriteDuration_ASYNC_CHECKPOINTr   r2   ZAddTrainingTimeSavedr3   )r5   Zasync_save_start_timerb   Zasync_save_end_timer   r   r   r\   -  sj   


5z!AsyncCheckpointHelper._async_savec                 C   sl   t j|jjddd }t| t|j|j	|j
|jd}W d   n1 s*w   Y  || j|< dS )zvCreate a new instance for the input trackable.

    Args:
      original_var: Input Variable object to be copied.
    ZCPUr   )Zdevice_typeZdevice_index)	trainableshapedtypenameN)pydevZ
DeviceSpecZfrom_stringr	   replaceZ	to_stringr
   r   rr   rs   rt   Z_shared_namer$   )r5   original_varZ	op_deviceZnew_varr   r   r   _copy_for_variablef  s   z(AsyncCheckpointHelper._copy_for_variablec                 C   sB   g }|j D ]}| | || j|  qt||jd| j|< dS )zCreate a new instance for the input ShardedVariable.

    Args:
      original_var: Input ShardedVariable object to be copied.
    )ru   N)Z
_variablesry   rE   r$   r   ru   )r5   rx   Zcopied_varsr^   r   r   r   _copy_for_sharded_variablev  s   

z0AsyncCheckpointHelper._copy_for_sharded_variablec                 C   s8   t |tr| | dS t |tr| | dS td)zCreate a new instance for the input trackable.

    Args:
      original_trackable: The trackable instance to be copied.

    Raises:
      AttributeError: if the input trackable is not Variable or ShardedVariable.
    z/Only Variable or ShardedVariable can be copied.N)r   r   rz   r   ry   r   )r5   Zoriginal_trackabler   r   r   rF     s
   
	
z%AsyncCheckpointHelper._copy_trackablec                 C   sp   t |tr
t|jstdt| |j|j|jr|jd nd|jd}|| j	|< || j
vr6| j
| dS dS )zHandle TPUEmbedding.

    Args:
      tpu_embedding: TPUEmbedding object to be handled.

    Raises:
      AttributeError: if the input trackable is not TPUEmbedding type.
    z#Expecting TPUEmbedding type; got %sr   N)Zfeature_configZ	optimizerZ#pipeline_execution_with_tensor_core)r9   r:   callabler   r   typeZ_feature_configZ_table_configZ$_pipeline_execution_with_tensor_corer$   r%   rE   )r5   r@   Znew_embeddingr   r   r   rG     s&   	


z+AsyncCheckpointHelper._handle_tpu_embeddingc                 C   s
   |   jS )a  An integer variable numbering the checkpoint events.

    This is maintained by the underlying tf.train.Checkpoing object employed by
    AsyncCheckpoint class. The number starts at 0 and gets incremented for each
    checkpoint event.

    Returns:
      The save counter variable.
    )r   rW   rO   r   r   r   rW     s   
z"AsyncCheckpointHelper.save_counterc                 C   s   |  || dS )Save the checkpointed variables.

    Args:
      save_path: The file prefix of the checkpoint file.
      options: Optional CheckpointOption instance.

    Returns:
      The full path of the checkpoint file.
    N)rn   r5   	save_pathrh   r   r   r   write  s   
zAsyncCheckpointHelper.writec                 C   s   |    t }| j  |   |   t  || _d| _	|r&t

|nd| _| jr0d| j_|| _| jd t }tjtt||d |S )a  Save the checkpointed variables.

    This method has exactly the same logic as save(), except it does not
    increment the underlying save_counter, which is done by the caller, e.g.,
    CheckpointManager.

    Args:
      save_path: The file prefix of the checkpoint file.
      options: Optional CheckpointOption instance.
      write_done_callback: Optional callback function executed after the async
        write is done.

    Returns:
      The full path of the checkpoint file.
    FNTrj   )r_   r4   r-   rf   rA   rc   r   
async_waitr(   r)   copyr    $experimental_enable_async_checkpointr"   re   r   AddCheckpointWriteDurationrq   r   )r5   r   rh   ri   Zwrite_start_timeZwrite_end_timer   r   r   rn     s*   
zAsyncCheckpointHelper._writec                 C   s   |    t }| j  |   |   |  j d }d	||}t
  || _d| _|r5t|nd| _| jr?d| j_| jd t }tjtt||d |S )r}   r   z{}-{}TNFrj   )r_   r4   r-   rf   rA   rc   r   rW   rX   formatr   r   r(   r)   r   r    r   re   r   r   rq   r   )r5   r   rh   Zsave_start_timerW   	full_pathZsave_end_timer   r   r   rm     s(   
zAsyncCheckpointHelper.savec                 C   s   |  ||S )a  Restore the checkpointed variables.

    This method has exactly the same logic as restore(). This method is
    implemented only to fulfill the duty of subclassing tf.train.Checkpoint.

    Args:
      save_path: The full name of the checkpoint file to be restored.
      options: CheckpointOption instance.

    Returns:
      A load status object, which can be used to make assertions about the
      status of a checkpoint restoration. See tf.train.Checkpoint.restore()
      for more details.
    )restorer~   r   r   r   read@  s   zAsyncCheckpointHelper.readc                 C   sR   |rt  |n| j| _| jrd| j_| j  |  || j}| jr'|   |S )a`  Restore the checkpointed variables.

    Args:
      save_path: The full name of the checkpoint file to be restored.
      options: CheckpointOption instance.

    Returns:
      A load status object, which can be used to make assertions about the
      status of a checkpoint restoration. See tf.train.Checkpoint.restore()
      for more details.
    F)	r   r    r   r-   rf   r   r   r!   r?   )r5   r   rh   statusr   r   r   r   Q  s   
zAsyncCheckpointHelper.restorec                 C   s   | j   td dS )z+Sync on any ongoing save or restore events.zSync on ongoing save/restore.N)r-   rf   r   rY   rO   r   r   r   syncq  s   
zAsyncCheckpointHelper.sync)N)NN)__name__
__module____qualname____doc__r7   r   functionr?   rA   rN   r   r_   rc   r0   r\   ry   rz   rF   rG   propertyrW   r   rn   rm   r   r   r   r   r   r   r   r   F   s2    
K

6
9!



6
=
 r   )'r   r.   rS   r   r+   rZ   r4   r   Zabslr   Ztensorflow.python.checkpointr   Ztensorflow.python.distributer   Z-tensorflow.python.distribute.sharded_variabler   Ztensorflow.python.eagerr   r   r   Ztensorflow.python.frameworkr	   rv   r
   Z+tensorflow.python.ops.resource_variable_opsr   Ztensorflow.python.ops.variablesr   Z0tensorflow.python.saved_model.pywrap_saved_modelr   Ztensorflow.python.utilr   r3   Lockr2   rq   r:   r   r   r   r   r   r   <module>   s6   