o
    ?e                     @   s  d Z ddlZddlZddlZddlZddlZddlZddl	Z	ddl
mZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z" ddl!m#Z# ddl!m$Z$ ddl%m&Z' ddl(m)Z) ddl*m+Z+ ddl,m-Z- dd Z.dd Z/	dTd d!Z0d"d# Z1d$d% Z2d&d' Z3dTd(d)Z4dTd*d+Z5dTd,d-Z6	.	dUd/d0Z7d1d2 Z8d3d4 Z9e+d5G d6d7 d7e:Z;e+d8G d9d: d:e;Z<d;d< Z=	=dVd>d?Z>G d@dA dAe:Z?dWdBdCZ@dXdDdEZAG dFdG dGe;ZBeBZCeDdHdIZEe+dJG dKdL dLeBZFe+dMG dNdO dOeBZGejHZIejHZHG dPdQ dQe;ZJdXdRdSZKdS )Yz?Classes for different algorithms of reduction and broadcasting.    N)
device_lib)collective_util)cross_device_utils)device_util)distribute_utils)	ps_values)reduce_util)
tpu_values)values)values_util)context)def_function)indexed_slices)kernels)ops)tensor)tensor_util)	array_ops)math_ops)resource_variable_ops)
tf_logging)nest)	tf_export)doc_controlsc                 C   s$   t | tjtjfrt| jS t| S )zChecks whether `destinations` is not empty.

  Args:
    destinations: a `DistributedValues`, variable, or string object.

  Returns:
    Boolean which is True if `destinations` is not empty.
  )
isinstancer   ZBaseResourceVariable
tensor_libTensorbooldevicedestinations r!   n/home/www/facesmatcher.com/pyenv/lib/python3.10/site-packages/tensorflow/python/distribute/cross_device_ops.pycheck_destinations4   s   

r#   c                 C   sH   t | tjtjtjtjt	j
tjfst| stdt| s"tddS )z5Validates the `destination` is one of expected types.zcdestinations must be one of a `DistributedValues` object, a tf.Variable object, or a device string.zdestinations can not be emptyN)r   	value_libDistributedValuesr   r   r   IndexedSlicesr   ZAggregatingVariablesixstring_typesr	   ZTPUMirroredVariabler   Zis_resource_variable
ValueErrorr#   r   r!   r!   r"   validate_destinationsE   s   r*   Tc                 C   s|   t |tjr
tdt|st|dkrtj|j	|j
dS | tjjkr'|S |dkr3td|| f t| t|||dS )z8Reduce a non-DistributedValue `value` to `destinations`.z^You are passing a `DistributedValues` to `reduce_non_distributed_value`, which is not allowed.r   )dtype   zOA non-DistributedValues value %s cannot be reduced with the given reduce op %s.canonicalize_devices)r   r$   r%   r)   r   Z
is_tf_typenpallZzerosshaper+   r   ReduceOpMEANr*   simple_broadcast)	reduce_opvaluer    Znum_replicas_in_graphr.   r!   r!   r"   reduce_non_distributed_valueT   s   r7   c                 C   sB   t | tjr| S t| st| } t| drt| fS t	d)z2Converts a single tensor into a PerReplica object.r   zZCannot convert `input_tensor` to a `PerReplica` object because it doesn't have device set.)
r   r$   r%   r   Z	is_tensorr   Zconvert_to_tensorhasattr
PerReplicar)   )Zinput_tensorr!   r!   r"   _make_tensor_into_per_replicau   s   


r:   c                 C   st   g }t | } t| t tfstd| D ]$}t|tstdt|dkr(tdt|d }|||d f q|S )z@Converts each tensor into a PerReplica object in the input list.z3`value_destination_pairs` should be a list or tuplez<Each element of `value_destination_pairs` should be a tuple.   zFEach element of `value_destination_pairs` should be a tuple of size 2.r   r,   )listr   tupler)   lenr:   append)value_destination_pairsresultpairper_replicar!   r!   r"   "_normalize_value_destination_pairs   s   
rD   c                 C   sJ   | sdS t | ttfsdS tdd | D sdS tdd | D s#dS dS )z,Validates value_destination_pairs are valid.Fc                 s   s    | ]}t |tV  qd S N)r   r=   ).0rB   r!   r!   r"   	<genexpr>       z4_validate_value_destination_pairs.<locals>.<genexpr>c                 s   s     | ]}t |d  tjV  qdS r   N)r   r$   r9   rF   vr!   r!   r"   rG      s    T)r   r<   r=   r0   )r@   r!   r!   r"   !_validate_value_destination_pairs   s   rL   c                 C   sb   t | tjr	| jS |rt | tjrt| fS t| jfS t | tjr*t	| fS t	| jfS rE   )
r   r$   r%   _devicesr'   r(   r   resolver   !canonicalize_without_job_and_task)r    r.   r!   r!   r"   get_devices_from   s   rP   c                 C   s$   | |u pt t| |t t||kS rE   )setrP   )leftrightr.   r!   r!   r"   _devices_match   s   rT   c                    sB   t  fddD sdS t  fdddd  D sdS dS )Nc                 3   s     | ]\}}t || V  qd S rE   rT   )rF   rK   dr-   r!   r"   rG      s
    

z%_all_devices_match.<locals>.<genexpr>Fc                 3   s(    | ]\}}t |d  d   V  qdS rI   rU   rF   rK   _r.   r@   r!   r"   rG      s
    
r,   T)r0   )r@   r.   r!   rY   r"   _all_devices_match   s   
rZ   Fc                 C   sZ   t ||}t|dkr|st| |d S g }|D ]}|t| | qtj|tjdS )z8Broadcast `value` to `destinations` using simple copies.r,   r   Z
wrap_class)	rP   r>   r   Z'copy_tensor_or_indexed_slices_to_devicer?   r   regroupr$   Mirrored)r6   r    always_mirroredr.   devicesZvalue_updatesrV   r!   r!   r"   r4      s   

r4   c              	   C   s   | j }|s	tdt|}t|D ttj% t	||}|t
jjkr-t||}n
|t
jjkr7tdW d   n1 sAw   Y  W d   |S W d   |S 1 sYw   Y  |S )z3Reduces the value by accumulation_fn and reduce_op.%`per_replica_value` must be non-emptyz.`reduce_op` must be Reduce.SUM or Reduce.MEAN.N)r
   r)   r>   r   r   r   device_policyDEVICE_PLACEMENT_SILENTr   Z#aggregate_tensors_or_indexed_slicesr   r2   r3   Z%divide_by_n_tensors_or_indexed_slicesZSUM)per_replica_valuereduce_to_deviceaccumulation_fnr5   
all_valuescountreducedr!   r!   r"   _simple_reduce   s0   
	
		ri   c              	   C   s   | j }|s	tdt|- ttj t||}W d   n1 s&w   Y  W d   |S W d   |S 1 s>w   Y  |S )zAConcatenate all values in the DistributedValues input and return.r`   N)	r
   r)   r   r   r   ra   rb   r   concat)rc   rd   axisrf   gatheredr!   r!   r"   _simple_gather   s   

rm   zdistribute.CrossDeviceOpsc                   @   s   e Zd ZdZdd Zedd ZdddZdd	d
Zdd Z	dddZ
dd Zejdd Zejdd Zejdd Zdd ZdS )CrossDeviceOpsa  Base class for cross-device reduction and broadcasting algorithms.

  The main purpose of this class is to be passed to
  `tf.distribute.MirroredStrategy` in order to choose among different cross
  device communication implementations. Prefer using the methods of
  `tf.distribute.Strategy` instead of the ones of this class.

  Implementations:
  * `tf.distribute.ReductionToOneDevice`
  * `tf.distribute.NcclAllReduce`
  * `tf.distribute.HierarchicalCopyAllReduce`
  c                 C   s
   d| _ d S NT)_canonicalize_devicesselfr!   r!   r"   __init__
  s   zCrossDeviceOps.__init__c                 C   s   dS )Nr,   r!   rq   r!   r!   r"   _num_between_graph_workers  s   z)CrossDeviceOps._num_between_graph_workersNc                 C   s   |du rt  }t|}t| | jdkrMt|jdkrMt||| jrMt	
|jd j
 t|jd }W d   n1 s?w   Y  tj|ftjdS |du rUt  }| ||||S )a  Reduce `per_replica_value` to `destinations`.

    See `tf.distribute.StrategyExtended.reduce_to`. This can only be called in
    the cross-replica context.

    Args:
      reduce_op: a `tf.distribute.ReduceOp` specifying how values should be
        combined.
      per_replica_value: a `tf.distribute.DistributedValues`, or a `tf.Tensor`
        like object.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to reduce to. To perform an all-reduce, pass the same to `value` and
        `destinations`. Note that if it's a `tf.Variable`, the value is reduced
        to the devices of that variable, and this method doesn't update the
        variable.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.

    Raises:
      ValueError: if per_replica_value can't be converted to a
        `tf.distribute.DistributedValues` or if destinations is not a string,
        `tf.Variable` or `tf.distribute.DistributedValues`.
    Nr,   r   r[   )r   Optionsr:   r*   rt   r>   r
   rT   rp   r   r   r   identityr   r\   r$   r]   reduce_implementation)rr   r5   rc   r    optionsrK   r!   r!   r"   reduce  s,   zCrossDeviceOps.reducec                 C   s   t |tjr
td|du rt }t|}t| | jdkrWt	|j
dkrWt||| jrWt|j
d j t|j
d }W d   n1 sIw   Y  tj|ftjdS | ||||S )a>  Gather `per_replica_value` to `destinations`.

    Args:
      per_replica_value: a `tf.distribute.DistributedValues`, or a `tf.Tensor`
        like object.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to gather to. To perform an all-gather, pass the same to `value` and
        `destinations`. Note that if it's a `tf.Variable`, the value is gathered
        to the devices of that variable, and this method doesn't update the
        variable.
      axis: specifies the dimension to gather along within each replica's
        tensor.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`

    Raises:
      ValueError: if per_replica_value can't be converted to a
        `tf.distribute.DistributedValues` or if destinations is not a string,
        `tf.Variable` or `tf.distribute.DistributedValues`.
    z0gather/all_gather does not support IndexedSlicesNr,   r   r[   )r   r   r&   NotImplementedErrorr   ru   r:   r*   rt   r>   r
   rT   rp   r   r   r   rv   r   r\   r$   r]   _gather_implementation)rr   rc   r    rk   rx   rK   r!   r!   r"   _gatherC  s,   
zCrossDeviceOps._gatherc                 C      t d)a  Implementation of `gather` method of `tf.distribute.CrossDeviceOps`.

    Overriding this method is useful for subclass implementers.

    Args:
      per_replica_value: a `tf.distribute.DistributedValues`, or a `tf.Tensor`
        like object.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to gather to. To perform an all-gather, pass the same to `value` and
        `destinations`. Note that if it's a `tf.Variable`, the value is gathered
        to the devices of that variable, this method doesn't update the
        variable.
      axis: specifies the dimension to gather along within each replica's
        tensor.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.

    Raises:
      ValueError: if per_replica_value can't be converted to a
        `tf.distribute.DistributedValues` or if destinations is not a string,
        `tf.Variable` or `tf.distribute.DistributedValues`.
    z2_gather method must be implemented in descendants.rz   rr   rc   r    rk   rx   r!   r!   r"   r{   q  s   z%CrossDeviceOps._gather_implementationc                 C   s   |du rt  }t|st|}|D ]\}}t| q| jdkr8t|| jr8t|d d j	dkr8dd |D S |du r@t  }| 
|||S )aB  Reduce values to destinations in batches.

    See `tf.distribute.StrategyExtended.batch_reduce_to`. This can only be
    called in the cross-replica context.

    Args:
      reduce_op: a `tf.distribute.ReduceOp` specifying how values should be
        combined.
      value_destination_pairs: a sequence of (value, destinations) pairs. See
        `tf.distribute.CrossDeviceOps.reduce` for descriptions.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A list of `tf.Tensor` or `tf.distribute.DistributedValues`, one per pair
      in `value_destination_pairs`.

    Raises:
      ValueError: if `value_destination_pairs` is not an iterable of
        tuples of `tf.distribute.DistributedValues` and destinations.
    Nr,   r   c                 S   s"   g | ]\}}t j|jtjd qS r[   )r   r\   r
   r$   r]   rW   r!   r!   r"   
<listcomp>  s    z/CrossDeviceOps.batch_reduce.<locals>.<listcomp>)r   ru   rL   rD   r*   rt   rZ   rp   r>   r
   batch_reduce_implementation)rr   r5   r@   rx   rX   rV   r!   r!   r"   batch_reduce  s2   
zCrossDeviceOps.batch_reducec                 C   s   t | | ||S )aC  Broadcast `tensor` to `destinations`.

    This can only be called in the cross-replica context.

    Args:
      tensor: a `tf.Tensor` like object. The value to broadcast.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to broadcast to. Note that if it's a `tf.Variable`, the value is
        broadcasted to the devices of that variable, this method doesn't update
        the variable.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.
    )r*   broadcast_implementationrr   r   r    r!   r!   r"   	broadcast  s   zCrossDeviceOps.broadcastc                 C   r}   )am  Implementation of `reduce`.

    Overriding this method is useful for subclass implementers.

    Args:
      reduce_op: a `tf.distribute.ReduceOp` specifying how values should be
        combined.
      per_replica_value: a `tf.distribute.DistributedValues`, or a `tf.Tensor`
        like object.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to reduce to. To perform an all-reduce, pass the same to `value` and
        `destinations`. Note that if it's a `tf.Variable`, the value is reduced
        to the devices of that variable, this method doesn't update the
        variable.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.

    Raises:
      ValueError: if per_replica_value can't be converted to a
        `tf.distribute.DistributedValues` or if destinations is not a string,
        `tf.Variable` or `tf.distribute.DistributedValues`.
    z2_reduce method must be implemented in descendants.r~   rr   r5   rc   r    rx   r!   r!   r"   rw     s   z$CrossDeviceOps.reduce_implementationc                 C   r}   )a  Implementation of `batch_reduce`.

    Overriding this method is useful for subclass implementers.

    Args:
      reduce_op: a `tf.distribute.ReduceOp` specifying how values should be
        combined.
      value_destination_pairs: a sequence of (value, destinations) pairs. See
        `reduce` for descriptions.
      options: a `tf.distribute.experimental.CommunicationOptions`. See
        `tf.distribute.experimental.CommunicationOptions` for details.

    Returns:
      A list of `tf.Tensor` or `tf.distribute.DistributedValues`, one per pair
      in `value_destination_pairs`.

    Raises:
      ValueError: if `value_destination_pairs` is not an iterable of
        tuples of `tf.distribute.DistributedValues` and destinations.
    zFbatch_reduce_implementation method must be implemented in descendants.r~   rr   r5   r@   rx   r!   r!   r"   r     s   z*CrossDeviceOps.batch_reduce_implementationc                 C   s   t ||d| jdS )a  Implementation of `broadcast`.

    Args:
      tensor: a `tf.Tensor` like object. The value to broadcast.
      destinations: a `tf.distribute.DistributedValues`, a `tf.Variable`, a
        `tf.Tensor` alike object, or a device string. It specifies the devices
        to broadcast to.
        `destinations`. Note that if it's a `tf.Variable`, the value is
        broadcasted to the devices of that variable, this method doesn't update
        the variable.

    Returns:
      A `tf.Tensor` or `tf.distribute.DistributedValues`.
    T)r^   r.   )r4   rp   r   r!   r!   r"   r     s   z'CrossDeviceOps.broadcast_implementationc                 C   r}   )a  All-reduce the `value` across all replicas so that all get the result.

    `value` can be a nested structure of tensors or `IndexedSlices`. The
    implementation should generally batch the all-reduces when possible.
    `options` can be set to hint the batching behavior.

    This API must be called in a replica context.

    Args:
      reduce_op: A `tf.distribute.ReduceOp` value specifying how values should
        be combined.
      value: Value to be reduced. A tensor or a nested structure of tensors or
        `IndexedSlices`.
      replica_id: An interger indicating the id of the replica where this
        all_reduce is called under. This is the local replica id that ranges
        from 0 to len(local_devices) - 1.
      options: A `tf.distribute.experimental.CommunicationOptions`.

    Returns:
      A tensor/IndexedSlices or a nested strucutre of tensors/IndexedSlices with
      the reduced values. The structure is the same as `value`.
    z/_all_reduce must be implemented in descendants.r~   )rr   r5   r6   
replica_idrx   r!   r!   r"   _all_reduce+  s   zCrossDeviceOps._all_reducerE   )__name__
__module____qualname____doc__rs   propertyrt   ry   r|   r{   r   r   r   Zfor_subclass_implementersrw   r   r   r   r!   r!   r!   r"   rn      s"    


0.
1


rn   zdistribute.ReductionToOneDevicec                       s:   e Zd ZdZd fdd	Zdd Zdd Zd	d
 Z  ZS )ReductionToOneDevicea  A CrossDeviceOps implementation that copies values to one device to reduce.

  This implementation always copies values to one device to reduce them, then
  broadcast reduced values to the destinations. It doesn't support efficient
  batching.

  Here is how you can use `ReductionToOneDevice` in
  `tf.distribute.MirroredStrategy`:

  ```
    strategy = tf.distribute.MirroredStrategy(
      cross_device_ops=tf.distribute.ReductionToOneDevice())
  ```
  Nc                    s$   || _ |ptj| _tt|   dS )aH  Initializes with a device to reduce to and a way to accumulate.

    Args:
      reduce_to_device: the intermediate device to reduce to. If None, reduce
        to the first device in `destinations` of the `reduce` method.
      accumulation_fn: a function that does accumulation.  If None,
        `tf.math.add_n` is used.
    N)rd   r   Zadd_nre   superr   rs   )rr   rd   re   	__class__r!   r"   rs   V  s   	zReductionToOneDevice.__init__c                 C   sf   ~t |rt|| j}nt|| j}| jp|d }ttjd||f d t||| j|}| 	||S )Nr   z"Reduce to %s then broadcast to %r.
   )
r#   rP   rp   rd   logginglog_first_nINFOri   re   r   )rr   r5   rc   r    rx   r_   rd   rh   r!   r!   r"   rw   c  s   z*ReductionToOneDevice.reduce_implementationc                 C   sb   ~t |rt|| j}nt|| j}| jp|d }ttjd||f d t|||}| ||S )Nr   z"Gather to %s then broadcast to %r.r   )	r#   rP   rp   rd   r   r   r   rm   r   )rr   rc   r    rk   rx   r_   rd   rl   r!   r!   r"   r{   r  s   z+ReductionToOneDevice._gather_implementationc                    s    fdd|D S )Nc                    s"   g | ]\}}j || d qS ))r    rx   rw   )rF   trK   rx   r5   rr   r!   r"   r     s    zDReductionToOneDevice.batch_reduce_implementation.<locals>.<listcomp>r!   r   r!   r   r"   r     s   z0ReductionToOneDevice.batch_reduce_implementation)NN)	r   r   r   r   rs   rw   r{   r   __classcell__r!   r!   r   r"   r   E  s    r   c                 C   sb   | d j }dd tt|D }| D ]}t|jD ]\}}|j |ks$J || |df qq|S )ae  Group values into sublists by their devices.

  This grouping is needed to call the all-reduce library because it expects a
  list of the following form:
    [[(grad0_gpu0, v0_gpu0), (grad1_gpu0, v1_gpu0), (grad2_gpu0, v2_gpu0) ...],
     [(grad0_gpu1, v0_gpu1), (grad1_gpu1, v1_gpu1), (grad2_gpu1, v2_gpu1) ...],
     [(grad0_gpu2, v0_gpu2), (grad1_gpu0, v1_gpu2), (grad2_gpu0, v2_gpu2) ...],
     ...
    ]

  Args:
    per_replica_values: a list of PerReplica objects.

  Returns:
    a list of lists, each sublist has components for its corresponding device of
      PerReplica objects, paired with a None.
  r   c                 S      g | ]}g qS r!   r!   rF   rX   r!   r!   r"   r         z*_group_value_by_device.<locals>.<listcomp>N)rM   ranger>   	enumerater
   r?   )per_replica_valuesr    groupedrc   irK   r!   r!   r"   _group_value_by_device  s   
r   r,   c           
   
   C   s   t t|| }dd tt | d D }| D ]:}t|D ]3\}\}}	|tjjkrIt|j || 	||  W d   n1 sCw   Y  q|| 	| qqdd |D S )a  Ungroup results from all-reduce and make Mirrored objects.

  Each all-reduce result will be divided by the number of destinations before
  Mirrored objects are created if reduce_op is "mean".

  Args:
    grouped_reduced: a list of lists, each sublist has components for each
      device, paired with a None. It is the result from
      cross_device_utils.aggregate_gradients_using*.
    destinations: a value to colocate the result with.
    reduce_op: Indicates how values will be aggregated. Accepted values
      are `tf.distribute.ReduceOp.SUM`, `tf.distribute.ReduceOp.MEAN`.
    num_between_graph_workers: number of workers in the between-graph
      replication.

  Returns:
    a list of Mirrored objects.
  c                 S   r   r!   r!   r   r!   r!   r"   r     r   z._ungroup_and_make_mirrored.<locals>.<listcomp>r   Nc                 S   s   g | ]
}t j|tjd qS r   )r   r\   r$   r]   rJ   r!   r!   r"   r     s
    
)
r>   rP   r   r   r   r2   r3   r   r   r?   )
Zgrouped_reducedr    r5   Znum_between_graph_workersZnum_replicasindexZper_replica_reducedr   rK   rX   r!   r!   r"   _ungroup_and_make_mirrored  s   r   c                   @   s*   e Zd ZdZd
ddZdd Zdd Zd	S )_ConcatAndSplitPackerz,Concatenate and split tensors for reduction.r,   c                 C   s   |dkrt d|| _dS )zInitialize the _ConcatAndSplitPacker object.

    Args:
      num_packs: specifies the number of split packs that will be
        formed.

    Raises:
      ValueError: if num_packs is not greater than 0.
    r   z$num_packs must be greater than zero.N)r)   	num_packsrr   r   r!   r!   r"   rs     s   

z_ConcatAndSplitPacker.__init__c              	   C   s(  || _ g | _g | _g }|D ]}t|d d q dd |D }dd |D }dd |D }t|d}| j}tdd |D rJt	dd |D }	nt
|}	|	| }
|	|
|d	   }|
g|d	  |g }t||}|t|d
g|  | j| | j| W d
   n1 sw   Y  q|S )zPack tensors.r   c                 S   s   g | ]\}}t |d gqS )r   ZreshaperF   grX   r!   r!   r"   r     s    z._ConcatAndSplitPacker.pack.<locals>.<listcomp>c                 S      g | ]	\}}t |qS r!   )r   r1   r   r!   r!   r"   r         c                 S   r   r!   )r   sizer   r!   r!   r"   r     r   c                 s   s    | ]
\}}|j  V  qd S rE   )r1   Zis_fully_definedr   r!   r!   r"   rG     s    z-_ConcatAndSplitPacker.pack.<locals>.<genexpr>c                 S   s   g | ]	\}}|j  qS r!   )r1   Znum_elementsr   r!   r!   r"   r     r   r,   N)grouped_grads_and_varsall_device_shapesall_device_sizesr   colocate_withr   rj   r   r0   sumr   splitr?   zip)rr   r   device_grad_packsdevice_grads_and_varsZ
flat_gradsdevice_shapesdevice_sizesZconcat_gradsZ
num_splitsZtotal_grad_sizeZ
split_sizeZsplit_size_lastZsplit_sizesZ
grad_packsr!   r!   r"   pack  s8   
*z_ConcatAndSplitPacker.packc              	   C   s   g }t || j| j| jD ]K\}}}}t|d d 4 dd |D }t|d}t||}dd t ||D }	dd t |	|D }
|	|
 W d   n1 sRw   Y  q|S )zReverse the pack.r   c                 S   s   g | ]\}}|qS r!   r!   r   r!   r!   r"   r         z0_ConcatAndSplitPacker.unpack.<locals>.<listcomp>c                 S   s   g | ]
\}}t ||qS r!   r   )rF   r1   Zgradr!   r!   r"   r   "  s    
c                 S   s   g | ]
\}\}}||fqS r!   r!   )rF   r   rX   rK   r!   r!   r"   r   (  s    N)
r   r   r   r   r   r   r   rj   r   r?   )rr   Zsummed_device_grad_packsZaggregated_device_gradsr   r   r   r   Zdevice_grads_concatZgrads_with_sizesZgrads_with_shapesZsummed_device_gradsr!   r!   r"   unpack  s.   z_ConcatAndSplitPacker.unpackNr,   )r   r   r   r   rs   r   r   r!   r!   r!   r"   r     s
    
4r   c                 C   s2   |dkrt |}|| }||fS d}| }||fS )zPack tensors if specified.r   N)r   r   )Zdevice_gradsr   tensor_packerr   r!   r!   r"   _pack_tensors0  s   
r   c                 C   s   |r| | S | S )z4Unpack tensors if they are packed before all-reduce.)r   )rh   r   r!   r!   r"   _unpack_tensors;  s   
r   c                       sR   e Zd ZdZd fdd	Zdd Zdd	 Zd
d Zdd Zdd Z	dd Z
  ZS )AllReduceCrossDeviceOpsaI  All-reduce implementation of CrossDeviceOps.

  It performs all-reduce when applicable using NCCL or hierarchical copy. For
  the batch API, tensors will be repacked or aggregated for more efficient
  cross-device transportation.

  For reduces that are not all-reduce, it falls back to
  `tf.distribute.ReductionToOneDevice`.
  ncclr,   c                    s&   || _ || _t | _tt|   dS )a  Initializes the object.

    Args:
      all_reduce_alg: the all-reduce algorithm to use, currently only "nccl" or
        "hierarchical_copy" are supported.
      num_packs: a non-negative integer. The number of packs to split values
        into. If zero, no packing will be done.
    N)_all_reduce_alg
_num_packsr   _simple_cross_replica_opsr   r   rs   )rr   all_reduce_algr   r   r!   r"   rs   M  s   	z AllReduceCrossDeviceOps.__init__c                 C   sD   ~t ||rtdd t|D s| ||gd S | j|||S )Nc                 s   s    | ]	}d |  v V  qdS )cpuNlowerrF   rV   r!   r!   r"   rG   a      z@AllReduceCrossDeviceOps.reduce_implementation.<locals>.<genexpr>r   )rT   anyrP   _batch_all_reducer   ry   r   r!   r!   r"   rw   [  s   

z-AllReduceCrossDeviceOps.reduce_implementationc                    s4   t |rdd |D S  fdd|D S )Nc                 S      g | ]}|d  qS r   r!   rJ   r!   r!   r"   r   k  r   zGAllReduceCrossDeviceOps.batch_reduce_implementation.<locals>.<listcomp>c                        g | ]\}} || qS r!   r   rF   r6   destr   r!   r"   r   m      )rZ   r   r   r!   r   r"   r   g  s   z3AllReduceCrossDeviceOps.batch_reduce_implementationc           	      C   sT   t |\}}}}|r| ||}ng }|r| ||}ng }t ||f||ffS )z All-reduce algorithm in a batch.)r   split_by_sparsity_do_batch_all_reduce_do_batch_all_reduce_sparsestitch_values)	rr   r5   r   dense_valuesdense_indicessparse_valuessparse_indicesdense_resultssparse_resultsr!   r!   r"   r   r  s   

z)AllReduceCrossDeviceOps._batch_all_reducec                 C   s   t t jdt|| j| jf d |d j}t|}t|| j\}}| jdkr-t	
|}nt	||}t||}t||d |S )zRun batch all-reduces.zDbatch_all_reduce: %d all-reduces with algorithm = %s, num_packs = %dr   r   r   )r   r   r   r>   r   r   rM   r   r   r   Zaggregate_gradients_using_ncclZ+aggregate_gradients_using_hierarchical_copyr   r   )rr   r5   r   r    r   r   r   rh   r!   r!   r"   r     s(   


z,AllReduceCrossDeviceOps._do_batch_all_reducec                 C   s,   t t jdt| d | j|t||S )z'Run batch all-reduce for sparse values.z9Efficient allreduce is not supported for %d IndexedSlicesr   )r   r   WARNr>   r   r   r   )rr   r5   r   r!   r!   r"   r     s   
z3AllReduceCrossDeviceOps._do_batch_all_reduce_sparsec                 C   s"   t t jdd t ||||S )Nzgather/all_gather with NCCL or HierarchicalCopy is not supported. Falling back to gather on one device and then broadcast. We're working on a more efficient implementation.   )r   r   r   r   r|   r   r!   r!   r"   r{     s   z.AllReduceCrossDeviceOps._gather_implementation)r   r,   )r   r   r   r   rs   rw   r   r   r   r   r{   r   r!   r!   r   r"   r   B  s    
r   AllReduceSpecTuplezalg shards limitzdistribute.NcclAllReducec                       "   e Zd ZdZd fdd	Z  ZS )NcclAllReducea  NCCL all-reduce implementation of CrossDeviceOps.

  It uses Nvidia NCCL for all-reduce. For the batch API, tensors will be
  repacked or aggregated for more efficient cross-device transportation.

  For reduces that are not all-reduce, it falls back to
  `tf.distribute.ReductionToOneDevice`.

  Here is how you can use `NcclAllReduce` in `tf.distribute.MirroredStrategy`:


  ```
    strategy = tf.distribute.MirroredStrategy(
      cross_device_ops=tf.distribute.NcclAllReduce())
  ```
  r,   c                    .   |dk rt d|tt| jd|d dS )zInitializes the object.

    Args:
      num_packs: a non-negative integer. The number of packs to split values
        into. If zero, no packing will be done.

    Raises:
      ValueError: if `num_packs` is negative.
    r   z<NCCL all-reduce requires num_packs >= 0, but {} is specifiedr   r   r   N)r)   formatr   r   rs   r   r   r!   r"   rs     s   


zNcclAllReduce.__init__r   r   r   r   r   rs   r   r!   r!   r   r"   r     s    r   z$distribute.HierarchicalCopyAllReducec                       r   )HierarchicalCopyAllReduceaA  Hierarchical copy all-reduce implementation of CrossDeviceOps.

  It reduces to one GPU along edges in some hierarchy and broadcasts back to
  each GPU along the same path. For the batch API, tensors will be repacked or
  aggregated for more efficient cross-device transportation.

  This is a reduction created for Nvidia DGX-1 which assumes GPUs connects like
  that on DGX-1 machine. If you have different GPU inter-connections, it is
  likely that it would be slower than `tf.distribute.ReductionToOneDevice`.

  For reduces that are not all-reduce, it falls back to
  `tf.distribute.ReductionToOneDevice`.

  Here is how you can use `HierarchicalCopyAllReduce` in
  `tf.distribute.MirroredStrategy`:

  ```
    strategy = tf.distribute.MirroredStrategy(
      cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
  ```
  r,   c                    r   )zInitializes the object.

    Args:
      num_packs: a non-negative integer. The number of packs to split values
        into. If zero, no packing will be done.

    Raises:
      ValueError if `num_packs` is negative.
    r   z=HierarchicalCopy requires num_packs >= 0, but {} is specifiedZhierarchical_copyr   N)r)   r   r   r   rs   r   r   r!   r"   rs     s   


z"HierarchicalCopyAllReduce.__init__r   r   r!   r!   r   r"   r     s    r   c                       sj   e Zd ZdZ		d fdd	Zedd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Zdd Z  ZS )CollectiveAllReducezAll-reduce cross device ops using collective ops.

  In the between-graph replicated training, it will still do all-reduces across
  all workers and then put results on the right destinations.
  NTc           	         s   |t | dkrtd|| _|| _|pt | _t | _	|r+t
dd |D | _n
t
dd |D | _| j| j}g | _d| _| jD ]}t||| j||}| j| | s^d| _qEtt|   || _dS )	a  Initializes the object.

    Args:
      devices: a list of device strings to run collectives on.
      group_size: the global group size. For between-graph replicated training
        it's the total number of devices across all workers.
      options: a `tf.distribute.experimental.CommunicationOptions`.
      collective_keys: an optional CollectiveKey object.
      canonicalize_devices: Whether to canonicalize devices for workers or not.
    r   z6group_size must be divisible by the number of devices.c                 s       | ]}t |V  qd S rE   r   canonicalizer   r!   r!   r"   rG   C  rH   z/CollectiveAllReduce.__init__.<locals>.<genexpr>c                 s   r   rE   )r   rO   r   r!   r!   r"   rG   E  s    

FTN)r>   r)   _group_size_optionsr   ZCollectiveKeys_collective_keys	threadingLock_lockr=   rM   Zget_group_key
_launchers_limited_ncclZCollectiveReplicaLauncherr?   Zcan_order_ncclr   r   rs   rp   )	rr   r_   Z
group_sizerx   collective_keysr.   Z	group_keyr   launcherr   r!   r"   rs     s4   



zCollectiveAllReduce.__init__c                 C   s   | j t| j S rE   )r   r>   rM   rq   r!   r!   r"   rt   W  s   z.CollectiveAllReduce._num_between_graph_workersc              	   C   s  t |}| jr |jtjjkr t|dkr |tj	tjj
d}| j| }t|\}}}	}
g }g }|r|  t||j}t sZ|dkrZtdt|t| j| j|jt| |||}|tjjkrt|D ]#\}}t| j|  || j ||< W d   n1 sw   Y  qj|  |	rt s|dkrtdt|	t| j| j|j |	D ]}|||| q|tjjkrt|D ]2\}}t| j|  t j!|| j"| j || j#|| j$d||< W d   n1 sw   Y  qt%||f||
ff}t &||S )z%Implements CrossDeviceOps.all_reduce.r,   implementationr   zuCollective all_reduce tensors: %d all_reduces, num_devices = %d, group_size = %d, implementation = %s, num_packs = %dNzjCollective all_reduce IndexedSlices: %d all_reduces, num_devices =%d, group_size = %d, implementation = %s)r
   indicesdense_shape)'r   flattenr   r   r   CommunicationImplementationNCCLr>   mergeru   RINGr   r   r   reverseZgroup_by_sizeZbytes_per_packr   executing_eagerlyr   infor   Zbatch_all_reducer   r2   r3   r   r   r   rM   r?   Zall_reduce_indexed_slicesr   r&   r
   r   r   r   Zpack_sequence_as)rr   r5   r6   r   rx   Zflat_valuesr   r   r   r   r   r   r   Zpacksr   rK   Zindexed_sliceZflat_resultsr!   r!   r"   r   \  sx   






zCollectiveAllReduce._all_reducec              
      s4  dd j D tj }|D ]}t|D ]}| |j|  qqt r[ fdd}j tj	
tj }||tt|}	|  W d   n1 sUw   Y  n)g }	j t|D ]}|	| |  qeW d   n1 sw   Y  g }
t|	 D ]}|
tj|tjd q|
S )z'All reduce a list of per_replica_value.c                 S   r   r!   r!   r   r!   r!   r"   r     r   zFCollectiveAllReduce._all_reduce_per_replica_values.<locals>.<listcomp>c                    s@   t   |  |  W  d    S 1 sw   Y  d S rE   )r   Z
eager_moder   )Z	device_idrx   r5   rr   Zvalues_by_devicer!   r"   	thread_fn  s
   
$zECollectiveAllReduce._all_reduce_per_replica_values.<locals>.thread_fnNr[   )rM   r>   r   r?   r
   r   r  r   multiprocessingpoolZ
ThreadPoolmapr<   closer   r   r   r\   r$   r]   )rr   r5   r   rx   Znum_devicesrC   r   r  r  Zoutputs_by_devicerA   r
   r!   r  r"   _all_reduce_per_replica_values  s8   

z2CollectiveAllReduce._all_reduce_per_replica_valuesc           
   
   C   s  t   | ||g|d }t|| j}t||| jr|S t|tjs)t|g}g }t	
|j@ |D ]5}t	|& |jD ]}	|	j|krP|t|	  qZq?|t|j W d    n1 sdw   Y  q4W d    n1 stw   Y  tj|tjdS Nr   r[   )r   mark_as_unsaveabler
  rP   rp   rT   r   r$   r]   r   control_dependenciesr
   r   r?   r   rv   _primaryr   r\   )
rr   r5   rc   r    rx   Zall_reducedr_   r   rV   rK   r!   r!   r"   rw     s<   

z)CollectiveAllReduce.reduce_implementationc                    sZ   t   t|j}|rdd |D  S |s"ttjdd  fdd|D S )Nc                 S   r   r   r!   rJ   r!   r!   r"   r     r   zCCollectiveAllReduce.batch_reduce_implementation.<locals>.<listcomp>zFEfficient batch_reduce is not supported if destinations are different.r   c                    r   r!   r   r   r   r!   r"   r     r   )r   r  rZ   rp   r
  r   r   r   )rr   r5   r@   rx   Zall_devices_matchr!   r   r"   r     s    z/CollectiveAllReduce.batch_reduce_implementationc           
   
   C   s  |  |g||d }t  t|| j}t||| jr|S t|tjs)t|g}g }t	
|j@ |D ]5}t	|& |jD ]}	|	j|krP|t|	  qZ|t|j q?W d    n1 sdw   Y  q4W d    n1 stw   Y  tj|tjdS r  )_batch_all_gatherr   r  rP   rp   rT   r   r$   r]   r   r  r
   r   r?   r   rv   r  r   r\   )
rr   rc   r    rk   rx   Zall_gatheredr_   r   rV   rK   r!   r!   r"   r{     s0   

	z*CollectiveAllReduce._gather_implementationc           	         s   t }jrjtjjkr|dkrtjtjjdt	
t	jd|t jjjf d  fdd}t rDt| }n| }g }|D ]}|tj|tjd qK|S )z'all gather multiple per-replica-values.r,   r   zeCollective batch_all_gather: %d all-gathers, num_devices = %d, group_size = %d, implementation = %s, r   c                     s   g } j L td. D ]#}g }ttjD ]}|j| |j	|   q| | qW d    n1 s<w   Y  W d    | S W d    | S 1 sTw   Y  | S )NZ	allgather)
r   r   Z
name_scoper   r>   rM   r?   r   Z
all_gatherr
   )gathered_valuesrC   Zoutputsr   rk   rx   r   rr   r!   r"   compute_gathered_values-  s    (zFCollectiveAllReduce._batch_all_gather.<locals>.compute_gathered_valuesr[   )r>   r   r   r   r   r   r   ru   r   r   r   r   rM   r   r   r  r   functionr?   r   r\   r$   r]   )	rr   r   rk   rx   Z
batch_sizer  r  mirroredr6   r!   r  r"   r    s>   
z%CollectiveAllReduce._batch_all_gatherc                 C   s&   t | j|}t| j| j| j|| jS rE   )copydeepcopyr   r   rM   r   r   rp   )rr   memor   r!   r!   r"   __deepcopy__C  s   z CollectiveAllReduce.__deepcopy__ro   )r   r   r   r   rs   r   rt   r   r
  rw   r   r{   r  r  r   r!   r!   r   r"   r     s    
;
J!")r   c                 C   s  t dd | D }t r4t jdd}t jdd}t|t|kr-td t	 S t  }nt
j|d}t  }|D ]}t|j|v rO||j q?t|t|kretddt||  td	d |D rvtd
 t	 S tdrtddS td t	 S )aJ  Find the best `CrossDeviceOps` locally given a `tf.compat.v1.ConfigProto`.

  Args:
    devices: a list of devices passed to `tf.distribute.Strategy`.
    session_config: a `tf.compat.v1.ConfigProto` or `None`. If `None`, it will
      make decision based on all logical devices.

  Returns:
    A subclass of `CrossDeviceOps`.
  c                 s   r   rE   r   r   r!   r!   r"   rG   V  rH   z*select_cross_device_ops.<locals>.<genexpr>ZGPU)Zdevice_typezUNCCL is not supported when using virtual GPUs, fallingback to reduction to one device)session_configzTSome requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: %s,c                 s   s    | ]	}d |  vV  qdS )ZgpuNr   r   r!   r!   r"   rG   m  r   zPThere are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.r   r,   )r   z3Nccl kernel is not found, not using nccl allreduce.)rQ   r   Z#executing_eagerly_outside_functionsr   Zlist_logical_devicesZlist_physical_devicesr>   r   warningr   r   Zlist_local_devicesr   r   nameaddjoinr<   r   r   Zget_registered_kernels_for_opr   )r_   r  Zrequested_devicesZlogical_gpusZphysical_gpusZmachine_devicesZusing_devicesrV   r!   r!   r"   select_cross_device_opsK  s8   




r  )T)FTr   r   rE   )Lr   collectionsr  Zmultiprocessing.dummyr  Zmultiprocessing.poolr   numpyr/   r'   Ztensorflow.python.clientr   Ztensorflow.python.distributer   r   r   r   r   r   r	   r
   r$   r   Ztensorflow.python.eagerr   r   Ztensorflow.python.frameworkr   r   r   r   r   r   Ztensorflow.python.opsr   r   r   Ztensorflow.python.platformr   r   Ztensorflow.python.utilr   Z tensorflow.python.util.tf_exportr   Ztensorflow.tools.docsr   r#   r*   r7   r:   rD   rL   rP   rT   rZ   r4   ri   rm   objectrn   r   r   r   r   r   r   r   ZAllReduceCrossTowerOps
namedtupler   r   r   r   ZCollectiveCommunicationr   r  r!   r!   r!   r"   <module>   s   
!



  KC
#
h
v$+  8