"""Class MirroredStrategy implementing tf.distribute.Strategy."""

import contextlib
import threading
import weakref

from tensorflow.python import pywrap_tfe
from tensorflow.python.autograph.core import ag_ctx as autograph_ctx
from tensorflow.python.autograph.impl import api as autograph
from tensorflow.python.distribute import distribute_lib
from tensorflow.python.distribute import distribute_utils
from tensorflow.python.distribute import shared_variable_creator
from tensorflow.python.eager import context
from tensorflow.python.eager import def_function
from tensorflow.python.framework import device as tf_device
from tensorflow.python.framework import ops
from tensorflow.python.ops import summary_ops_v2
from tensorflow.python.ops import variable_scope
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import coordinator
from tensorflow.python.util import traceback_utils


def _is_gpu_device(device):
  return tf_device.DeviceSpec.from_string(device).device_type == "GPU"


# Per-strategy cache for `call_for_each_replica` def_function.Function objects.
_cfer_fn_cache = weakref.WeakKeyDictionary()


def call_for_each_replica(strategy, fn, args=None, kwargs=None):
  """Call `fn` on each worker device (replica).

  It's highly recommended to wrap the call to this function inside a
  `tf.function`, otherwise the performance is poor.

  Args:
    strategy: `tf.distribute.Strategy`.
    fn: function to call on each worker device.
    args: positional arguments to `fn`.
    kwargs: keyword arguments to `fn`.

  Returns:
    Wrapped returned value of `fn` from all replicas.
  """
  if args is None:
    args = ()
  if kwargs is None:
    kwargs = {}

  if isinstance(fn, def_function.Function):
    # Don't lift the tf.function decoration if `fn` is compiled with XLA and
    # all devices are GPUs; in that case collectives handle the cross-device
    # communication, so no merge_call is on the path.
    if fn._jit_compile and all(  # pylint: disable=protected-access
        [_is_gpu_device(d) for d in strategy.extended.worker_devices]):
      return _call_for_each_replica(strategy, fn, args, kwargs)

    if strategy not in _cfer_fn_cache:
      _cfer_fn_cache[strategy] = weakref.WeakKeyDictionary()
    wrapped = _cfer_fn_cache[strategy].get(fn)
    if wrapped is None:
      # Wrap `fn` so that it triggers `_call_for_each_replica` inside the
      # tf.function. `_clone` is used instead of re-decorating so that the
      # original arguments to the `tf.function` decorator of `fn` are kept.
      def wrapped_fn(*args, **kwargs):
        return call_for_each_replica(strategy, fn.python_function, args,
                                     kwargs)

      wrapped = fn._clone(python_function=wrapped_fn)  # pylint: disable=protected-access
      _cfer_fn_cache[strategy][fn] = wrapped
    return wrapped(*args, **kwargs)

  if context.executing_eagerly():
    logging.log_first_n(
        logging.WARN, "Using %s eagerly has significant overhead currently. "
        "We will be working on improving this in the future, but for now "
        "please wrap `call_for_each_replica` or `experimental_run` or `run` "
        "inside a tf.function to get the best performance." %
        strategy.__class__.__name__, 5)
  else:
    # Make sure the Python function originally passed to the tf.function is
    # still converted by AutoGraph.
    fn = autograph.tf_convert(fn, autograph_ctx.control_status_ctx())

  return _call_for_each_replica(strategy, fn, args, kwargs)
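
# --- Illustrative usage (not part of the original module) -------------------
# A minimal sketch of the tf.function wrapping recommended by the docstring
# above, assuming a TensorFlow 2.x installation. This internal helper is
# normally reached through the public API (`strategy.run`); `step_fn` and
# `distributed_step` are hypothetical names chosen for this example:
#
#   import tensorflow as tf
#
#   strategy = tf.distribute.MirroredStrategy()
#
#   @tf.function  # without this, the eager-overhead warning above is logged
#   def distributed_step(x):
#     def step_fn(x):
#       return x * 2.0  # runs once per replica, each on its own device
#     return strategy.run(step_fn, args=(x,))
#
#   distributed_step(tf.constant(1.0))
# -----------------------------------------------------------------------------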

@contextlib.contextmanager
def _enter_graph(g, eager, creator_stack=None):
  """Context manager for selecting a graph and maybe eager mode."""
  if eager:
    with g.as_default(), context.eager_mode():
      if creator_stack is not None:
        g._variable_creator_stack = creator_stack  # pylint: disable=protected-access
      yield
  else:
    with g.as_default():
      if creator_stack is not None:
        g._variable_creator_stack = creator_stack  # pylint: disable=protected-access
      yield


@contextlib.contextmanager
def _maybe_enter_eager_mode(eager):
  if eager:
    with context.eager_mode():
      yield
  else:
    yield


def _cpu_device(device):
  cpu_device = tf_device.DeviceSpec.from_string(device)
  cpu_device = cpu_device.replace(device_type="CPU", device_index=0)
  return cpu_device.to_string()

class _RequestedStop(Exception):  # pylint: disable=g-bad-exception-name
  pass


def _get_thread_local_configuration_callable():
  if traceback_utils.is_traceback_filtering_enabled():
    thread_local_callables = {traceback_utils.enable_traceback_filtering}
  else:
    thread_local_callables = {traceback_utils.disable_traceback_filtering}
  return thread_local_callables

def _call_for_each_replica(distribution, fn, args, kwargs):
  """Run `fn` in separate threads, once per replica/worker device.

  Args:
    distribution: the DistributionStrategy object.
    fn: function to run (will be run once per replica, each in its own thread).
    args: positional arguments for `fn`.
    kwargs: keyword arguments for `fn`.

  Returns:
    Merged return value of `fn` across all replicas.

  Raises:
    RuntimeError: If fn() calls get_replica_context().merge_call() a different
      number of times from the available devices.
  """
  # Variable creation is not synchronized across threads yet, so the replica
  # threads may not run concurrently.
  run_concurrently = False
  if not context.executing_eagerly():
    # Needed for per-thread device, etc. contexts in graph mode.
    ops.get_default_graph().switch_to_thread_local()

  coord = coordinator.Coordinator(clean_stop_exception_types=(_RequestedStop,))

  shared_variable_store = {}
  devices = distribution.extended.worker_devices

  thread_local_callables = _get_thread_local_configuration_callable()

  threads = []
  for index in range(len(devices)):
    variable_creator_fn = shared_variable_creator.make_fn(
        shared_variable_store, index)
    t = _MirroredReplicaThread(
        distribution, coord, index, devices, variable_creator_fn, fn,
        distribute_utils.caching_scope_local,
        distribute_utils.select_replica(index, args),
        distribute_utils.select_replica(index, kwargs),
        thread_local_callables)
    threads.append(t)

  for t in threads:
    t.start()

  # The main thread sets `should_run` on each replica thread, then waits until
  # `has_paused` is set, which indicates that either `fn` completed (`t.done`
  # is True) or the replica reached a `get_replica_context().merge_call()`. In
  # the latter case the merge arguments from all paused threads are grouped,
  # `merge_fn` runs on the main thread, its result is distributed back via
  # `t.merge_result`, and the replicas are resumed by setting `should_run`.
  try:
    with coord.stop_on_exception():
      all_done = False
      while not all_done and not coord.should_stop():
        done = []
        if run_concurrently:
          for t in threads:
            t.should_run.set()
          for t in threads:
            t.has_paused.wait()
            t.has_paused.clear()
            if coord.should_stop():
              return None
            done.append(t.done)
        else:
          for t in threads:
            t.should_run.set()
            t.has_paused.wait()
            t.has_paused.clear()
            if coord.should_stop():
              return None
            done.append(t.done)
        if coord.should_stop():
          return None
        all_done = all(done)
        if not all_done:
          if any(done):
            raise RuntimeError("Some replicas made a different number of "
                               "replica_context().merge_call() calls.")
          # get_replica_context().merge_call() case.
          merge_args = distribute_utils.regroup(
              tuple(t.merge_args for t in threads))
          merge_kwargs = distribute_utils.regroup(
              tuple(t.merge_kwargs for t in threads))
          # Capture the name scope and variable scope from the first replica
          # thread (assumed identical across replicas), and the union of the
          # control dependencies from all of them, so that `merge_fn` runs
          # under the scopes active when `merge_call` was made.
          mtt_captured_name_scope = threads[0].captured_name_scope
          mtt_captured_var_scope = threads[0].captured_var_scope
          mtt_captured_control_deps = set()
          for t in threads:
            mtt_captured_control_deps.update(t.captured_control_deps)

          with ops.name_scope(mtt_captured_name_scope), \
              ops.control_dependencies(mtt_captured_control_deps), \
              variable_scope.variable_scope(mtt_captured_var_scope), \
              _maybe_enter_eager_mode(threads[0].merge_call_entered_in_eager):
            merge_result = threads[0].merge_fn(distribution, *merge_args,
                                               **merge_kwargs)
          for r, t in enumerate(threads):
            t.merge_result = distribute_utils.select_replica(r, merge_result)
  finally:
    for t in threads:
      t.should_run.set()
    coord.join(threads)

  return distribute_utils.regroup(tuple(t.main_result for t in threads))
 Zdd Zdd Z	dd Z
  ZS )rP   z,A thread that runs() a function on a device.Nc                    s  t t|   || _|| _|| _|| _|j|| _	|| _
|| _|| _|	| _d | _d| _d | _d | _d | _d | _d | _d | _z
|j| _|j| _W n tyW   d | _d | _Y nw t | _t | _t   t }|! | _"| #  | $  t%&|j'| _(t)* | _+t),  t! | _-t)* | _.W d    n1 sw   Y  | j+j/d d  | _/t01 | _2| j+3 | _4| j4r|  j4d7  _4| jdkr| j4sd| _4|  j4d| j 7  _4|
| _5d S )NF/r    zreplica_%d/)6superrP   __init__rj   ri   rk   
replica_idr)   Z_get_replica_id_in_sync_groupreplica_id_in_sync_grouprn   main_fn	main_argsmain_kwargsrL   r\   rf   rF   rK   rh   r`   ra   new_cache_scope_countcaching_scope_enteredcache_scope_exited_countcaching_scope_exitedAttributeError	threadingEventrV   rX   r   Zensure_initializedr0   in_eager!record_thread_local_summary_state'record_thread_local_eager_context_stater   Z#TFE_ContextGetDevicePlacementPolicyZ_context_handlecontext_device_policyr   rM   graphZ
init_scope_init_in_eager_init_graphr9   r   get_variable_scope
_var_scopeget_name_scope_name_scope_thread_local_callables)selfdistrj   rt   rk   rn   r"   Zcaching_scoper   r    rC   ctxr3   r   r   rs   $  sh   










z_MirroredReplicaThread.__init__c                 C   sb  | j   | j   z| j rW | j  d S |   |   | 	  | j
d ur9| jd ur9| j
tj_| jtj_| j  t| j| j t| j| j| j t| j t| j| jv t| j| j  ] t!| j"G t#j#| j$| j dkd, t#%| j& | j'| j(i | j)| _*d| _+W d    n1 sw   Y  W d    n1 sw   Y  W d    n1 sw   Y  W d    n1 sw   Y  W d    n1 sw   Y  W d    n1 sw   Y  W d    n1 sw   Y  W d    n	1 sw   Y  W d    n1 sw   Y  W | j  d S W | j  d S | j  w )Nr   )ZreuseT),rV   rY   rZ   rj   rU   rX   rW   "restore_thread_local_summary_staterestore_thread_local_callable(restore_thread_local_eager_context_staterz   r|   r   rQ   ry   r{   rT   r<   r   r   r   r   r9   r   Zdevice_policyr   _MirroredReplicaContextri   ru   r   r   rk   rt   rd   r   r   r   Zvariable_creator_scopern   rv   rw   rx   rL   r\   r   r   r   r   runi  sv   








	
         z_MirroredReplicaThread.runc                 C   s*   t j}|j| _|j| _|j| _|j| _	dS )z.Record the thread local summary state in self.N)
r   _summary_statestep_summary_stepwriter_summary_writeris_recording_summary_recording"is_recording_distribution_strategy(_summary_recording_distribution_strategyr   Zsummary_stater   r   r   r        z8_MirroredReplicaThread.record_thread_local_summary_statec                 C   s*   t j}| j|_| j|_| j|_| j|_	dS )z-Restore thread local summary state from self.N)
r   r   r   r   r   r   r   r   r   r   r   r   r   r   r     r   z9_MirroredReplicaThread.restore_thread_local_summary_statec                 C   s   t   }|j}|j| _d S r   )r   _thread_local_dataop_callbacks_eager_context_op_callbacksr   r   Zeager_context_stater   r   r   r        z>_MirroredReplicaThread.record_thread_local_eager_context_statec                 C   s   t   }|j}| j|_d S r   )r   r   r   r   r   r   r   r   r     r   z?_MirroredReplicaThread.restore_thread_local_eager_context_statec                 C   s    | j r| j D ]}|  qd S d S r   )r   )r   r"   r   r   r   r     s
   
z4_MirroredReplicaThread.restore_thread_local_callabler   )r4   rA   rB   __doc__rs   r   r   r   r   r   r   __classcell__r   r   r   r   rP   !  s    E


class _MirroredReplicaContext(distribute_lib.ReplicaContext):
  """ReplicaContext for synchronized replica."""

  def _merge_call(self, fn, args, kwargs):
    """`merge_call()` implementation for synchronized replica.

    This pauses the current replica thread and passes `fn` and its arguments
    to the main thread. The main thread will wait until all replicas pause,
    then invoke `fn` with grouped arguments. The current replica thread will
    continue after `fn` completes.

    See `_call_for_each_replica` for the logic in the main thread.

    Args:
      fn: a function that is called in cross replica context with grouped
        arguments from each replica. `fn` should return grouped values.
      args: positional arguments to `fn`.
      kwargs: keyword arguments to `fn`.

    Returns:
      Return value of `fn` for the current replica.

    Raises:
      RuntimeError: when merge_call happens in a different graph, e.g. in a
        different tf.function, which is not supported now.
      _RequestedStop: when stop is requested.
    """
    t = threading.current_thread()
    assert isinstance(t, _MirroredReplicaThread)
    t.merge_fn = fn
    t.merge_args = args
    t.merge_kwargs = kwargs
    t.captured_name_scope = t.graph.get_name_scope()
    # Adding a "/" at the end lets us re-enter this scope later.
    if t.captured_name_scope:
      t.captured_name_scope += "/"

    t.captured_var_scope = variable_scope.get_variable_scope()
    t.captured_control_deps = t.graph._current_control_dependencies()  # pylint: disable=protected-access

    t.merge_call_entered_in_eager = context.context().executing_eagerly()

    # A `merge_call` under a graph different from the one that
    # `_call_for_each_replica` runs under is not supported, e.g. when `fn`
    # contains a nested `tf.function` with a synchronization point.
    if ops.get_default_graph() != t.graph:
      raise RuntimeError(
          "`merge_call` called while defining a new graph or a tf.function."
          " This can often happen if the function `fn` passed to"
          " `strategy.run()` contains a nested `@tf.function`, and the nested"
          " `@tf.function` contains a synchronization point, such as"
          " aggregating gradients (e.g, optimizer.apply_gradients), or if the"
          " function `fn` uses a control flow statement which contains a"
          " synchronization point in the body. Such behaviors are not yet"
          " supported. Instead, please avoid nested `tf.function`s or control"
          " flow statements that may potentially cross a synchronization"
          " boundary, for example, wrap the `fn` passed to `strategy.run` or"
          " the entire `strategy.run` inside a `tf.function` or move the"
          " control flow out of `fn`. If you are subclassing a"
          " `tf.keras.Model`, please avoid decorating overridden methods"
          " `test_step` and `train_step` in `tf.function`.")

    t.has_paused.set()
    t.should_run.wait()
    t.should_run.clear()
    if t.coord.should_stop():
      raise _RequestedStop()
    t.merge_call_entered_in_eager = None
    return t.merge_result

  @property
  def devices(self):
    distribute_lib.require_replica_context(self)
    return [
        self._strategy.extended.worker_devices_by_replica[
            self._replica_id_in_sync_group]
    ]
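
# --- Illustrative usage (not part of the original module) -------------------
# `_MirroredReplicaContext._merge_call` above is what runs when replica code
# calls the public `merge_call` API. A minimal sketch, assuming a TensorFlow
# 2.x installation; `step_fn` and `merge_fn` are hypothetical names chosen for
# this example:
#
#   import tensorflow as tf
#
#   strategy = tf.distribute.MirroredStrategy()
#
#   def merge_fn(strategy, value):
#     # Runs once, on the main thread, with the per-replica values grouped.
#     return strategy.reduce(tf.distribute.ReduceOp.SUM, value, axis=None)
#
#   def step_fn(x):
#     ctx = tf.distribute.get_replica_context()
#     # Pauses this replica thread until every replica reaches this point.
#     return ctx.merge_call(merge_fn, args=(x,))
#
#   strategy.run(step_fn, args=(tf.constant(1.0),))
# -----------------------------------------------------------------------------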