"""Utilities for cross_device_ops."""

import copy
import threading
from typing import Callable, List, Optional, Union

from tensorflow.python.distribute import collective_util
from tensorflow.python.distribute import values as value_lib
from tensorflow.python.eager import backprop_util
from tensorflow.python.eager import context
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import indexed_slices
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_spec
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import collective_ops
from tensorflow.python.ops import cond
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nccl_ops
from tensorflow.python.ops import resource_variable_ops
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.types import core

INSTANCE_KEY_START_NUMBER = 100


def aggregate_gradients_using_nccl(replica_grads):
  """Aggregate gradients using nccl allreduce."""
  agg_all_g_and_v = []
  for single_g_and_v in zip(*replica_grads):
    single_grads = [g for g, _ in single_g_and_v]
    agg_grads = nccl_ops.all_sum(single_grads)
    agg_all_g_and_v.append(
        [(g, v) for g, (_, v) in zip(agg_grads, single_g_and_v)])

  agg_all_g_and_v = list(zip(*agg_all_g_and_v))

  return agg_all_g_and_v


def aggregate_gradients_using_hierarchical_copy(avail_devices, replica_grads):
  """Aggregate gradients using hierarchical copies.

  Args:
    avail_devices: available GPU devices.
    replica_grads: List of lists of (gradient, variable) tuples. The outer list
      is over replicas. The inner list is over individual gradients.

  Returns:
    The list of (aggregated_gradient, variable), where the gradient has been
      summed across all replicas and the variable is chosen from the first
      replica.
  """
  agg_grads = []
  num_devices = len(avail_devices)
  # The devices are split into two equally sized groups; each group is
  # aggregated on its own root device, the partial sums are combined, and the
  # result is broadcast back to every device.
  group_size = num_devices // 2
  for i, single_grads in enumerate(zip(*replica_grads)):
    group_0_main_device = i % num_devices
    group_1_main_device = (group_0_main_device + group_size) % num_devices
    if group_0_main_device < group_size:
      group_0_begin = 0
      group_1_begin = group_size
    else:
      group_0_begin = group_size
      group_1_begin = 0

    # Aggregate the first group.
    group_0_device_grads = single_grads[group_0_begin:
                                        group_0_begin + group_size]
    with ops.device(avail_devices[group_0_main_device]):
      group_0_agg_grads, _ = aggregate_single_gradient_using_copy(
          group_0_device_grads, False, False)

    # Aggregate the second group.
    group_1_device_grads = single_grads[group_1_begin:
                                        group_1_begin + group_size]
    with ops.device(avail_devices[group_1_main_device]):
      group_1_agg_grads, _ = aggregate_single_gradient_using_copy(
          group_1_device_grads, False, False)

    # Aggregate between the two groups.
    with ops.device(avail_devices[group_0_main_device]):
      (agg_total_grads, _), _ = aggregate_single_gradient_using_copy(
          [group_0_agg_grads, group_1_agg_grads], False, False)

    # Broadcast the result back into the root of each group.
    with ops.device(avail_devices[group_0_main_device]):
      group_0_agg_grads_bcast = array_ops.identity(agg_total_grads)
    with ops.device(avail_devices[group_1_main_device]):
      group_1_agg_grads_bcast = array_ops.identity(agg_total_grads)

    agg_grads_bcast = []
    for j in range(len(single_grads)):
      with ops.device(avail_devices[j]):
        # Broadcast the result back to each member from its group's root.
        if (group_0_main_device < group_size) == (j < group_size):
          src_device_grad = group_0_agg_grads_bcast
        else:
          src_device_grad = group_1_agg_grads_bcast
        agg_grads_bcast.append(array_ops.identity(src_device_grad))

    agg_grads.append(
        [(g, v) for g, (_, v) in zip(agg_grads_bcast, single_grads)])

  agg_grads = list(zip(*agg_grads))

  return agg_grads


def aggregate_single_gradient_using_copy(grad_and_vars, use_mean,
                                         check_inf_nan):
  """Calculate the average gradient for a shared variable across all replicas.

  Note that this function provides a synchronization point across all replicas.

  Args:
    grad_and_vars: A list or tuple of (gradient, variable) tuples. Each
      (gradient, variable) pair within the outer list represents the gradient
      of the variable calculated for a single replica, and the number of pairs
      equals the number of replicas.
    use_mean: if True, mean is taken, else sum of gradients is taken.
    check_inf_nan: check grads for nans and infs.

  Returns:
    The tuple ([(average_gradient, variable),], has_nan_or_inf) where the
      gradient has been averaged across all replicas. The variable is chosen
      from the first replica. The has_nan_or_inf indicates whether the grads
      have nan or inf.
  """
  grads = [g for g, _ in grad_and_vars]
  grad = math_ops.add_n(grads)

  if use_mean and len(grads) > 1:
    grad = math_ops.multiply(grad, 1.0 / len(grads))

  v = grad_and_vars[0][1]
  if check_inf_nan:
    has_nan_or_inf = math_ops.logical_not(
        math_ops.reduce_all(math_ops.is_finite(grads)))
    return (grad, v), has_nan_or_inf
  else:
    return (grad, v), None
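

# Illustrative sketch (an editorial addition, not part of the original module):
# with use_mean=True the summed gradient is scaled by 1/num_replicas, so
# replicas contributing 1.0 and 3.0 average to 2.0. The variable slot is opaque
# to this function, so a plain string stands in for a real variable here.
def _example_aggregate_single_gradient():  # Hypothetical helper.
  grad_and_vars = [(ops.convert_to_tensor(1.0), 'v'),
                   (ops.convert_to_tensor(3.0), 'v')]
  (avg_grad, v), has_nan_or_inf = aggregate_single_gradient_using_copy(
      grad_and_vars, use_mean=True, check_inf_nan=False)
  return avg_grad, v, has_nan_or_inf  # avg_grad == 2.0; has_nan_or_inf is None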


class CollectiveKeys(object):
  """Class that manages collective keys.

  We need to manage three different keys for collective:

  *Group key*: an integer key to identify the set of cooperative devices.
  Collective ops working under the same set of devices must use the same group
  key.

  *Instance key*: an integer key to identify the set of same counterpart
  tensors on different devices in a device group that need to be all-reduced.

  This class is thread safe.
  """

  def __init__(self, group_key_start=1):
    """Initializes the object.

    Args:
      group_key_start: the starting integer of group key.
    """
    self._group_key = group_key_start
    self._instance_key_table = {}
    self._lock = threading.Lock()
    self._known_groups = {}

  def get_group_key(self, devices):
    """Returns a group key for the list of local devices.

    The same group key is returned if the list of local devices is the same.

    Args:
      devices: a list of local canonical device strings in a collective group.

    Returns:
      a group key.
    """
    with self._lock:
      devices_key = ','.join(devices)
      if devices_key not in self._known_groups:
        self._known_groups[devices_key] = self._get_new_group_key(devices)
      return self._known_groups[devices_key]

  def _get_new_group_key(self, devices):
    """Returns a new group key.

    The caller should store and reuse the same group key for the same set of
    devices. Calling this method always returns a new group key.

    This method is not thread-safe.

    Args:
      devices: a list of canonical device strings in a collective group.

    Returns:
      a new group key.
    """
    new_key = self._group_key
    self._group_key += 1
    self._instance_key_table[new_key] = {}
    for device in devices:
      self._instance_key_table[new_key][device] = INSTANCE_KEY_START_NUMBER
    return new_key

  def get_instance_key(self, group_key, device):
    """Returns a new instance key for use in defining a collective op.

    You should call this once per each collective op of a collective instance.

    Args:
      group_key: the group key returned by get_group_key(). You should not
        assign the group key yourself.
      device: a canonical device string. It should be the device this collective
        op is on.

    Returns:
      a new instance key.

    Raises:
      ValueError: when the group key is invalid or the device is not in the
      group.
    """
    with self._lock:
      group = self._instance_key_table.get(group_key, None)
      if group is None:
        raise ValueError(f'Group {group_key} is not found.')
      if device not in group:
        raise ValueError(f'Device {device} is not present in group '
                         f'{group_key}.')
      v = group[device]
      group[device] += 1
      return v

  def __deepcopy__(self, memo):
    # distribute_coordinator deep-copies the strategy object, so
    # CollectiveKeys needs to support deep copy as well.
    copied = CollectiveKeys()
    copied._group_key = self._group_key
    copied._instance_key_table = copy.deepcopy(self._instance_key_table, memo)
    return copied
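

# Illustrative usage sketch (an editorial addition, not part of the original
# module): the same device list always maps to the same group key, while
# instance keys start at INSTANCE_KEY_START_NUMBER and increment once per
# collective op launched on a device in the group. The device strings below
# are hypothetical.
def _example_collective_keys():  # Hypothetical helper.
  keys = CollectiveKeys(group_key_start=1)
  devices = ['/job:worker/replica:0/task:0/device:GPU:0',
             '/job:worker/replica:0/task:0/device:GPU:1']
  group_key = keys.get_group_key(devices)  # 1; stable for this device list.
  first = keys.get_instance_key(group_key, devices[0])   # 100
  second = keys.get_instance_key(group_key, devices[0])  # 101
  return group_key, first, second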
   @   sN  e Zd ZdZdZdZdedededede	j
f
dd	Zd
eejejf fddZdd Zdd Zdd Zdd Zdd Z		d'dejd
eeejejf  dee	j
 dejfddZdejdee	j
 dejfddZ	d(deeej  dee	j
 dejfdd Z	d(dejd!ejdee	j
 dejfd"d#Z	d(d$ejdee	j
 dejfd%d&Z dS ))CollectiveReplicaLauncherz"Launch collectives on one replica.TrK   r2   collective_keysr.   optionsc              	   C   s   || _ || _|| _|| _|| _|  rMt , t| t	
d| _W d    n1 s.w   Y  W d    d S W d    d S 1 sFw   Y  d S d | _d S )Ng        )r;   _group_size_collective_keys_device_options_use_ordering_tokenr   Z
init_scoper.   r   ZResourceVariable_ordering_token)rA   rK   r2   rX   r.   rY   r   r   r   rB     s   P
z"CollectiveReplicaLauncher.__init__control_inputc                 C   s$   |d ur|   st|gS t S rN   )r^   r   control_dependenciesZNullContextmanager)rA   r`   r   r   r   _control_input&  s   z(CollectiveReplicaLauncher._control_inputc                 C      t  sdS tjS NF)r   #executing_eagerly_outside_functionsrW   _prefer_unique_instance_keyrA   r   r   r   _use_unique_instance_key,  s   z2CollectiveReplicaLauncher._use_unique_instance_keyc                 C   rc   rd   )r   re   rW   _prefer_ordering_tokenrg   r   r   r   r^   1  s   z-CollectiveReplicaLauncher._use_ordering_tokenc                 C   s   |   rht }t|ddr|j}t|ddst s@|jr@|  |	| j
tg tjW  d   S 1 s9w   Y  dS | j| j| j}td tj|tjdW  d   S 1 saw   Y  dS | j| j| jS )zReturns the next instance key.Zis_control_flow_graphFNzCPU:0)dtype)rh   r   Zget_default_graphgetattrZouter_graphr	   executing_eagerlyZbuilding_functionZ
as_defaultZcapture_call_time_value_next_instance_keyr   Z
TensorSpecr
   int32r[   rM   r;   r\   r.   Zconvert_to_tensor)rA   graphinstance_keyr   r   r   rm   8  s(   
$	$
z,CollectiveReplicaLauncher._next_instance_keyc                 C   s   |   r| jjS d S rN   )r^   r_   handlerg   r   r   r   _get_ordering_tokenW  s   z-CollectiveReplicaLauncher._get_ordering_tokenc                 C   s   |   S )z0Whether this launcher can order NCCL operations.)r^   rg   r   r   r   can_order_nccl[  s   z(CollectiveReplicaLauncher.can_order_ncclNinput_tensorreturnc                 C   s   |   }| j|}|  }t| j6 | |  tj	|| j
| j||jj|j|dW  d   W  d   S 1 s=w   Y  W d   dS 1 sMw   Y  dS )a  All-reduce a dense tensor.

    Args:
      input_tensor: a dense tensor. It must have the same shape on all replicas.
      control_input: if not None, add control edges between control_input and
        the all-reduce.
      options: an optional tf.distribute.experimental.CommunicationOptions. If
        provided, it overrides the default options.

    Returns:
      The reduced tensor.
    Zcommunication_hinttimeoutordering_tokenN)rm   r]   mergerr   r   r.   r\   rb   r   Zall_reduce_v2rZ   r;   implementationvaluetimeout_seconds)rA   rt   r`   rY   rp   rx   r   r   r   
all_reduce_  s    Rz$CollectiveReplicaLauncher.all_reducec              
   C   sn   |   }| j|}|  }t| j tj|| j	| j
||jj|j|dW  d   S 1 s0w   Y  dS )a&  All-gather a dense tensor.

    Args:
      input_tensor: a dense tensor. It must have the same shape on all replicas.
      options: an optional tf.distribute.experimental.CommunicationOptions. If
        provided, it overrides the default options.

    Returns:
      The reduced tensor.
    rv   N)rm   r]   ry   rr   r   r.   r\   r   Zall_gather_v2rZ   r;   rz   r{   r|   )rA   rt   rY   rp   rx   r   r   r   _all_gather~  s   $z%CollectiveReplicaLauncher._all_gatherinput_tensor_packsc              	   C   s
  | j |}g }|D ]x}t r |D ]}|| |d| qq
t| jT dd |D }dd |D }|j	t
jjkrC|rC|d }nd}| tj|dd||}	dd |D }
tj|	|
dd}t||D ]\}}|t|| qeW d   n1 s}w   Y  q
|S )	a  Batch all-reduce dense tensors.

    This takes a list of batches of tensors. Using multiple batches have the
    benefit that it doesn't need to wait for all inputs to be ready to start the
    all-reduce.

    Args:
      input_tensor_packs: a list of lists of dense tensors.
      options: an optional tf.distribute.experimental.CommunicationOptions. If
        provided, it overrides the default options.

    Returns:
      A flat list of reduced tensors.
    Nc                 S   s   g | ]	}t |d gqS ))r   reshaper   tr   r   r   r     s    z>CollectiveReplicaLauncher.batch_all_reduce.<locals>.<listcomp>c                 S      g | ]}t |qS r   )r   shaper   r   r   r   r         r   r   axisc                 S   r   r   )r   Zreduce_prod)r   sr   r   r   r     r   )r]   ry   r	   rl   r%   r}   r   r.   r\   rz   r   CommunicationImplementationNCCLr   concatsplitr$   r   )rA   r   rY   Zoutputspackrt   Zflat_tensorsZshapesr`   Zreducednum_elementsZflat_outputsr   Zflat_outputr   r   r   batch_all_reduce  s8   
z*CollectiveReplicaLauncher.batch_all_reducer   c                 C   s  t  rtdt| j tt|g tj	|gt
|t
|d t|fdd}tj||d}| tjt|dd|}|dddf }t
|}t||}	| |	|}
g }t| jD ]}|| }||
||||    qdt	|d}tj	t
d|d dgt
|d t|fdd}tj||dW  d   W  d   S 1 sw   Y  W d   dS 1 sw   Y  dS )aJ  All-gather a dense tensor.

    This method must be called inside a tf.function.

    Args:
      input_tensor: a dense tensor. It must have the same rank on all replicas,
        and dimensions other than `axis` need to be the same as well.
      axis: 0-D int32 Tensor. Dimension along which to gather. Must be in the
        range [0, rank(value)).
      options: an optional tf.distribute.experimental.CommunicationOptions. If
        provided, it overrides the default options.

    Returns:
      The gathered Tensor.

    Raises:
      RuntimeError: if called in eager mode.
    z*all_gather is not supported in eager mode.r6   r   r   )permN)r	   rl   RuntimeErrorr   r.   r\   ra   r   r0   r   r   r1   rankZ	transposer~   Zexpand_dims_v2shape_v2
reduce_max	_pad_utilrZ   r%   )rA   rt   r   rY   Zperm_preZinput_tensor_tZgathered_shapeZ
first_dimsfull_axis_dimpadded_input_tensorZgather_padded_out_tensorsplit_tensorsr3   	start_posZout_tensor_tZ
perm_afterr   r   r   
all_gather  sF   


Rz$CollectiveReplicaLauncher.all_gatherinput_slicesc              	      s   j tjZ dttjtt	j
 gtjf dtjffdd tj}|dtjdtt	j
 dtjffddtttt fd	d
 fdd
W  d   S 1 siw   Y  dS )a9  All-reduce an IndexedSlices.

    This method can be called outside  tf.function.

    Args:
      input_slices: an IndexedSlices.
      options: an optional tf.distribute.experimental.CommunicationOptions. If
        provided, it overrides the default options.

    Returns:
      The reduced IndexedSlices.
    all_gather_fnru   c                    sn   |  j }jtjjkr|g}ng }t| |  j}W d   n1 s)w   Y  tj	|| j
dS )z/Use all_gather_fn to aggregate `IndexedSlices`.N)r   indicesdense_shape)r   rz   r   r   r   r   ra   r   r   IndexedSlicesr   )r   Z
all_valuescontrolZall_indices)r   rY   r   r   all_gather_indexed_slices   s   zVCollectiveReplicaLauncher.all_reduce_indexed_slices.<locals>.all_gather_indexed_slicesrt   rY   c                    sb   t  }t| |}||}g }tjD ]}|| }|||| |    qt|dS )z4all_gather tensors of different sizes using padding.r   )	r   r   r   r~   r1   rZ   r%   r   r   )rt   rY   
max_lengthZpadded_tensorZall_padded_tensorsr   r3   r   )all_lengthsrA   r   r   all_gather_with_padding6  s   


zTCollectiveReplicaLauncher.all_reduce_indexed_slices.<locals>.all_gather_with_paddingc                      s
    j S rN   )r~   r   )r   rA   r   r   <lambda>H  s   
 zECollectiveReplicaLauncher.all_reduce_indexed_slices.<locals>.<lambda>c                      s    S rN   r   r   )r   r   r   r   r   I  s    N)r]   ry   r   r.   r\   r   r   
TensorLiker   r   OptionsTensorr   r   r   r   r   r~   r   r   equalr   Z
reduce_min)rA   r   rY   lengthr   )r   r   r   r   rY   rA   r   all_reduce_indexed_slices  s6   $z3CollectiveReplicaLauncher.all_reduce_indexed_slices)NNrN   )!rS   rT   rU   rV   rf   ri   intr:   strr   r   rB   r   r   r   r   Z	Operationrb   rh   r^   rm   rr   rs   r   r   r}   r~   r   r   r   r   r   r   r   r   r   r   rW     sz    




2


def aggregate_tensors_or_indexed_slices(values,
                                        accumulation_fn=math_ops.add_n):
  """Aggregate tensors using `accumulation_fn` and IndexedSlices via concat."""
  if any(isinstance(v, indexed_slices.IndexedSlices) for v in values):
    return backprop_util.AggregateIndexedSlicesGradients(values)
  else:
    return accumulation_fn(values)


def divide_by_n_tensors_or_indexed_slices(value, n):
  if isinstance(value, indexed_slices.IndexedSlices):
    value = backprop_util.FlattenNestedIndexedSlices(value)
    return indexed_slices.IndexedSlices(value.values / n, value.indices,
                                        value.dense_shape)
  else:
    return value / n


def copy_tensor_or_indexed_slices_to_device(value, device):
  """Copies a tensor or IndexedSlices to a device."""
  with ops.device(device):
    if isinstance(value, indexed_slices.IndexedSlices):
      copied_values = array_ops.identity(value.values)
      copied_indices = array_ops.identity(value.indices)
      if value.dense_shape is not None:
        copied_shape = array_ops.identity(value.dense_shape)
      else:
        copied_shape = None
      result = indexed_slices.IndexedSlices(copied_values, copied_indices,
                                            copied_shape)
    else:
      result = array_ops.identity(value)
  return result


def is_indexed_slices(value):
  if isinstance(value, indexed_slices.IndexedSlices):
    return True
  if isinstance(value, value_lib.DistributedValues):
    return all(
        isinstance(v, indexed_slices.IndexedSlices) for v in value.values)
  return False


def split_by_sparsity(values):
  """Split values into dense and sparse values.

  Args:
    values: a list of tensors or `PerReplica`s.

  Returns:
    Four lists:
      a list of dense values, a list of their indices in `values` and
      a list of sparse values, a list of their indices in `values`.
  """
  dense_values = []
  dense_indices = []
  sparse_values = []
  sparse_indices = []
  for i, v in enumerate(values):
    if is_indexed_slices(v):
      sparse_values.append(v)
      sparse_indices.append(i)
    else:
      dense_values.append(v)
      dense_indices.append(i)
  return dense_values, dense_indices, sparse_values, sparse_indices


def stitch_values(values_and_indices_list):
  """Stitch values together according to their indices.

  Args:
    values_and_indices_list: a list of tuples of values and indices indicating
      the values and positions in the returned list.

  Returns:
    a stitched list of values.
  """
  length = 0
  for values_and_indices in values_and_indices_list:
    length += len(values_and_indices[0])

  result = [None] * length
  for values_and_indices in values_and_indices_list:
    if values_and_indices and values_and_indices[0]:
      for v, i in zip(*values_and_indices):
        assert result[i] is None
        result[i] = v
  return result


def group_by_size(input_tensors, bytes_per_pack):
  """Groups `input_tensors` into chunks of `bytes_per_pack`.

  The method preserves the original order of `input_tensors`. The grouping is
  best effort, each pack could have more or less bytes than `bytes_per_pack`.
  It only groups values with known shape.

  Args:
    input_tensors: a list of Tensor.
    bytes_per_pack: an integer.

  Returns:
    A list of packs of Tensor. All values are grouped into one pack if
    `bytes_per_pack` is zero or any of the values has unknown shape.
  """
  if bytes_per_pack == 0:
    return [input_tensors]
  packs = []
  last_pack_size = 0
  for value in input_tensors:
    num_elements = value.shape.num_elements()
    if num_elements is None:
      # Can't pack values with unknown shape.
      logging.warning(
          'not packing values due to the unknown or inconsistent shape of %s',
          value)
      return [input_tensors]
    size = num_elements * value.dtype.size
    # Keep each pack as close to bytes_per_pack as possible, erring on the
    # side of fewer but larger packs.
    if not packs or last_pack_size > bytes_per_pack:
      packs.append([])
      last_pack_size = 0
    packs[-1].append(value)
    last_pack_size += size
  return packs


def _pad_util(input_tensor, full_axis_dim):
  """Pad the `input_tensor`'s first dimension to be `full_axis_dim`."""
  missing_axis_dim = full_axis_dim - array_ops.shape_v2(input_tensor)[0]
  tensor_rank = array_ops.rank(input_tensor)
  paddings_axis = [[0, missing_axis_dim]]
  paddings = array_ops.concat([
      paddings_axis,
      array_ops.zeros(shape=(tensor_rank - 1, 2), dtype=dtypes.int32)
  ], axis=0)
  padded_input_tensor = array_ops.pad(input_tensor, paddings)
  return padded_input_tensor
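

# Illustrative sketches (editorial additions, not part of the original module).
# `group_by_size` packs tensors in order and only starts a new pack once the
# current pack's size strictly exceeds `bytes_per_pack`, so the two 4096-byte
# float32 tensors below end up in a single pack under a 4096-byte budget.
def _example_group_by_size():  # Hypothetical helper.
  t0 = array_ops.ones([1024])  # 1024 float32 elements = 4096 bytes
  t1 = array_ops.ones([1024])
  return group_by_size([t0, t1], bytes_per_pack=4096)  # [[t0, t1]]


# `split_by_sparsity` and `stitch_values` are inverses: splitting a mixed list
# and stitching the dense and sparse halves back restores the original order.
def _example_split_and_stitch():  # Hypothetical helper.
  dense = array_ops.ones([2])
  sparse = indexed_slices.IndexedSlices(
      values=array_ops.ones([1, 2]), indices=ops.convert_to_tensor([0]))
  dense_v, dense_i, sparse_v, sparse_i = split_by_sparsity([dense, sparse])
  return stitch_values([(dense_v, dense_i), (sparse_v, sparse_i)])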