o
    ?eWN                    @   sJ  d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	Z
ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z# ddl$mZ% ddl&m'Z' ddl&m(Z( ddl&m)Z) ddl*m+Z+ ddl*m,Z- ddl*m.Z. ddl*m/Z/ ddl*m0Z0 ddl*m1Z1 ddl*m2Z2 ddl*m3Z3 ddl*m4Z4 dd l5m6Z6 dd!l5m7Z7 dd"l5m8Z8 dd#l5m9Z9 dd$l5m:Z; dd%l<m=Z= dd&l>m?Z? dd'l@mAZB dd(l@mCZC dd)l@mDZD dd*l@mEZE dd+lFmGZG dd,lHmIZI dd-lHmJZJ dd.lHmKZK dd/lLmMZM d0ZNd1ZOd2d3 ZPejQd4d5 ZRd6d7 ZSd8d9 ZTeMd:g d;G d<d= d=ejUZVeMd>g d;eIWd>G d?d@ d@ejUZXeMd>gd;G dAdB dBejYZZG dCdD dDej[Z\dEdF Z]e/j^e/j_e/j`e/jae/jbe/jcfZdG dGdH dHejeZfdIdJ ZgdS )KzTPU Strategy.    N)logging)ag_ctx)api)xla_sharding)cross_device_ops)device_util)distribute_lib)distribute_utils)	input_lib)
input_util)numpy_dataset)reduce_util)tpu_replicated_variable)tpu_util)
tpu_values)values)tpu_cluster_resolver)context)def_function)function)constant_op)device)device_spec)dtypes)indexed_slices)ops)sparse_tensor)tensor_shape)tensor_util)	array_ops)control_flow_ops)math_ops)resource_variable_ops)	variables)ragged_tensor)save_context)device_assignment)tpu)tpu_hardware_feature)training_loop)tpu_ops)deprecation)nest)
tf_inspect)	tf_export   Fc                   C   s   t o
t o
t  S )z8Whether to batch variable initialization in tf.function.)/_EXPERIMENTAL_TPU_BATCH_VARIABLE_INITIALIZATIONr   executing_eagerlyr%   Zin_save_context r2   r2   j/home/www/facesmatcher.com/pyenv/lib/python3.10/site-packages/tensorflow/python/distribute/tpu_strategy.py$enable_batch_variable_initializationP   s
   r4   c                   c   sH    t  r
d V  d S t   d V  W d    d S 1 sw   Y  d S N)r   Z#executing_eagerly_outside_functionsZ
init_scoper2   r2   r2   r3   maybe_init_scopeY   s   

"r6   c                 C   sN   t  rt| tjs!t| tjs#t| rt| jtjs%t	ddS dS dS dS )z/Validate the function passed into strategy.run.zTPUStrategy.run(fn, ...) does not support pure eager execution. please make sure the function passed into `strategy.run` is a `tf.function` or `strategy.run` is called inside a `tf.function` if eager behavior is enabled.N)
r   r1   
isinstancer   Functionr   ZConcreteFunctioncallable__call__NotImplementedError)fnr2   r2   r3   validate_run_functionb   s   

r=   c              	      s  dd  i }i }|r fdd|  D }|r" fdd|  D }g }d}tt| j D ]W\}}|dkr>|jdkr>q0|jtjj	krL|
|j q0|jtjjkrV|}q0|jtjjkr|sjt fd	d
|D rtdt| dt fdd
|D  d| ||f  S q0g }	d}
t|D ]K\}} |r|dur||krtdt||krtd|||| < d}
q|dur||kr|
rtd|	
| qt||krtd|||| < q|rtj| fi ||	|fS | ||fS )a|  Inspects arguments to partially apply any DistributedVariable.

  This avoids an automatic cast of the current variable value to tensor.

  Note that a variable may be captured implicitly with Python scope instead of
  passing it to run(), but supporting run() keeps behavior consistent
  with MirroredStrategy.

  Since positional arguments must be applied from left to right, this function
  does some tricky function inspection to move variable positional arguments
  into kwargs. As a result of this, we can't support passing Variables as *args,
  nor as args to functions which combine both explicit positional arguments and
  *args.

  Args:
    fn: The function to run, as passed to run().
    args: Positional arguments to fn, as passed to run().
    kwargs: Keyword arguments to fn, as passed to run().

  Returns:
    A tuple of the function (possibly wrapped), args, kwargs (both
    possibly filtered, with members of args possibly moved to kwargs).
    If no variables are found, this function is a noop.

  Raises:
    ValueError: If the function signature makes unsupported use of *args, or if
      too many arguments are passed.
  c                 S   s   t | }|ot|d tjS Nr   )r,   flattenr7   r   DistributedVariable)xZflatr2   r2   r3   is_distributed_var   s   
z:_maybe_partial_apply_variables.<locals>.is_distributed_varc                    s   i | ]\}} |r||qS r2   r2   .0kvrB   r2   r3   
<dictcomp>   s    z2_maybe_partial_apply_variables.<locals>.<dictcomp>c                    s   i | ]\}} |s||qS r2   r2   rC   rG   r2   r3   rH      s
    Nr   selfc                 3       | ]} |V  qd S r5   r2   rD   arG   r2   r3   	<genexpr>       z1_maybe_partial_apply_variables.<locals>.<genexpr>zWMixing Variables and positional-only parameters not supported by TPUStrategy. Received z& DistributedVariables in **kwargs and c                 3   rJ   r5   r2   rK   rG   r2   r3   rM      rN   z" in *args, expected zero for both.FzTPUStrategy.run() cannot handle Variables passed to *args. Either name the function argument, or capture the Variable implicitly.zBToo many positional arguments passed to call to TPUStrategy.run().TzTPUStrategy.run() cannot handle both Variables and a mix of positional args and *args. Either remove the *args, or capture the Variable implicitly.)items	enumerater-   	signature
parametersr   namekind	ParameterPOSITIONAL_OR_KEYWORDappendVAR_POSITIONALPOSITIONAL_ONLYany
ValueErrorlensum	functoolspartial)r<   argskwargsZ
var_kwargsZnonvar_kwargsZpositional_argsZindex_of_star_argsip	star_argsZhave_seen_var_argrL   r2   rG   r3   _maybe_partial_apply_variables}   st   


re   zdistribute.TPUStrategy)v1c                       sV   e Zd ZdZ			d fdd	ZdddZed	d
 Zdd Zdd Z	dd Z
  ZS )TPUStrategyV2a  Synchronous training on TPUs and TPU Pods.

  To construct a TPUStrategy object, you need to run the
  initialization code as below:

  >>> resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
  >>> tf.config.experimental_connect_to_cluster(resolver)
  >>> tf.tpu.experimental.initialize_tpu_system(resolver)
  >>> strategy = tf.distribute.TPUStrategy(resolver)

  While using distribution strategies, the variables created within the
  strategy's scope will be replicated across all the replicas and can be kept in
  sync using all-reduce algorithms.

  To run TF2 programs on TPUs, you can either use `.compile` and
  `.fit` APIs in `tf.keras` with TPUStrategy, or write your own customized
  training loop by calling `strategy.run` directly. Note that
  TPUStrategy doesn't support pure eager execution, so please make sure the
  function passed into `strategy.run` is a `tf.function` or
  `strategy.run` is called inside a `tf.function` if eager
  behavior is enabled. See more details in https://www.tensorflow.org/guide/tpu.

  `distribute_datasets_from_function` and
  `experimental_distribute_dataset` APIs can be used to distribute the dataset
  across the TPU workers when writing your own training loop. If you are using
  `fit` and `compile` methods available in `tf.keras.Model`, then Keras will
  handle the distribution for you.

  An example of writing customized training loop on TPUs:

  >>> with strategy.scope():
  ...   model = tf.keras.Sequential([
  ...     tf.keras.layers.Dense(2, input_shape=(5,)),
  ...   ])
  ...   optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)

  >>> def dataset_fn(ctx):
  ...   x = np.random.random((2, 5)).astype(np.float32)
  ...   y = np.random.randint(2, size=(2, 1))
  ...   dataset = tf.data.Dataset.from_tensor_slices((x, y))
  ...   return dataset.repeat().batch(1, drop_remainder=True)
  >>> dist_dataset = strategy.distribute_datasets_from_function(
  ...     dataset_fn)
  >>> iterator = iter(dist_dataset)

  >>> @tf.function()
  ... def train_step(iterator):
  ...
  ...   def step_fn(inputs):
  ...     features, labels = inputs
  ...     with tf.GradientTape() as tape:
  ...       logits = model(features, training=True)
  ...       loss = tf.keras.losses.sparse_categorical_crossentropy(
  ...           labels, logits)
  ...
  ...     grads = tape.gradient(loss, model.trainable_variables)
  ...     optimizer.apply_gradients(zip(grads, model.trainable_variables))
  ...
  ...   strategy.run(step_fn, args=(next(iterator),))

  >>> train_step(iterator)

  For the advanced use cases like model parallelism, you can set
  `experimental_device_assignment` argument when creating TPUStrategy to specify
  number of replicas and number of logical devices. Below is an example to
  initialize TPU system with 2 logical devices and 1 replica.

  >>> resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
  >>> tf.config.experimental_connect_to_cluster(resolver)
  >>> topology = tf.tpu.experimental.initialize_tpu_system(resolver)
  >>> device_assignment = tf.tpu.experimental.DeviceAssignment.build(
  ...     topology,
  ...     computation_shape=[1, 1, 1, 2],
  ...     num_replicas=1)
  >>> strategy = tf.distribute.TPUStrategy(
  ...     resolver, experimental_device_assignment=device_assignment)

  Then you can run a `tf.add` operation only on logical device 0.

  >>> @tf.function()
  ... def step_fn(inputs):
  ...   features, _ = inputs
  ...   output = tf.add(features, features)
  ...
  ...   # Add operation will be executed on logical device 0.
  ...   output = strategy.experimental_assign_to_logical_device(output, 0)
  ...   return output
  >>> dist_dataset = strategy.distribute_datasets_from_function(
  ...     dataset_fn)
  >>> iterator = iter(dist_dataset)
  >>> strategy.run(step_fn, args=(next(iterator),))

  `experimental_spmd_xla_partitioning` enables the experimental XLA SPMD feature
  for model parallelism. This flag can reduce the compilation time and HBM
  requirements. When running in this mode, every input tensor must either be
  partitioned (via `strategy.experimental_split_to_logical_devices`) or fully
  replicated (via `strategy.experimental_replicate_to_logical_devices`) to all
  logical devices. And calling `strategy.experimental_assign_to_logical_device`
  will result in a ValueError in this mode.
  NFc              	      sj   t t| t| ||||dud tjdd tjd| j	j
 tjd| j	j d| _dS )aK  Synchronous training in TPU donuts or Pods.

    Args:
      tpu_cluster_resolver: A
        `tf.distribute.cluster_resolver.TPUClusterResolver` instance, which
        provides information about the TPU cluster. If None, it will assume
        running on a local TPU worker.
      experimental_device_assignment: Optional
        `tf.tpu.experimental.DeviceAssignment` to specify the placement of
        replicas on the TPU cluster.
      experimental_spmd_xla_partitioning: If True, enable the SPMD (Single
        Program Multiple Data) mode in XLA compiler. This flag only affects the
        performance of XLA compilation and the HBM requirement of the compiled
        TPU program. Ceveat: if this flag is True, calling
        `tf.distribute.TPUStrategy.experimental_assign_to_logical_device` will
        result in a ValueError.
    N)r&   use_spmd_for_xla_partitioningenable_data_reorderV2TPUStrategynum_workersnum_replicas_per_workerT)superrg   __init__TPUExtendedr   distribution_strategy_gaugeget_cellset#distribution_strategy_replica_gaugeextended	num_hostsnum_replicas_per_host%_enable_packed_variable_in_eager_mode)rI   r   Zexperimental_device_assignmentZ"experimental_spmd_xla_partitioning	__class__r2   r3   ro   [  s&   
	
zTPUStrategyV2.__init__r2   c                 C   H   t | t|||\}}}t|t }|pt }| j	||||S )a3  Run the computation defined by `fn` on each TPU replica.

    Executes ops specified by `fn` on each replica. If `args` or `kwargs` have
    `tf.distribute.DistributedValues`, such as those produced by a
    `tf.distribute.DistributedDataset` from
    `tf.distribute.Strategy.experimental_distribute_dataset` or
    `tf.distribute.Strategy.distribute_datasets_from_function`,
    when `fn` is executed on a particular replica, it will be executed with the
    component of `tf.distribute.DistributedValues` that correspond to that
    replica.

    `fn` may call `tf.distribute.get_replica_context()` to access members such
    as `all_reduce`.

    All arguments in `args` or `kwargs` should either be nest of tensors or
    `tf.distribute.DistributedValues` containing tensors or composite tensors.

    Example usage:

    >>> resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
    >>> tf.config.experimental_connect_to_cluster(resolver)
    >>> tf.tpu.experimental.initialize_tpu_system(resolver)
    >>> strategy = tf.distribute.TPUStrategy(resolver)
    >>> @tf.function
    ... def run():
    ...   def value_fn(value_context):
    ...     return value_context.num_replicas_in_sync
    ...   distributed_values = (
    ...       strategy.experimental_distribute_values_from_function(value_fn))
    ...   def replica_fn(input):
    ...     return input * 2
    ...   return strategy.run(replica_fn, args=(distributed_values,))
    >>> result = run()

    Args:
      fn: The function to run. The output must be a `tf.nest` of `Tensor`s.
      args: (Optional) Positional arguments to `fn`.
      kwargs: (Optional) Keyword arguments to `fn`.
      options: (Optional) An instance of `tf.distribute.RunOptions` specifying
        the options to run `fn`.

    Returns:
      Merged return value of `fn` across replicas. The structure of the return
      value is the same as the return value from `fn`. Each element in the
      structure can either be `tf.distribute.DistributedValues`, `Tensor`
      objects, or `Tensor`s (for example, if running on a single replica).
    
r=   re   	autographZ
tf_convertautograph_ctxZcontrol_status_ctxr   Z
RunOptionsru   tpu_runrI   r<   r`   ra   optionsr2   r2   r3   run  s
   0zTPUStrategyV2.runc                 C      | j jS )a[  Returns the cluster resolver associated with this strategy.

    `tf.distribute.TPUStrategy` provides the associated
    `tf.distribute.cluster_resolver.ClusterResolver`. If the user provides one
    in `__init__`, that instance is returned; if the user does not, a default
    `tf.distribute.cluster_resolver.TPUClusterResolver` is provided.
    ru   _tpu_cluster_resolverrI   r2   r2   r3   cluster_resolver  s   	zTPUStrategyV2.cluster_resolverc                 C   sN   | j jrtd| j jjd }|dk s||krtd||tj||ddS )ay  Adds annotation that `tensor` will be assigned to a logical device.

    This adds an annotation to `tensor` specifying that operations on
    `tensor` will be invoked on logical core device id `logical_device_id`.
    When model parallelism is used, the default behavior is that all ops
    are placed on zero-th logical device.

    ```python

    # Initializing TPU system with 2 logical devices and 4 replicas.
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
    tf.config.experimental_connect_to_cluster(resolver)
    topology = tf.tpu.experimental.initialize_tpu_system(resolver)
    device_assignment = tf.tpu.experimental.DeviceAssignment.build(
        topology,
        computation_shape=[1, 1, 1, 2],
        num_replicas=4)
    strategy = tf.distribute.TPUStrategy(
        resolver, experimental_device_assignment=device_assignment)
    iterator = iter(inputs)

    @tf.function()
    def step_fn(inputs):
      output = tf.add(inputs, inputs)

      # Add operation will be executed on logical device 0.
      output = strategy.experimental_assign_to_logical_device(output, 0)
      return output

    strategy.run(step_fn, args=(next(iterator),))
    ```

    Args:
      tensor: Input tensor to annotate.
      logical_device_id: Id of the logical core to which the tensor will be
        assigned.

    Raises:
      ValueError: The logical device id presented is not consistent with total
      number of partitions specified by the device assignment or the TPUStrategy
      is constructed with `experimental_spmd_xla_partitioning=True`.

    Returns:
      Annotated tensor with identical value as `tensor`.
    zCannot assign a tensor to a logical device in SPMD mode. To disable SPMD, Please construct the TPUStrategy with `experimental_spmd_xla_partitioning=False`   r   z`logical_core_id` to assign must be lower then total number of logical devices per replica. Received logical device id {} but there are only total of {} logical devices in replica.TZuse_sharding_op)ru   _use_spmd_for_xla_partitioningr[   _tpu_devicesshapeformatr   Zassign_device)rI   tensorlogical_device_idnum_logical_devices_per_replicar2   r2   r3   %experimental_assign_to_logical_device  s    .z3TPUStrategyV2.experimental_assign_to_logical_devicec                 C   s   | j jjd }t|}|j}t|}|t|kr#td|t|t|D ]\}}|du r0q'|| }	||	 dkrCtd||	|q'||krQtd|||t	|
|}
tj||
ddS )	a  Adds annotation that `tensor` will be split across logical devices.

    This adds an annotation to tensor `tensor` specifying that operations on
    `tensor` will be split among multiple logical devices. Tensor `tensor` will
    be split across dimensions specified by `partition_dimensions`.
    The dimensions of `tensor` must be divisible by corresponding value in
    `partition_dimensions`.

    For example, for system with 8 logical devices, if `tensor` is an image
    tensor with shape (batch_size, width, height, channel) and
    `partition_dimensions` is [1, 2, 4, 1], then `tensor` will be split
    2 in width dimension and 4 way in height dimension and the split
    tensor values will be fed into 8 logical devices.

    ```python
    # Initializing TPU system with 8 logical devices and 1 replica.
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
    tf.config.experimental_connect_to_cluster(resolver)
    topology = tf.tpu.experimental.initialize_tpu_system(resolver)
    device_assignment = tf.tpu.experimental.DeviceAssignment.build(
        topology,
        computation_shape=[1, 2, 2, 2],
        num_replicas=1)
    # Construct the TPUStrategy. Since we are going to split the image across
    # logical devices, here we set `experimental_spmd_xla_partitioning=True`
    # so that the partitioning can be compiled in SPMD mode, which usually
    # results in faster compilation and smaller HBM requirement if the size of
    # input and activation tensors are much bigger than that of the model
    # parameters. Note that this flag is suggested but not a hard requirement
    # for `experimental_split_to_logical_devices`.
    strategy = tf.distribute.TPUStrategy(
        resolver, experimental_device_assignment=device_assignment,
        experimental_spmd_xla_partitioning=True)

    iterator = iter(inputs)

    @tf.function()
    def step_fn(inputs):
      inputs = strategy.experimental_split_to_logical_devices(
        inputs, [1, 2, 4, 1])

      # model() function will be executed on 8 logical devices with `inputs`
      # split 2 * 4  ways.
      output = model(inputs)
      return output

    strategy.run(step_fn, args=(next(iterator),))
    ```
    Args:
      tensor: Input tensor to annotate.
      partition_dimensions: An unnested list of integers with the size equal to
        rank of `tensor` specifying how `tensor` will be partitioned. The
        product of all elements in `partition_dimensions` must be equal to the
        total number of logical devices per replica.

    Raises:
      ValueError: 1) If the size of partition_dimensions does not equal to rank
        of `tensor` or 2) if product of elements of `partition_dimensions` does
        not match the number of logical devices per replica defined by the
        implementing DistributionStrategy's device specification or
        3) if a known size of `tensor` is not divisible by corresponding
        value in `partition_dimensions`.

    Returns:
      Annotated tensor with identical value as `tensor`.
    r   zvLength of `partition_dimensions` must equal to the rank of `tensor.shape` ({}). Received len(partition_dimensions)={}.Nr   zTensor shape at `partition_dimensions[{}]` must be divisible by corresponding value specified by `partition_dimensions` ({}). Received: {}.zThe product of `partition_dimensions` should be the same as the number of logical devices (={}). Received `partition_dimensions`={},and their product is {}.Tr   )ru   r   r   npprodr\   r[   r   rP   Zarangereshaper   tile)rI   r   Zpartition_dimensionsr   Znum_partition_splitsZinput_shapeZtensor_rankZ	dim_indexZdim_sizeZ
split_sizeZtile_assignmentr2   r2   r3   %experimental_split_to_logical_devices  sB   C

z3TPUStrategyV2.experimental_split_to_logical_devicesc                 C   s   t j|ddS )a  Adds annotation that `tensor` will be replicated to all logical devices.

    This adds an annotation to tensor `tensor` specifying that operations on
    `tensor` will be invoked on all logical devices.

    ```python
    # Initializing TPU system with 2 logical devices and 4 replicas.
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
    tf.config.experimental_connect_to_cluster(resolver)
    topology = tf.tpu.experimental.initialize_tpu_system(resolver)
    device_assignment = tf.tpu.experimental.DeviceAssignment.build(
        topology,
        computation_shape=[1, 1, 1, 2],
        num_replicas=4)
    strategy = tf.distribute.TPUStrategy(
        resolver, experimental_device_assignment=device_assignment)

    iterator = iter(inputs)

    @tf.function()
    def step_fn(inputs):
      images, labels = inputs
      images = strategy.experimental_split_to_logical_devices(
        inputs, [1, 2, 4, 1])

      # model() function will be executed on 8 logical devices with `inputs`
      # split 2 * 4  ways.
      output = model(inputs)

      # For loss calculation, all logical devices share the same logits
      # and labels.
      labels = strategy.experimental_replicate_to_logical_devices(labels)
      output = strategy.experimental_replicate_to_logical_devices(output)
      loss = loss_fn(labels, output)

      return loss

    strategy.run(step_fn, args=(next(iterator),))
    ```
    Args:
      tensor: Input tensor to annotate.

    Returns:
      Annotated tensor with identical value as `tensor`.
    Tr   )r   	replicate)rI   r   r2   r2   r3   )experimental_replicate_to_logical_devicesl  s   .z7TPUStrategyV2.experimental_replicate_to_logical_devices)NNFr2   NN)__name__
__module____qualname____doc__ro   r   propertyr   r   r   r   __classcell__r2   r2   ry   r3   rg      s    f
(:

?erg   z#distribute.experimental.TPUStrategyc                       s<   e Zd ZdZ		d
 fdd	ZdddZedd	 Z  ZS )rk   a  Synchronous training on TPUs and TPU Pods.

  To construct a TPUStrategy object, you need to run the
  initialization code as below:

  >>> resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
  >>> tf.config.experimental_connect_to_cluster(resolver)
  >>> tf.tpu.experimental.initialize_tpu_system(resolver)
  >>> strategy = tf.distribute.experimental.TPUStrategy(resolver)

  While using distribution strategies, the variables created within the
  strategy's scope will be replicated across all the replicas and can be kept in
  sync using all-reduce algorithms.

  To run TF2 programs on TPUs, you can either use `.compile` and
  `.fit` APIs in `tf.keras` with TPUStrategy, or write your own customized
  training loop by calling `strategy.run` directly. Note that
  TPUStrategy doesn't support pure eager execution, so please make sure the
  function passed into `strategy.run` is a `tf.function` or
  `strategy.run` is called inside a `tf.function` if eager
  behavior is enabled.
  Nc                    sr   t d tt| t| |||dud tjd	d tj
d	| jj tj
d	| jj d| _dS )	aP  Synchronous training in TPU donuts or Pods.

    Args:
      tpu_cluster_resolver: A tf.distribute.cluster_resolver.TPUClusterResolver,
        which provides information about the TPU cluster.
      device_assignment: Optional `tf.tpu.experimental.DeviceAssignment` to
        specify the placement of replicas on the TPU cluster.
    z`tf.distribute.experimental.TPUStrategy` is deprecated, please use the non-experimental symbol `tf.distribute.TPUStrategy` instead.N)r&   ri   rj   rk   rl   rm   T)r   warningrn   rk   ro   rp   r   rq   rr   rs   rt   ru   rv   rw   rx   )rI   r   r&   ry   r2   r3   ro     s*   

zTPUStrategy.__init__r2   c                 C   r{   )zSee base class.r|   r   r2   r2   r3   r     s
   zTPUStrategy.runc                 C   r   )al  Returns the cluster resolver associated with this strategy.

    `tf.distribute.experimental.TPUStrategy` provides the
    associated `tf.distribute.cluster_resolver.ClusterResolver`. If the user
    provides one in `__init__`, that instance is returned; if the user does
    not, a default
    `tf.distribute.cluster_resolver.TPUClusterResolver` is provided.
    r   r   r2   r2   r3   r     s   
zTPUStrategy.cluster_resolver)NNr   )	r   r   r   r   ro   r   r   r   r   r2   r2   ry   r3   rk     s    
$rk   c                       s>   e Zd ZdZ			d
 fdd	Zedd Zddd	Z  ZS )TPUStrategyV1z)TPU distribution strategy implementation.Nc                    sb   t t| t| ||| tjdd tjd| j	j
 tjd| j	j d| _dS )a  Initializes the TPUStrategy object.

    Args:
      tpu_cluster_resolver: A tf.distribute.cluster_resolver.TPUClusterResolver,
          which provides information about the TPU cluster.
      steps_per_run: Number of steps to run on device before returning to the
          host. Note that this can have side-effects on performance, hooks,
          metrics, summaries etc.
          This parameter is only used when Distribution Strategy is used with
          estimator or keras.
      device_assignment: Optional `tf.tpu.experimental.DeviceAssignment` to
          specify the placement of replicas on the TPU cluster. Currently only
          supports the usecase of using a single core within a TPU cluster.
    ZV1rk   rl   rm   TN)rn   r   ro   rp   r   rq   rr   rs   rt   ru   rv   rw   rx   )rI   r   steps_per_runr&   ry   r2   r3   ro     s   
zTPUStrategyV1.__init__c                 C   r   )z0DEPRECATED: use .extended.steps_per_run instead.)Z	_extendedr   r   r2   r2   r3   r     s   zTPUStrategyV1.steps_per_runr2   c                 C   r{   )a  Run `fn` on each replica, with the given arguments.

    Executes ops specified by `fn` on each replica. If `args` or `kwargs` have
    "per-replica" values, such as those produced by a "distributed `Dataset`",
    when `fn` is executed on a particular replica, it will be executed with the
    component of those "per-replica" values that correspond to that replica.

    `fn` may call `tf.distribute.get_replica_context()` to access members such
    as `all_reduce`.

    All arguments in `args` or `kwargs` should either be nest of tensors or
    per-replica objects containing tensors or composite tensors.

    Users can pass strategy specific options to `options` argument. An example
    to enable bucketizing dynamic shapes in `TPUStrategy.run`
    is:

    >>> resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
    >>> tf.config.experimental_connect_to_cluster(resolver)
    >>> tf.tpu.experimental.initialize_tpu_system(resolver)
    >>> strategy = tf.distribute.experimental.TPUStrategy(resolver)

    >>> options = tf.distribute.RunOptions(
    ...     experimental_bucketizing_dynamic_shape=True)

    >>> dataset = tf.data.Dataset.range(
    ...    strategy.num_replicas_in_sync, output_type=dtypes.float32).batch(
    ...        strategy.num_replicas_in_sync, drop_remainder=True)
    >>> input_iterator = iter(strategy.experimental_distribute_dataset(dataset))

    >>> @tf.function()
    ... def step_fn(inputs):
    ...  output = tf.reduce_sum(inputs)
    ...  return output

    >>> strategy.run(step_fn, args=(next(input_iterator),), options=options)

    Args:
      fn: The function to run. The output must be a `tf.nest` of `Tensor`s.
      args: (Optional) Positional arguments to `fn`.
      kwargs: (Optional) Keyword arguments to `fn`.
      options: (Optional) An instance of `tf.distribute.RunOptions` specifying
        the options to run `fn`.

    Returns:
      Merged return value of `fn` across replicas. The structure of the return
      value is the same as the return value from `fn`. Each element in the
      structure can either be "per-replica" `Tensor` objects or `Tensor`s
      (for example, if running on a single replica).
    r|   r   r2   r2   r3   r     s
   3zTPUStrategyV1.run)NNNr   )	r   r   r   r   ro   r   r   r   r   r2   r2   ry   r3   r     s    
r   c                       s  e Zd ZdZ					d\ fdd	Zdd Zdd	 Zd
d Zej	j
fddZdd Zdd Zdd Zdd Zdd Zdd Z	d]ddZdd Zejdd Zd d! Zd"d# Zed$d% Zd&d' Zd(d) Zd*d+ Zd,d- Zd.d/ Zd0d1 Z d2d3 Z!d4d5 Z"ed6d7 Z#ed8d9 Z$ed:d; Z%ed<d= Z&ed>d? Z'ed@dA Z(edBdC Z)edDdE Z*edFdG Z+edHdI Z,dJdK Z-dLdM Z.				d^dNdOZ/dPdQ Z0edRdS Z1d]dTdUZ2dVdW Z3dXdY Z4dZd[ Z5  Z6S )_rp   zImplementation of TPUStrategy.NFc                    s   t t| | |d u rtd}|d u rd}t | _|| _| j	 | _
|| _dd | j
jD }|d u rCtjdd |D td| _n8tj|d j}g }	t|jD ]}
g }t|jD ]}|t|j|
||d q\|	| qStj|	td| _t| jd d | _t | _ t | _!| jd d df D ]%}t|}| j "|g  | j | | | j!"|g  | j!| | q|r| #| jd d df nd | _$|| _%d	| _&d	| _'dg| _(t)* rt+,t)j- | | _.|| _/d
| _0| jd d | j(d f }|D ]}t)1|rd	| _0 d S qd S )N r   c                 S   s   g | ]
}d |j v r|j qS )zdevice:TPU:)rS   rD   dr2   r2   r3   
<listcomp>z  s
    
z(TPUExtended.__init__.<locals>.<listcomp>c                 S   s   g | ]}|gqS r2   r2   r   r2   r2   r3   r     s    dtyper   )replicalogical_corejobTF)2rn   rp   ro   tpu_cluster_resolver_libZTPUClusterResolverweakrefWeakKeyDictionary_tpu_function_cacher   Zget_tpu_system_metadata_tpu_metadata_device_assignmentdevicesr   arrayobjectr   r   ZDeviceSpecV2from_stringr   rangenum_replicasnum_cores_per_replicarW   r   canonicalize
tpu_deviceZget_host_for_device_host_devicecollectionsOrderedDict_device_input_worker_devices_host_input_worker_devices
setdefault_get_replica_order_replica_orderr   Z_require_static_shapesZ(experimental_enable_get_next_as_optional_logical_device_stackr   r1   atexitregisterZ
async_waitZ_use_var_policyr   _using_custom_deviceZis_custom_device)rI   Zcontainer_strategyr   r   r&   rh   ri   Ztpu_devices_flatZjob_nametpu_devices
replica_idZreplica_devicesr   r   host_devicer   r   ry   r2   r3   ro   ^  sz   	





zTPUExtended.__init__c                 C   sV   g }t |D ]\}}tj|}||j|j|j|j|j	f|f qdd t
|D S )a  Get the replica order based on the tpu device order.

    For example, if the tpu_devices are:
    '/job:worker/replica:0/task:0/device:TPU:0',
    '/job:worker/replica:0/task:0/device:TPU:2',
    '/job:worker/replica:0/task:1/device:TPU:0',
    '/job:worker/replica:0/task:1/device:TPU:2',
    '/job:worker/replica:0/task:1/device:TPU:6',
    '/job:worker/replica:0/task:1/device:TPU:4',
    '/job:worker/replica:0/task:0/device:TPU:6',
    '/job:worker/replica:0/task:0/device:TPU:4',

    the returned replica order will be:
    [0, 1, 7, 6, 2, 3, 5, 4]

    This replica order will be used to reorder the data returned by the
    iterators,
    so that they can be placed on the same node as their computation graphs.

    Args:
      tpu_devices (List[str]): A list of tpu device names in the order of
        replicas.

    Returns:
      A list containing the order ids of corresponding TPU devices.
    c                 S   s   g | ]\}}|qS r2   r2   )rD   _rb   r2   r2   r3   r     s    z2TPUExtended._get_replica_order.<locals>.<listcomp>)rP   	tf_deviceZ
DeviceSpecr   rW   r   r   Zdevice_typetaskZdevice_indexsorted)rI   r   Zdevices_with_idsrb   r   specr2   r2   r3   r     s   
zTPUExtended._get_replica_orderc                 C   s   t ||  d S r5   )r	   Zvalidate_colocate)rI   Zcolocate_with_variabler2   r2   r3    _validate_colocate_with_variable  s   z,TPUExtended._validate_colocate_with_variablec                 C   s,   t t| j }tj|||  | jdS )z)Make iterators for each of the TPU hosts.)num_replicas_in_sync)	r
   InputWorkerstupler   rO   input_lib_v1ZDatasetIterator_container_strategy_num_replicas_in_sync)rI   datasetinput_workersr2   r2   r3   _make_dataset_iterator  s   z"TPUExtended._make_dataset_iteratorc                 C   sX   g }t t| j }|j}t|D ]}|tj	||| j
d qt||||  S )NZnum_input_pipelinesZinput_pipeline_idr   )r
   r   r   r   rO   rl   r   rW   r   InputContextr   r   ZInputFunctionIteratorr   )rI   Zinput_fnZreplication_modeinput_contextsr   rl   rb   r2   r2   r3   _make_input_fn_iterator  s"   z#TPUExtended._make_input_fn_iteratorc                 C   s   t |t | j|S r5   )r   Zone_host_numpy_datasetSingleDevicer   )rI   Znumpy_inputsessionr2   r2   r3    _experimental_make_numpy_dataset  s   z,TPUExtended._experimental_make_numpy_datasetc                 C   s2   |r|j rtt| j S tt| j S r5   )experimental_fetch_to_devicer
   r   r   r   rO   r   )rI   r   r2   r2   r3   _get_input_workers  s   
zTPUExtended._get_input_workersc                 C   sT   t |tjr	|j}t|}|D ]\}}t |tjtj	fr't
d|t|qd S )Na>  Found tensor {} with spec {}. TPUStrategy does not support distributed datasets with device prefetch when using sparse or ragged tensors. If you intend to use sparse or ragged tensors, please pass a tf.distribute.InputOptions object with experimental_fetch_to_device set to False to your dataset distribution function.)r7   r   ZPerReplicaSpecZ_component_specsr,   Z flatten_with_joined_string_pathsr   ZSparseTensorSpecr$   ZRaggedTensorSpecr[   r   type)rI   element_specspecspathr   r2   r2   r3   _check_spec   s   
zTPUExtended._check_specc                 C   sX   |r|j tjjkrtd|d u s|jr| |j tj	|| 
||  | j|| jdS )NzgInputReplicationMode.PER_REPLICA is only supported in `experimental_distribute_datasets_from_function`.)r   r   replica_order)experimental_replication_moder   InputReplicationModePER_REPLICAr;   r   r   r   r   Zget_distributed_datasetr   r   r   r   )rI   r   r   r2   r2   r3    _experimental_distribute_dataset/  s    z,TPUExtended._experimental_distribute_datasetc                 C   s   |r|j tjjkrtd| |}g }|j}t|D ]}|tj	||| j
d qtj||||  || jd}|d u s?|jrE| |j |S )NzInputReplicationMode.PER_REPLICA is only supported in  `experimental_distribute_datasets_from_function` of tf.distribute.MirroredStrategyr   )r   r   )r   r   r   r   r;   r   rl   r   rW   r   r   r   Z&get_distributed_datasets_from_functionr   r   r   r   r   )rI   Z
dataset_fnr   r   r   rl   rb   Zdistributed_datasetr2   r2   r3   "_distribute_datasets_from_functionC  s4   


z.TPUExtended._distribute_datasets_from_functionc                 C   s:   g }t | jD ]}||t|| j qtj|ddS )NT)Zalways_wrap)r   r   rW   r   ZValueContextr	   regroup)rI   Zvalue_fnZper_replica_valuesr   r2   r2   r3   -_experimental_distribute_values_from_functionb  s   z9TPUExtended._experimental_distribute_values_from_functionc                    s  |d u ri }t |}t   fddt  _fdd}t|t	s,J |j
 }tj jdkrA| }nt|||}W d    n1 sRw   Y  `t| _t|t	rdd |D tj
 fddtD ng t   S )	Nc                    s\    | }t  j}|r,t|g dd |D W  d   S 1 s%w   Y  dS |S )zSingle step on the TPU device.c                 S   s   g | ]}t |qS r2   )r   identity)rD   fr2   r2   r3   r   {  s    zSTPUExtended._experimental_run_steps_on_iterator.<locals>.run_fn.<locals>.<listcomp>N)r,   r?   last_step_outputsr   Zcontrol_dependencies)inputsZ	fn_resultZflat_last_step_outputs)ctxr<   r2   r3   run_fnu  s   
$z?TPUExtended._experimental_run_steps_on_iterator.<locals>.run_fnc                     sx   ~   }g }tjD ]  fdd}|t||f qtj|jtj	j
dd}t|d tr:t|}|S )z%The rewritten step fn running on TPU.c                    s   t  | S r5   )r	   select_replicarA   r   r2   r3   <lambda>  s    zUTPUExtended._experimental_run_steps_on_iterator.<locals>.rewrite_fn.<locals>.<lambda>rh   )r&   xla_optionsr   )Zget_nextr   r   rW   r,   map_structurer'   r   r   
XLAOptionsr   r7   listr?   )r`   Zper_replica_inputsreplicate_inputsr   replicate_outputs)multi_worker_iteratorr   rI   r   r3   
rewrite_fn  s&   


zCTPUExtended._experimental_run_steps_on_iterator.<locals>.rewrite_fnr   c                 S      g | ]
}t |tjs|qS r2   r7   r   	Operation)rD   rA   r2   r2   r3   r     s
    zCTPUExtended._experimental_run_steps_on_iterator.<locals>.<listcomp>c                    s   g | ]	} |d  qS r5   r2   )rD   rb   )last_step_tensor_outputs
output_numr2   r3   r     s    )r,   r?   r
   ZMultiStepContextr   Zget_default_graphZ_get_control_flow_contextZ_outer_control_flow_contextr7   r   r   r   r   r   r)   repeatr    groupZrun_opr\   r   _set_last_step_outputs)rI   r<   r  Z
iterationsZinitial_loop_valuesr  r   r2   )r   r<   r  r  r  r   rI   r3   #_experimental_run_steps_on_iteratorm  s>   





z/TPUExtended._experimental_run_steps_on_iteratorc                 C   s>   t |   ||i |W  d    S 1 sw   Y  d S r5   )_TPUReplicaContextr   )rI   r<   r`   ra   r2   r2   r3   _call_for_each_replica  s   $z"TPUExtended._call_for_each_replicac                 c   s    | j jd }||krtd||| j| z=t du r$dV  n#t	t
| dV  W d   n1 s:w   Y  W | j  dS W | j  dS W | j  dS | j  w )9Places variables and ops on the specified logical device.r   z]`logical_device_id` not in range (was {}, but there are only {} logical devices per replica).N)r   r   r[   r   r   rW   r   enclosing_tpu_contextr   r   r'   corepop)rI   r   r   r2   r2   r3   experimental_logical_device  s*   z'TPUExtended.experimental_logical_devicec                 C   s   t | j dS )zExperimental method added to be used by Estimator.

    This is a private method only to be used by Estimator. Other frameworks
    should directly be calling `tf.tpu.experimental.initialize_tpu_system`
    N)r   Zinitialize_tpu_systemr   r   r2   r2   r3   _experimental_initialize_system  s   z+TPUExtended._experimental_initialize_systemc           	         s  | ddrdi |S | dd}|dur|fi |S | dd}|du r6jddjd f n't|tjrZt|j di |W  d   S 1 sTw   Y  n|jjj	\fddfd	d
 fddfdd} fdd}j
st rjrdkr|}n|}d|d< njrdkr }n}tj |tjtjfi |}j
st rt|dj |S )z?Create a TPUMirroredVariable. See `DistributionStrategy.scope`.Zskip_mirrored_creatorFcustom_tpu_variable_creatorNcolocate_withr   c               
      s  d}g }t  D ]\}}t|p |dkr6| d }t  t|r%| n|}W d   n1 s1w   Y  |dkrL|d jdd }d||f | d< || d< ttj	 di | }W d   n1 shw   Y  t
|tjruJ || W d   n1 sw   Y  q|S )Returns a list of `tf.Variable`s.

      The list contains `number_replicas` `tf.Variable`s and can be used to
      initialize a `TPUMirroredVariable`.

      Args:
        **kwargs: the keyword arguments for creating a variable
      Nr   initial_value:%s/replica_%d/rS   r2   )rP   r   r   r6   r9   rS   splitr   device_policyDEVICE_PLACEMENT_SILENTr7   r   TPUMirroredVariablerW   )ra   r  
value_listrb   r   var0namerF   )r   next_creatorr2   r3   _create_mirrored_tpu_variables  s2   	
zDTPUExtended._create_variable.<locals>._create_mirrored_tpu_variablesc            	   
      s   | d }t   t|r| n|}W d   n1 sw   Y  g }tD ]I}g }tD ],}tj| |  || d<  di | }W d   n1 sPw   Y  || q.d| d |}tj	||d}|| q&|S )  Returns a list of `TPUReplicatedVariable`s.

      The list consists of `num_replicas` `TPUReplicatedVariable`s and can be
      used to initialize a `TPUMirroredVariable`. Each `TPUReplicatedVariable`
      contains a list of `tf.Variable`s which are replicated to
      `num_cores_per_replica` logical cores to enable XLA SPMD compilation.

      Args:
        **kwargs: the keyword arguments for creating a variable
      r  N{}/r:{}rS   r#   rS   r2   )
r6   r9   r   r   r   r   rW   r   r   TPUReplicatedVariable)	ra   r  mirrored_replicated_var_listr   replicated_var_listlogic_core_idrF   replica_nametpu_replicated_var)r   r   r   rI   r2   r3   )_create_mirrored_tpu_replicated_variables,  s.   
zOTPUExtended._create_variable.<locals>._create_mirrored_tpu_replicated_variablesc                     s.   t jdi | } j| t|d j |S )N_lazy_scoper2   )r   ZTPUUninitializedVariablelazy_variable_trackerZadd_uninitialized_varsetattr)ra   Zuninitialized_variabler   r2   r3   uninitialized_variable_creatorR  s   zDTPUExtended._create_variable.<locals>.uninitialized_variable_creatorc               
      s  |  dddu r d
i | S g }tD ]\}}t| |dkrR|  dd}t  |durCt|r8| }tj||  ddd}W d   n1 sMw   Y  |dkrh|d jdd }d||f | d< || d< |  dddu r{| d j	| d< |  d	ddu r| d j
| d	< ttj d
i | }W d   n1 sw   Y  t|tjrJ || W d   n1 sw   Y  q|S )r  r  Nr   r   r   r  r  rS   r   r2   )getrP   r   r   r6   r9   convert_to_tensorrS   r  r   r   r   r  r  r7   r   r  rW   )ra   r  rb   r   r  r  rF   )r!  r   r/  r2   r3   ,_create_uninitialized_mirrored_tpu_variables[  s@   	!zRTPUExtended._create_variable.<locals>._create_uninitialized_mirrored_tpu_variablesc               
      sT  |  dd}|  dd}|  dd}|du r d	i | S t 4 |durMt|r,| }tj||d}|| d< |du rB| d j| d< |du rM| d j| d< W d   n1 sWw   Y  g }tD ]E}g }tD ](}tj	| |  d	i | }W d   n1 sw   Y  |
| qjd| d |}	tj||	d}
|
|
 qb|S )
r"  r   Nr   r  r   r#  rS   r$  r2   )r0  r6   r9   r   r1  r   r   r   r   r   rW   r   r   r%  )ra   r   r   r  r&  r   r'  r(  rF   r)  r*  )r+  r   r   rI   r/  r2   r3   7_create_uninitialized_mirrored_tpu_replicated_variables  sB   z]TPUExtended._create_variable.<locals>._create_uninitialized_mirrored_tpu_replicated_variablesr   TZ!experimental_batch_initializationr,  r2   )r  r   r   r7   r   r   r   r   Z_devicesr   r   r4   r   r	   Zcreate_mirrored_variabler   ZTPU_VARIABLE_CLASS_MAPPINGZTPU_VARIABLE_POLICY_MAPPINGr.  r-  )	rI   r   ra   r  r  r2  r3  Zreal_creatorZmirrored_variabler2   )r+  r!  r   r   r   r   rI   r/  r3   _create_variable  sP   "%&	22
zTPUExtended._create_variablec                 C   s   t | dd st | _| jS )N_lazy_variable_tracker)getattrr   ZLazyVariableTrackerr5  r   r2   r2   r3   r-    s   
z!TPUExtended.lazy_variable_trackerc                    s    fdd}t d|S )Nc              	      sd   t  } j D ] }t| | |i |||< W d    n1 s$w   Y  q	t  |S r5   )	r   r   r   keysr   r   r   ZPerWorkerResourcer   )r   r`   ra   Zhost_to_tabler   r   r2   r3   lookup_creator  s   z;TPUExtended._resource_creator_scope.<locals>.lookup_creatorZStaticHashTable)r   Zresource_creator_scope)rI   r8  r2   r   r3   _resource_creator_scope  s   	z#TPUExtended._resource_creator_scopec                    s   t  tjs S t j}t  tjr% jd ur%t fdd jjD }t jtkr4t	j
||d}n*t	j
|d t |d}ttt|td D ]}t	j
|g|||t d   |d}qI| ||}|S )Nc                 3       | ]	} j |V  qd S r5   _packed_variableZ	on_devicer   valuer2   r3   rM     
    

z8TPUExtended._gather_to_implementation.<locals>.<genexpr>axisr   )r7   r   DistributedValuesr   r@   r<  r   r\   _XLA_OP_BY_OP_INPUTS_LIMITr   concatr   _broadcast_output)rI   r>  destinationsrA  r   r  outputrb   r2   r=  r3   _gather_to_implementation  s4   

z%TPUExtended._gather_to_implementationc                 C   s   t |}t|dkr9t|d }t| j}||kr7t| t	|}W d    |S 1 s2w   Y  |S t 
||}|S )Nr   r   )cross_device_ops_libZget_devices_fromr\   r   r   r   r   r   r   r   Zsimple_broadcast)rI   rF  rG  r   Zdest_canonicalZhost_canonicalr2   r2   r3   rE    s   

zTPUExtended._broadcast_outputc                    sJ  t  tjst r4t d ur4|tjj	kr!t
d| j   n|tjjkr/td| dt S t  tjsCt| || jS  j}t  tjr^ jd ur^t fdd jjD }t jtkrkt
|}n$tj|d |d jd}tdt|tD ]}|t
|||t  7 }q|tjj	kr|dt| 9 }| ||}|S )Ng      ?z`reduce_op`=z[ is not supported. Currently we only support ReduceOp.SUM and ReduceOp.MEAN in TPUStrategy.c                 3   r:  r5   r;  r   r=  r2   r3   rM   7  r?  z)TPUExtended._reduce_to.<locals>.<genexpr>r   r   )r7   r   rB  r   
is_tf_typer   r  r   ReduceOpZMEANr!   Z
scalar_mulr   SUMr;   r*   Zcross_replica_sumrI  Zreduce_non_distributed_valuer@   r<  r   r   r\   rC  Zadd_nr   Z
zeros_liker   r   rE  )rI   	reduce_opr>  rF  r   r  rG  rb   r2   r=  r3   
_reduce_to  sF   



	zTPUExtended._reduce_toc                 C   s  t |tjst |tjsJ t d ur-|r!||g|R i |S ||g|R i |fS |j}|d urQt	 sQ|rE||g|R i |S ||g|R i |fS g }g }|d urg|j
D ]	}	|||	f q\n|jD ]
}
||
|
jf qj|jtjjkr|jtjjkrt| t| t|D ]e\}}|d }
|d }	d| }t|	H t|3 t| |||
gt||R i t|| W d    n1 sw   Y  W d    n1 sw   Y  W d    n1 sw   Y  qt| ||S )Nr   r   z	update_%d)r7   r   TPUVariableMixinr"   BaseResourceVariabler   r  r<  r   r1   r   rW   r   r   Zsynchronizationvariables_libZVariableSynchronizationZON_READZaggregationZVariableAggregationNONEr	   Zassert_mirroredrP   r   r   UpdateContextZ
name_scoper   Zupdate_regroup)rI   varr<   r`   ra   r	  Z
packed_varZupdatesZvalues_and_devicesr   r>  rb   Zvalue_and_devicerS   r2   r2   r3   _updateM  s\   




  zTPUExtended._updatec                 C   s$   t |tjst |tjsJ | S r5   )r7   r   rO  r"   rP  Z
read_value)rI   rT  r2   r2   r3   read_var{  s   zTPUExtended.read_varc                 C      |S r5   r2   )rI   r>  r2   r2   r3   value_container     zTPUExtended.value_containerc                    sX   ~t  ttfr
 S t d ur* fddt| jD }tj|dd| jd}|d S  S )Nc                    s   g | ]} qS r2   r2   )rD   r   r   r2   r3   r     s    z-TPUExtended._broadcast_to.<locals>.<listcomp>r   Zconcat_dimensionZsplit_dimensionZsplit_count)	r7   floatintr   r  r   r   r*   
all_to_all)rI   r   rF  Zbroadcast_tensorresultr2   rZ  r3   _broadcast_to  s   zTPUExtended._broadcast_toc                    s4    j d u r	 jjS tt fddt j jD S )Nc                    s   g | ]} j |qS r2   )r   r   )rD   rr   r2   r3   r     s    z)TPUExtended.num_hosts.<locals>.<listcomp>)r   r   rv   r\   rs   r   r   r   r2   r   r3   rv     s
   


zTPUExtended.num_hostsc                 C   s0   | j d u r	| jjS | jj| j j }t| j j|S r5   )r   r   Znum_of_cores_per_hostr   minr   )rI   Zmax_models_per_hostr2   r2   r3   rw     s   
z!TPUExtended.num_replicas_per_hostc                 C   s   | j d u r	| jjS | j jS r5   )r   r   Z	num_coresr   r   r2   r2   r3   r     s   
z!TPUExtended._num_replicas_in_syncc                 C      dS )NFr2   r   r2   r2   r3   experimental_between_graph     z&TPUExtended.experimental_between_graphc                 C   rc  NTr2   r   r2   r2   r3   experimental_should_init  re  z$TPUExtended.experimental_should_initc                 C   rc  rf  r2   r   r2   r2   r3   should_checkpoint  re  zTPUExtended.should_checkpointc                 C   rc  rf  r2   r   r2   r2   r3   should_save_summary  re  zTPUExtended.should_save_summaryc                 C   s   t | jd d | jd f S )Nr   )r   r   r   r   r2   r2   r3   worker_devices  s   zTPUExtended.worker_devicesc                 C      | j S r5   )rj  r   r2   r2   r3   parameter_devices  s   zTPUExtended.parameter_devicesc                 C   s   t | jj S )z7Return the `tf.tpu.experimental.HardwareFeature` class.)r(   ZHardwareFeaturer   r   r2   r2   r3   r(     s   z TPUExtended.tpu_hardware_featurec                 C   rk  r5   )r   )rI   Zvar_listr2   r2   r3   non_slot_devices  s   zTPUExtended.non_slot_devicesc              	   C   s   ~t | jE td / ||i |}|r'|W  d    W  d    S t| j|W  d    W  d    S 1 s?w   Y  W d    d S 1 sOw   Y  d S r5   )r   r   r   r   rS  r,   r   Z_local_results)rI   r  r<   r`   ra   r	  r_  r2   r2   r3   _update_non_slot  s   RzTPUExtended._update_non_slotc                 C   s"   ~~~|r| | | d S d S r5   )CopyFrom_update_config_proto)rI   Zsession_configcluster_specZ	task_typetask_idr2   r2   r3   
_configure  s   zTPUExtended._configurec                 C   s2   t |}d|_| j }|r|j|  |S rf  )copydeepcopyZisolate_session_stater   rq  Zcluster_defro  Zas_cluster_def)rI   Zconfig_protoZupdated_configrq  r2   r2   r3   rp    s   

z TPUExtended._update_config_protoc                 C   rc  )z`make_dataset_iterator` and `make_numpy_iterator` use global batch size.

    `make_input_fn_iterator` assumes per-replica batching.

    Returns:
      Boolean.
    Tr2   r   r2   r2   r3   _global_batch_size  s   	zTPUExtended._global_batch_sizec                 C   s   |  ||}|||S r5   )_tpu_function_creator)rI   r<   r`   ra   r   funcr2   r2   r3   r     s   
zTPUExtended.tpu_runc                    sV   t  r jv rj  S   fdd}t  r)t|}|j < |S )Nc              	      s  t dd| | |du ri }g gfdd}g }tjD ]}|tj|tjdt	
|| t	
||g q jr||r|g }t|d }|D ])}t|rU|jj}nt|}|du retd|tdg| }	||	 qIt|d |}nd}jrtjj}
nd}
  jptjj d	}tj!||j"||
|d
}W d   n1 sw   Y  dd  t#d t$rň d d< d du st#d t%j&rdgt'| }n
 fdd|D }t	(|S )z3TF Function used to replicate the user computation.r   z8`TPUStrategy.run` is called with [args: %s] [kwargs: %s]Nc                    sL   t | d  |i |d< W d   d S 1 sw   Y  d S )z>Wraps user function to provide replica ID and `Tensor` inputs.replica_id_in_sync_groupr   N)r  )r   Zreplica_argsZreplica_kwargs)r<   r_  strategyr2   r3   replicated_fn  s   
zNTPUExtended._tpu_function_creator.<locals>.tpu_function.<locals>.replicated_fnr   r   zKinput tensor {} to TPUStrategy.run() has unknown rank, which is not allowedr   )r&   maximum_shapespadding_specr   c                 S   s   dd | D S )Nc                 S   r  r2   r  )rD   or2   r2   r3   r   F  s    z]TPUExtended._tpu_function_creator.<locals>.tpu_function.<locals>.<lambda>.<locals>.<listcomp>r2   r   r2   r2   r3   r   F  s    zITPUExtended._tpu_function_creator.<locals>.tpu_function.<locals>.<lambda>c              	      s&   g | ]}t d   t |qS r   )r,   pack_sequence_asr?   )rD   rG  Z
filter_opsr_  r2   r3   r   N  s    zKTPUExtended._tpu_function_creator.<locals>.tpu_function.<locals>.<listcomp>))r   Zvlogr   r   rW   r   Zconstantr   int32r	   r   Z&experimental_enable_dynamic_batch_sizer,   r?   r   rJ  r   rankr   ndimr[   r   r   ZTensorShaper  Z&experimental_bucketizing_dynamic_shaper'   ZPaddingSpecZPOWER_OF_TWOscopeZexperimental_xla_optionsr   r   r   r   r7   r   r   r  r\   r   )r`   ra   r|  r   rb   r}  Zflattened_listZinput_tensorr  Zmaximum_shaper~  r   r   r<   r   rI   r{  r  r3   tpu_function  sv   










z7TPUExtended._tpu_function_creator.<locals>.tpu_function)r   r1   r   r   r   r   )rI   r<   r   r  r2   r  r3   rw     s   
N

z!TPUExtended._tpu_function_creatorc                 C   rc  )zAWhether this strategy indicates working in multi-worker settings.Fr2   r   r2   r2   r3   _in_multi_worker_modeY  s   z!TPUExtended._in_multi_worker_modec                 C   rW  r5   r2   )rI   rz  r2   r2   r3   _get_local_replica_idb  rY  z!TPUExtended._get_local_replica_id)NNNFFr5   )NNNN)7r   r   r   r   ro   r   r   r   r   r   Z
PER_WORKERr   r   r   r   r   r   r   r  r  
contextlibcontextmanagerr  r  r4  r   r-  r9  rH  rE  rN  rU  rV  rX  r`  rv   rw   r   rd  rg  rh  ri  rj  rl  r(   rm  rn  rs  rp  rv  r   rw  r  r  r   r2   r2   ry   r3   rp   [  s    l*

`
 l
1.











		


Y	rp   c                 C   s8   t | tr| dkr| S | | S tt| d| | | S r>   )r7   r]  r   where_v2r!   Zgreater_equal)rA  r  r2   r2   r3   _make_axis_nonnegativef  s   

r  c                   @   s@   e Zd ZdZdddZedd Zdd Zd	d
 ZdddZ	dS )r  z+Replication Context class for TPU Strategy.r   c                 C   s   t jj| ||d d S )Nry  )r   ReplicaContextro   )rI   r{  rz  r2   r2   r3   ro     s   
z_TPUReplicaContext.__init__c                 C   s>   t |  | j}t| j}|d u rtdfS |jj	| fS r>   )
r   Zrequire_replica_contextZ	_strategyr   Zconstant_valuerz  r'   r  ru   rj  )rI   Zdsr   r2   r2   r3   r     s   
z_TPUReplicaContext.devicesc                 C   s   | j j|S )r  )r{  ru   r  )rI   r   r2   r2   r3   r    s   z._TPUReplicaContext.experimental_logical_devicec                 C   sN   t |trt|}||  | j9  < |S ttt|||t	j |}|S r5   )
r7   r]  r   r   r   r  r!   equalr   r   )rI   value_shape
value_rankrA  output_shaper2   r2   r3    _compute_all_gather_output_shape  s   
z3_TPUReplicaContext._compute_all_gather_output_shapeNc                    sX   ~t |D ]}t|tjrtdqfdd  fddt |D }t ||S )Nz)all_gather does not support IndexedSlicesc                    s  t | } | jjd u rt| }t| }n#| jj}| j }t| }tt|D ]}|| d u r8|| ||< q*t||}t	|t
rPdg|d  } j||< nttt|d | jd} |||}| jtv rt j j}t||}t|| j}t| || }	 tjj|	}	t|	|S tj| |d}
t|
|}
tj|
|| jd}t jdg}t| jg}tj|dd jd}tjt| jdd}tj|||d}t||S )Nr   r@  r[  r   ) r   r1  r   r  r   as_listr   r\   r  r7   r]  r   r  r!   r  r  r   &_DTYPES_SUPPORTED_BY_CROSS_REPLICA_SUMZone_hotrz  r   castZexpand_dimsZ
all_reducer   rK  rL  r   r*   r^  Zargmaxgather)r>  rA  r  r  Zvalue_shape_tensorrb   Zreplica_broadcast_shaper  Zreplica_id_maskZgathered_valuer   Zunordered_outputZconcat_replica_idZxla_to_replica_context_idZreplica_context_to_xla_idZsorted_with_extra_dimr   r2   r3   _all_gather_tensor  s   






	z9_TPUReplicaContext.all_gather.<locals>._all_gather_tensorc                    s   g | ]} |d qS )r@  r2   )rD   t)r  rA  r2   r3   r     s    z1_TPUReplicaContext.all_gather.<locals>.<listcomp>)r,   r?   r7   r   ZIndexedSlicesr;   r  )rI   r>  rA  Zexperimental_hintsrF   Zysr2   )r  rA  rI   r3   
all_gather  s   [z_TPUReplicaContext.all_gatherr  r5   )
r   r   r   r   ro   r   r   r  r  r  r2   r2   r2   r3   r    s    

r  c                 C   s\   t | j|}| j D ]\}}|| }|du r t|||< q|d ||< q| | dS )z0Sets the last step outputs on the given context.Nr   )r,   r  r   Z_last_step_outputs_reduce_opsrO   r   Z
PerReplicar
  )r   r  Zlast_step_tensor_outputs_dictrS   rM  rG  r2   r2   r3   r
    s   r
  )hr   r   r   r  rt  r^   r   Zabslr   numpyr   Z tensorflow.python.autograph.corer   r~   Z tensorflow.python.autograph.implr   r}   Z+tensorflow.python.compiler.xla.experimentalr   Ztensorflow.python.distributer   rI  r   r   r	   r
   r   r   r   r   r   r   r   Z-tensorflow.python.distribute.cluster_resolverr   r   Ztensorflow.python.distribute.v1r   Ztensorflow.python.eagerr   r   r   Ztensorflow.python.frameworkr   r   r   r   r   r   r   r   r   r   Ztensorflow.python.opsr   r    r!   r"   r#   rQ  Ztensorflow.python.ops.raggedr$   Ztensorflow.python.saved_modelr%   Ztensorflow.python.tpur&   Zdevice_assignment_libr'   r(   r)   Ztensorflow.python.tpu.opsr*   Ztensorflow.python.utilr+   r,   r-   Z tensorflow.python.util.tf_exportr.   rC  r0   r4   r  r6   r=   re   ZStrategyrg   Zdeprecated_endpointsrk   Z
StrategyV1r   ZStrategyExtendedV1rp   r  Zbfloat16Zfloat16Zfloat32Zfloat64r  Zuint32r  r  r  r
  r2   r2   r2   r3   <module>   s   	

w   
+
Uf        
 
