o
    ?e                     @   s@  d Z ddlZddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddlm
Z
 dd	 Zd
d Zdd Zdd Zdd Zdd Z	d<ddZdd Zdd Zdd Zd<ddZdd Zd d! Zd<d"d#Zd<d$d%Zd&d' Zd(d) Zd<d*d+Zd,d- Zd.d/ Zd<d0d1Zd<d2d3Z 	d<d4d5Z!d6d7 Z"	d<d8d9Z#	d<d:d;Z$dS )=zIUtilities to construct a TF subgraph implementing distributed All-Reduce.    Ndevice)ops)	array_ops)math_ops)nccl_opsc              	   C   s   | st d| d j}| D ]}||j}q| st dt|dkrLg }| D ]!}t| |t	|dg W d   n1 sDw   Y  q(|} | |fS )ai  Check tensors for isomorphism and flatten.

  Args:
    tensors: list of `tf.Tensor` which must all have the same shape.

  Returns:
    tensors: a list of `tf.Tensor` which are flattened (1D) views of tensors
    shape: the original shape of each element of input tensors

  Raises:
    ValueError: tensors are empty or non-isomorphic or have unknown shape.
  tensors cannot be emptyr   z)Tensors must have statically known shape.   N)

ValueErrorshapeZ
merge_withZis_fully_definedlenr   colocate_withappendr   reshape)tensorsr   tensorreshapedt r   k/home/www/facesmatcher.com/pyenv/lib/python3.10/site-packages/tensorflow/python/distribute/v1/all_reduce.py_flatten_tensors   s    
r   c              	   C   sN   g }| D ] }t | |t|| W d   n1 sw   Y  q|S )a=  Reshape tensors flattened by _flatten_tensors.

  Args:
    tensors: list of `tf.Tensor` of identical length 1D tensors.
    shape: list of integers describing the desired shape.  Product of
      the elements must equal the length of each tensor.

  Returns:
    list of `tf.Tensor` which are the reshaped inputs.
  N)r   r   r   r   r   )r   r   r   r   r   r   r   _reshape_tensors8   s   r   c           	         s  | j }dt|krtd|jd j}t|  || dkrd||   ||krM|| }t| tj	|g| j
dgd}t||}||fW  d   S |d   |kr{|  | }t| tj	|g| j
dgd}t||}||fW  d   S ||d    } | } fddt|d D |g }t| |}t|d tj	|g| j
dgd|d< ||fW  d   S t| |dfW  d   S 1 sw   Y  dS )	a  Like split for 1D tensors but pads-out case where len % pieces != 0.

  Args:
    tensor: `tf.Tensor` that must be 1D.
    pieces: a positive integer specifying the number of pieces into which
      tensor should be split.

  Returns:
    list of `tf.Tensor` of length pieces, which hold the values of
      thin input tensor, in order. The final tensor may
      be zero-padded on the end to make its size equal to those of all
      of the other tensors.

  Raises:
    ValueError: The input tensor is not 1D.
  r	   input tensor must be 1Dr   )dtypeNc                       g | ]} qS r   r   .0_
chunk_sizer   r   
<listcomp>v       z!_padded_split.<locals>.<listcomp>r
   )r   r   r   dimsvaluer   r   r   concatZzerosr   splitrange)	r   piecesr   
tensor_lenpad_lenZextended_wholepartslast_chunk_size
piece_lensr   r   r   _padded_splitJ   sD    $r.   c              	   C   s   | st d| d j}t|dkrt dt|d | }|dk r%t dg }| D ]#}t| |t|dg|g W d   n1 sGw   Y  q)|S )a  Strip the suffix padding added by _padded_split.

  Args:
    tensors: list of `tf.Tensor` of identical length 1D tensors.
    pad_len: number of elements to be stripped from the end of each tensor.

  Returns:
    list of `tf.Tensor` which are the stripped inputs.

  Raises:
    ValueError: tensors must be a non-empty list of 1D tensors, and
      each must be longer than pad_len.
  r   r   r	   ztensors must be 1Dzpad_len longer than tensorN)	r   r   r   intr   r   r   r   slice)r   r*   r   
prefix_lenstrippedr   r   r   r   _strip_padding   s   
r3   c                    s   | j }dt|krtd|jd j}||  t| E ||  krT|dks)J ||d    }|dks7J  fddt|d D |g }t	| |W  d   S t	| |W  d   S 1 sdw   Y  dS )a  Like split for 1D tensors but allows case where len % pieces != 0.

  Args:
    tensor: `tf.Tensor` that must be 1D.
    pieces: a positive integer specifying the number of pieces into which
      tensor should be split.

  Returns:
    list of `tf.Tensor` of length pieces, which hold the values of
      the input tensor, in order. The final tensor may be shorter
      than the others, which will all be of equal length.

  Raises:
    ValueError: input tensor must be 1D.
  r	   r   r   c                    r   r   r   r   r   r   r   r!      r"   z!_ragged_split.<locals>.<listcomp>N)
r   r   r   r#   r$   r   r   r'   r   r&   )r   r(   r   r)   r,   r-   r   r   r   _ragged_split   s    

	$r4   c                    s`  t ||    dkrg g fS |krtd|f tdt| }g }td|D ]/}g }|| }td| D ]fdd|D }||d |d|  }	||	7 }q9|| q, fddtd|D }
 fddtd|D }td|D ]2}td D ]*}td D ]"}||| | kr||| |< || |  d    |
| |<  qqqqy|
|fS )	a  "Generate an array of device index arrays, one for each subchunk.

  In the basic ring reduction algorithm there are size(T)/num_devices
  data chunks and each device process one chunk per tick, i.e. sending
  one chunk and receiving one chunk.  The idea of subchunking is that
  each device processes num_subchunks smaller data regions per tick,
  and the ring rank permutation is different for each subchunk index
  so that a device is potentially sending to and receiving from
  num_subchunks different other devices at each tick.  Where multiple
  independent data channels exist between devices, this strategy
  supplies a method of using them in parallel.

  Args:
    num_workers: number of worker tasks
    num_subchunks: number of subchunks into which to divide each per-GPU chunk.
    gpu_perm: an array of integers in [0, num_gpus-1] giving the default
      ring order of GPUs at each worker.  Other permutations will be generated
      by rotating this array and splicing together per-worker instances.

  Raises:
    ValueError: the number of subchunks may not exceed the number of GPUs.

  Returns:
    pred_by_s_d: list of lists that maps (by index) from (subchunk, dev) to
        preceding device in the permutation for that subchunk.  The
        device index of GPU i at worker j is i + (j * num_gpus).
    rank_by_s_d: list of lists that maps (by index) from (subchunk, dev) to
       local rank of device d in the permutation for that subchunk.
  r   z'num_subchunks %d must be <= num_gpus %dr	   c                    s   g | ]}  | qS r   r   )r   i)num_gpuswr   r   r!      s    z&_ring_permutations.<locals>.<listcomp>Nc                        g | ]}d d t d D qS )c                 S      g | ]}d qS r
   r   r   dr   r   r   r!      r"   1_ring_permutations.<locals>.<listcomp>.<listcomp>r   r'   r   sdevicesr   r   r!          c                    r8   )c                 S   r9   r:   r   r;   r   r   r   r!      r"   r=   r   r>   r?   rA   r   r   r!      rC   )r   r   maxr/   r'   r   )num_workersnum_subchunksgpu_permZrotation_intervalZ
perms_by_sr@   Z
full_orderoffsetZdefault_orderZ	dev_orderpred_by_s_drank_by_s_dr<   r   r   )rB   r6   r7   r   _ring_permutations   sF   



 rK   c                 C   s   t | dk r
tdt| \} }dd | D }t|||\}}	t| ||||	|\}
}|r1t||
}
t||	|
}|dkr@t||}t |dkrKt||}|S )av  Construct a subgraph performing a ring-style all-reduce of input_tensors.

  Args:
    input_tensors: a list of `tf.Tensor` objects, which must all
      have the same shape and type.
    num_workers: number of worker tasks spanned by input_tensors.
    num_subchunks: number of subchunks each device should process in one tick.
    gpu_perm: a list of ints giving a ring-wise rank ordering of GPUs at
      each worker.  All workers must have the same number of
      GPUs with the same rank ordering.  If NVLINK is available, this should
      be a ring order supported by NVLINK edges.
    red_op: a binary operator for elementwise reduction.
    un_op: an optional unary operator to apply to fully reduced values.

  Raises:
    ValueError: empty input_tensors or they don't all have same
    size.

  Returns:
    a list of `tf.Tensor` identical sum-reductions of input_tensors.
     z(input_tensors must be length 2 or longerc                 S      g | ]}|j qS r   r   r   r   r   r   r   r!         z)build_ring_all_reduce.<locals>.<listcomp>r   r	   )	r   r   r   rK   _build_ring_gather_apply_unary_to_chunks_build_ring_scatterr3   r   )input_tensorsrE   rF   rG   red_opun_opr   rB   rI   rJ   chunks_by_devr*   output_tensorsr   r   r   build_ring_all_reduce   s*   


rX   c              
   C   s  t | }|dkr
g S |dkr| S | d j}dt |krtd|| }|d }	g }
d}td|D ]'}t||  t| | |\}}|
| W d   n1 sRw   Y  q0td|	D ]}dd td|D }td|D ]L}t|| ; td|D ]-}|| | }|| d|  | }|| | }|| | }||
| | |
| | ||< q}W d   n1 sw   Y  qntd|D ](}td|D ] }|| | }|| d|  | }|| | }|| |
| |< qqq]|
|fS )a  Construct a subgraph for the first (reduction) pass of ring all-reduce.

  Args:
    input_tensors: a list of `tf.Tensor` 1D input tensors of same
      shape and type.
    devices: array of device name strings
    num_subchunks: number of subchunks each device should process in one tick.
    pred_by_s_d: as produced by _ring_permutations
    rank_by_s_d: as produced by _ring_permutations
    red_op: a binary operator for elementwise reduction

  Raises:
    ValueError: tensors must all be one dimensional.

  Returns:
    list of list of `tf.Tensor` of (partially) reduced values where
    exactly num_subchunks chunks at each device are fully reduced.
  r   r	   zinput tensors must be 1DNc                 S   r9   Nr   r   r   r   r   r!   M  r"   z&_build_ring_gather.<locals>.<listcomp>rL   )r   r   r   r'   r   r   r.   r   )rS   rB   rF   rI   rJ   rT   num_devicesr   
num_chunks	num_ticksrV   Zsplit_pad_lenr<   ZsplitstickZnew_partial_reductionsr@   rank	seg_indexpred_devchunk_indexr   r   r   rP   %  sV   




rP   c              	      sX   g }|D ]%}t |d  | fdd|D  W d   n1 s$w   Y  q|S )a&  Apply a unary op to each tensor in chunks_by_dev, on same device.

  Args:
    f: a unary function over `tf.Tensor`.
    chunks_by_dev: list of lists of `tf.Tensor`.

  Returns:
    new list of lists of `tf.Tensor` with the same structure as
    chunks_by_dev containing the derived tensors.
  r   c                       g | ]} |qS r   r   rN   fr   r   r!   q      z*_apply_unary_to_chunks.<locals>.<listcomp>N)r   r   r   )rd   rV   outputxr   rc   r   rQ   c  s   rQ   c              
   C   s  t |}t |d }d|| krtdt|| }|d }td|D ]}dd td|D }td|D ]J}	t||	 d 7 td|D ])}
||
 |	 }|| d|  | }| |
 |	 }|| |
 }t|| | ||< qEW d   n1 syw   Y  q4td|D ](}	td|D ] }
||
 |	 }|| d|  | }|| |
 }|| ||	 |< qqq#g }|D ]"}t|d  |t	|d W d   n1 sw   Y  q|S )a  Construct subgraph for second (scatter) pass of ring all-reduce.

  Args:
    pred_by_s_d: as produced by _ring_permutations
    rank_by_s_d: as produced by _ring_permutations
    chunks_by_dev: list of list of `tf.Tensor` indexed by ints
      (device, chunk)

  Raises:
    ValueError: chunks_by_dev is not well-formed

  Returns:
    list of `tf.Tensor` which are the fully reduced tensors, one
    at each device corresponding to the outer dimension of chunks_by_dev.
  r   zAExpect number of chunks per device to be divisible by num_devicesr	   c                 S   r9   rY   r   r   r   r   r   r!     r"   z'_build_ring_scatter.<locals>.<listcomp>N)
r   r   r/   r'   r   r   r   identityr   r%   )rI   rJ   rV   rZ   r[   rF   r\   r]   Zpassed_valuesr<   r@   r^   r_   r`   ra   rf   rg   r   r   r   rR   u  sL   

rR   c                    s`   dd | D }t | \} }t| ||} r fdd|D }t||}t|dkr.t||}|S )a  Construct a subgraph for recursive halving-doubling all-reduce.

  The recursive halving-doubling algorithm is described in
  (Thakur et al., 2015).

  The concept is to arrange the participating n devices in
  a linear sequence where devices exchange data pairwise
  with one other device in each round.  During the gather
  phase there are lg(n) rounds where devices exchange
  increasingly smaller sub-tensors with another device
  at increasingly greater distances, until at the top
  each device has 1/n of the fully reduced values.  During the
  scatter phase each device exchanges its fully reduced
  sub-tensor (which doubles in length at each round)
  with one other device at increasingly smaller distances
  until each device has all of the fully reduced values.

  Note: this preliminary version requires that len(input_tensors) be a
    power of 2.  TODO(tucker): relax this restriction.  Also, the
    number of elements in each tensor must be divisible by 2^h where h
    is the number of hops in each phase.  This will also be relaxed in
    the future with edge-case specific logic.

  Args:
    input_tensors: list of `tf.Tensor` to be elementwise reduced.
    red_op: a binary elementwise reduction Op.
    un_op: an optional unary elementwise Op to apply to reduced values.

  Returns:
    list of `tf.Tensor` which are the fully reduced tensors, one
    at each device of input_tensors.

  Raises:
    ValueError: num_devices not a power of 2, or tensor len not divisible
    by 2 the proper number of times.

  References:
    Optimization of Collective Communication Operations in MPICH:
      [Thakur et al., 2005]
      (https://journals.sagepub.com/doi/abs/10.1177/1094342005051521)
      ([pdf](http://wwwi10.lrr.in.tum.de/~gerndt/home/Teaching/HPCSeminar/mpich_multi_coll.pdf))
  c                 S   rM   r   r   rN   r   r   r   r!     rO   z1build_recursive_hd_all_reduce.<locals>.<listcomp>c                    rb   r   r   rN   rU   r   r   r!     re   r	   )r   _build_recursive_hd_gather_build_recursive_hd_scatterr   r   )rS   rT   rU   rB   r   reduced_shardsrW   r   ri   r   build_recursive_hd_all_reduce  s   +

rm   c              
   C   sD  t |}tt|d}|d| krtd| }td|D ]}d| }|d }dd |D }	td|D ]i}
|
| |d kr>q3||
 }||
|  }t||
 d}t||
|  d}t	| ||d |d |	|
< W d   n1 suw   Y  t	| ||d |d |	|
| < W d   n1 sw   Y  q3|	}q|S )a  Construct the gather phase of recursive halving-doubling all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` to be elementwise reduced.
    devices: a list of strings naming the devices hosting input_tensors,
      which will also be used to host the (partial) reduction values.
    red_op: a binary elementwise reduction Op.

  Returns:
    list of `tf.Tensor` which are the fully reduced tensor shards.

  Raises:
    ValueError: num_devices not a power of 2, or tensor len not divisible
    by 2 the proper number of times.
  rL    num_devices must be a power of 2r   c                 S      g | ]}g qS r   r   r   r   r   r   r!     r"   z._build_recursive_hd_gather.<locals>.<listcomp>Nr	   )
r   r/   mathlogr   r'   r   r&   r   r   )rS   rB   rT   rZ   num_hopschunkshspan
group_size
new_chunksr<   left_dev	right_dev
left_splitright_splitr   r   r   rj     s2   rj   c              
   C   s4  t |}tt|d}|d| ksJ d| }ttd|D ]x}d| }|d }dd |D }td|D ]_}	|	| |d kr@q5|	}
|	| }||
 }|| }t| t	||
 || gd||
< W d   n1 slw   Y  t| t	||
 || gd||< W d   n1 sw   Y  q5|}q|S )aR  Construct the scatter phase of recursive halving-doubling all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` that are fully-reduced shards.
    devices: a list of strings naming the devices on which the reconstituted
      full tensors should be placed.

  Returns:
    list of `tf.Tensor` which are the fully reduced tensors.
  rL   rn   r   c                 S   ro   r   r   r   r   r   r   r!     r"   z/_build_recursive_hd_scatter.<locals>.<listcomp>N)
r   r/   rp   rq   reversedr'   r   r   r   r%   )rS   rB   rZ   rr   rs   rt   ru   rv   rw   r<   Zleft_idxZ	right_idxrx   ry   r   r   r   rk     s@   



rk   c                 C   sL   t | \} }dd | D }t| |||}t||}t|dkr$t||}|S )a  Construct a subgraph for shuffle all-reduce.

  Shuffle reduce is essentially the algorithm implemented when using
  parameter servers.  Suppose tensor length is n, there are d devices
  and g gather shards.  Each device sends a n/g length sub-tensor to
  each gather shard.  The gather shards perform a reduction across d
  fragments, then broadcast the result back to each device.  The
  devices then join the g fully reduced fragments they receive from
  the shards.  The gather shards could perform d-1 pairwise
  reductions, or one d-way reduction.  The first is better where
  reduction Op time is low compared to transmission time, the second
  better in the other case.

  Args:
    input_tensors: list of `tf.Tensor` values to be reduced.
    gather_devices: list of names of devices on which reduction shards
      should be placed.
    red_op: an n-array elementwise reduction Op
    un_op: optional elementwise unary Op to be applied to fully-reduced values.

  Returns:
    list of `tf.Tensor` which are the fully reduced tensors.
  c                 S   rM   r   r   rN   r   r   r   r!   C  rO   z,build_shuffle_all_reduce.<locals>.<listcomp>r	   )r   _build_shuffle_gather_build_shuffle_scatterr   r   )rS   gather_devicesrT   rU   r   dst_devicesrl   rW   r   r   r   build_shuffle_all_reduce*  s   

r   c              	      s   t | }t |}| d j}t |dkrtdg }td|D ]# t|    |t|   | W d   n1 s<w   Y  qg }td|D ]1 t|     fdd|D }	||	}
|rf||
}
||
 W d   n1 suw   Y  qI|S )a  Construct the gather (concentrate and reduce) phase of shuffle all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` values to be reduced.
    gather_devices: list of names of devices on which reduction shards
      should be placed.
    red_op: the binary reduction Op
    un_op: optional elementwise unary Op to be applied to fully-reduced values.

  Returns:
    list of `tf.Tensor` which are the fully reduced shards.

  Raises:
    ValueError: inputs not well-formed.
  r   r	   zinput_tensors must be 1DNc                    s   g | ]}|  qS r   r   r?   r<   r   r   r!   i  re   z)_build_shuffle_gather.<locals>.<listcomp>)	r   r   r   r'   r   r   r   r4   r   )rS   r   rT   rU   Znum_source_devicesZnum_gather_devicesr   Zshards_by_sourcerl   valuesZ	red_shardr   r   r   r}   L  s0   
r}   c              	   C   s`   t |}g }td|D ]"}t||  |t| d W d   n1 s(w   Y  q|S )a  Build the scatter phase of shuffle all-reduce.

  Args:
    reduced_shards:  list of `tf.Tensor` fully reduced shards
    dst_devices: list of names of devices at which the fully-reduced value
      should be reconstituted.

  Returns:
    list of `tf.Tensor` scattered tensors.
  r   N)r   r'   r   r   r   r   r%   )rl   r   rZ   Zout_tensorsr<   r   r   r   r~   q  s   r~   c                 C   s   t | }|t |krtdt }t }t|D ]F}tj| | }t|dr.|j	du r6J d| |  |j
p:d|jp>d|j	f}||vrNg ||< g ||< || | |  || ||  qt| t| fS )a]  Partition devices and values by common task.

  Args:
    devices: list of device name strings
    values: list of `tf.Tensor` of same length as devices.

  Returns:
    (per_task_devices, per_task_values) where both values are
    lists of lists with isomorphic structure: the outer list is
    indexed by task, and the inner list has length of the number
    of values belonging to that task.  per_task_devices contains
    the specific devices to which the values are local, and
    per_task_values contains the corresponding values.

  Raises:
    ValueError: devices must be same length as values.
  z#len(devices) must equal len(values)taskNFzfailed to parse device %s	localhostr   )r   r   collectionsOrderedDictr'   
device_libZ
DeviceSpecZfrom_stringhasattrr   ZjobZreplicar   listr   )rB   r   rZ   Zper_task_devicesZper_task_valuesr<   Zd_specindexr   r   r   _split_by_task  s    r   c              	   C   sr   |t jkrt| }ntd||r7g }|D ]}t| ||| W d   n1 s/w   Y  q|}|S )a  Build a subgraph that does one full all-reduce, using NCCL.

  Args:
    input_tensors: list of `tf.Tensor` of same-shape and type values to
      be reduced.
    red_op: binary elementwise reduction operator. Must be one of
      {tf.add}
    un_op: optional unary elementwise Op to apply to fully-reduce values.

  Returns:
    list of `tf.Tensor` of reduced values.

  Raises:
    ValueError: red_op not supported.
  z)red_op not supported by NCCL all-reduce: N)r   addr   Zall_sumr   r   r   r   )rS   rT   rU   rW   Zun_op_wrappedr   r   r   r   build_nccl_all_reduce  s   

r   c              
   C   s  t | \} }dd | D }t|| \}}t|}dd td|D }|dd }	|dd }
td|D ]G}t|| |}t|1 t|d j t	|d ||< W d   n1 s^w   Y  || d |	|< W d   n1 suw   Y  q3||}td|D ]O}g }t|| d  t
t	|| }W d   n1 sw   Y  || D ]}t| |t	| W d   n1 sw   Y  q||
|< qdd |
D }t|dkrt||}|S )a  Construct a subgraph for NCCL hybrid all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` of same-shape and type values to
      be reduced.
    red_op: binary elementwise reduction operator.
    upper_level_f: function for reducing one value per worker, across
      workers.

  Returns:
    list of `tf.Tensor` of reduced values.

  Raises:
    ValueError: inputs not well-formed.
  c                 S   rM   r   r   rN   r   r   r   r!     rO   z&_build_nccl_hybrid.<locals>.<listcomp>c                 S   r9   rY   r   )r   r7   r   r   r   r!     r"   r   Nc                 S   s   g | ]	}|D ]}|qqS r   r   )r   Zsublistvr   r   r   r!     s    r	   )r   r   r   r'   r   r   Zcontrol_dependenciesr   r   rh   r   	broadcastr   r   )rS   rT   upper_level_fr   rB   per_worker_devicesper_worker_valuesrE   	up_valuesZ
up_devicesZdown_valuesr7   Zworker_valueslevel_2_outputZdst_tensorsZbroadcast_srcr<   rW   r   r   r   _build_nccl_hybrid  s@   

r   c              	   C   sf   t | dkr
|| S |s| S g }| D ]}t| ||| W d   n1 s+w   Y  q|S )z9If len(input_tensors) > 1, apply red_f, else apply un_op.r	   N)r   r   r   r   )rS   Zred_frU   rW   r   r   r   r   _reduce_non_singleton  s   r   c                    s*    fddfdd}t |  |S )z=Construct hybrid of NCCL within workers, Ring across workers.c                       t | t| dg S Nr   rX   r   )yrT   subdivrU   r   r   upper_builder	  s   z+build_nccl_then_ring.<locals>.upper_builderc                       t |  S rY   r   rg   rU   r   r   r   r        z+build_nccl_then_ring.<locals>.upper_level_fr   )rS   r   rT   rU   r   r   rT   r   rU   r   r   build_nccl_then_ring  s   r   c                    s    fdd}t |  |S )zEConstruct hybrid of NCCL within workers, Recursive-HD across workers.c                    s   t |  S rY   )rm   r   rT   rU   r   r   <lambda>  s    z.build_nccl_then_recursive_hd.<locals>.<lambda>r   )rS   rT   rU   r   r   r   r   build_nccl_then_recursive_hd  s   r   c                    s    fdd}t | ||S )z@Construct hybrid of NCCL within workers, Shuffle across workers.c                    s   t |  S rY   r   r   r   shuffle_red_oprU   r   r   r     s   z.build_nccl_then_shuffle.<locals>.upper_level_fr   )rS   r   Znccl_red_opr   rU   r   r   r   r   build_nccl_then_shuffle  s   r   c                 C   s   t | \} }dd | D }t|| \}}t|}g }	t||kr$tdtd|D ]}
t||
 ||
 g|}|	|d  q)||	}g }td|D ]}
|t||
 g||
 7 }qIt|dkrct||}|S )a  Construct a subgraph for Shuffle hybrid all-reduce.

  Args:
    input_tensors: list of `tf.Tensor` of same-shape and type values to
      be reduced.
    gather_devices: list of device names on which to host gather shards.
    red_op: binary elementwise reduction operator.
    upper_level_f: function for reducing one value per worker, across
      workers.

  Returns:
    list of `tf.Tensor` of reduced values.

  Raises:
    ValueError: inputs not well-formed.
  c                 S   rM   r   r   rN   r   r   r   r!   2  rO   z)_build_shuffle_hybrid.<locals>.<listcomp>zGFor shuffle hybrid, gather_devices must contain one device per worker. r   r	   )	r   r   r   r   r'   r}   r   r~   r   )rS   r   rT   r   r   rB   r   r   rE   r   r7   rl   r   rW   r   r   r   _build_shuffle_hybrid  s*   
r   c                    s,    fddfdd}t | |||S )z@Construct hybrid of Shuffle within workers, Ring across workers.c                    r   r   r   r   r   r   r   r   L  s   z.build_shuffle_then_ring.<locals>.upper_builderc                    r   rY   r   r   r   r   r   r   O  r   z.build_shuffle_then_ring.<locals>.upper_level_fr   )rS   r   r   Zred_n_oprT   rU   r   r   r   r   build_shuffle_then_ringI  
   r   c                    s,    fddfdd}t | | |S )zCConstruct hybrid of Shuffle within workers, Shuffle across workers.c                    s   t |  S rY   r   r   )rT   second_gather_devicesrU   r   r   r   X  s   z1build_shuffle_then_shuffle.<locals>.upper_builderc                    r   rY   r   r   r   r   r   r   [  r   z1build_shuffle_then_shuffle.<locals>.upper_level_fr   )rS   Zfirst_gather_devicesr   rT   rU   r   r   )rT   r   rU   r   r   build_shuffle_then_shuffleU  r   r   rY   )%__doc__r   rp   Ztensorflow.python.frameworkr   r   r   Ztensorflow.python.opsr   r   r   r   r   r.   r3   r4   rK   rX   rP   rQ   rR   rm   rj   rk   r   r}   r~   r   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   sL   5!>
+>
16)
%
"%
%3

	
	+
