C H A P T E R  5

One-Sided Communication

This chapter describes performance issues related to MPI-2 standard one-sided communication.


Introducing One-Sided Communication

The most common use of MPI calls is for two-sided communication. That is, if data moves from one process's address space to another's, the data movement has to be specified on both sides: the sender's and the receiver's. For example, on the sender's side, it is common to use MPI_Send() or MPI_Isend() calls. On the receiver's side, it is common to use MPI_Recv() or MPI_Irecv() calls. An MPI_Sendrecv() call specifies both a send and a receive.
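
For example, moving a small buffer from rank 0 to rank 1 with two-sided calls requires a matching call on each side. The following minimal sketch (not one of this chapter's numbered code examples) assumes the job runs with at least two processes:

include "mpif.h"
real(8) x(4)
integer status(MPI_STATUS_SIZE)

call MPI_Init(ier)
call MPI_Comm_rank(MPI_COMM_WORLD, me, ier)

! both sides participate: the sender posts a send, the receiver a matching receive
if ( me .eq. 0 ) then
  x = 1.d0
  call MPI_Send(x, 4, MPI_REAL8, 1, 99, MPI_COMM_WORLD, ier)
else if ( me .eq. 1 ) then
  call MPI_Recv(x, 4, MPI_REAL8, 0, 99, MPI_COMM_WORLD, status, ier)
endif

call MPI_Finalize(ier)
end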

Even collective calls, such as MPI_Bcast(), MPI_Reduce(), and MPI_Alltoall(), require every process that contributes or receives data to do so explicitly with the correct MPI call.

The MPI-2 standard introduces one-sided communication. Notably, MPI_Put() and MPI_Get() allow a process to access another process's address space without any explicit participation by the remote process in that communication operation.
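
As a minimal sketch of the idea (again, not one of the chapter's numbered examples, and assuming at least two processes), the fragment below lets rank 0 read rank 1's buffer with MPI_Get(). Rank 1 issues no send or receive for the transfer itself; it participates only in the collective MPI_Win_fence() synchronization:

include "mpif.h"
integer(kind=MPI_ADDRESS_KIND) nbytes, disp
integer win
real(8) x(4), y(4)

call MPI_Init(ier)
call MPI_Comm_rank(MPI_COMM_WORLD, me, ier)

! every rank exposes its buffer y for one-sided access
y = me
nbytes = 8 * 4
call MPI_Win_create(y, nbytes, 8, MPI_INFO_NULL, MPI_COMM_WORLD, win, ier)

call MPI_Win_fence(0, win, ier)
if ( me .eq. 0 ) then
  ! read four elements from the start of rank 1's window into the local buffer x
  disp = 0
  call MPI_Get(x, 4, MPI_REAL8, 1, disp, 4, MPI_REAL8, win, ier)
endif
call MPI_Win_fence(0, win, ier)

call MPI_Win_free(win, ier)
call MPI_Finalize(ier)
end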


Comparing Two-Sided and One-Sided Communication

In selected circumstances, one-sided communication offers several advantages: only one process needs to specify the data movement, and synchronization is decoupled from the individual transfers. Further, because with two-sided communication a sender cannot usually write directly into a receiver's address space, some intermediate buffer is likely to be used. If such buffering is small compared to the data volume, additional synchronization occurs whenever the sender must wait for the receiver to free up buffer space. With one-sided communication, you must still specify synchronization to order memory operations, but you can amortize a single synchronization over many data movements.
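
The fragment below sketches that amortization pattern: a single pair of MPI_Win_fence() calls brackets one MPI_Put() per peer process. The buffer names are hypothetical, and for brevity the window uses ordinary allocatable memory rather than MPI_Alloc_mem, which the next section recommends:

include "mpif.h"
integer(kind=MPI_ADDRESS_KIND) nbytes, disp
integer win
real(8), allocatable, dimension(:) :: src, dst

call MPI_Init(ier)
call MPI_Comm_rank(MPI_COMM_WORLD, me, ier)
call MPI_Comm_size(MPI_COMM_WORLD, np, ier)

nblk = 1000
allocate(src(nblk*np), dst(nblk*np))
src = me

nbytes = 8 * nblk * np
call MPI_Win_create(dst, nbytes, 8, MPI_INFO_NULL, MPI_COMM_WORLD, win, ier)

! one synchronization epoch covers np separate data movements
call MPI_Win_fence(0, win, ier)
disp = nblk * me
do ip = 0, np - 1
  ! deliver this rank's block for process ip directly into ip's window
  call MPI_Put(src(1+nblk*ip), nblk, MPI_REAL8, ip, disp, nblk, MPI_REAL8, win, ier)
enddo
call MPI_Win_fence(0, win, ier)

call MPI_Win_free(win, ier)
deallocate(src, dst)
call MPI_Finalize(ier)
end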


Basic Sun MPI Performance Advice

To get good performance from one-sided communication with Sun MPI, observe two principles:

1. When creating a window (that is, choosing what memory to make available for one-sided operations), use memory that has been allocated with the MPI_Alloc_mem routine; a condensed sketch of this pattern appears after this list. The MPI-2 standard makes this suggestion, and it particularly benefits Sun MPI users:

http://www.mpi-forum.org/docs/mpi-20-html/node119.htm#Node119

2. Use one-sided communication over the shared memory (SHM) or remote shared memory (RSM) protocol module. That is, use one-sided communication between MPI ranks that either share the same shared-memory node or communicate via the Sun Fire Link interconnect. Protocol modules are chosen by Sun MPI at runtime and can be checked by setting the environment variable MPI_SHOW_INTERFACES=2 before launching your MPI job.
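
The following condensed sketch shows the pattern that the test programs later in this chapter use: window memory obtained from MPI_Alloc_mem through a Cray-style pointer (the 1024-element size is arbitrary, for illustration only):

include "mpif.h"
integer(kind=MPI_ADDRESS_KIND) nbytes
integer win
real(8) c(*)
pointer (cptr,c)

call MPI_Init(ier)

! request window memory from MPI rather than with ALLOCATE
nbytes = 8 * 1024
call MPI_Alloc_mem(nbytes, MPI_INFO_NULL, cptr, ier)
if ( ier .eq. MPI_ERR_NO_MEM ) stop

! expose the MPI-allocated memory for one-sided access
call MPI_Win_create(c, nbytes, 1, MPI_INFO_NULL, MPI_COMM_WORLD, win, ier)

! ... one-sided communication epochs go here ...

call MPI_Win_free(win, ier)
call MPI_Free_mem(c, ier)
call MPI_Finalize(ier)
end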

Further one-sided performance considerations for Sun MPI are discussed in Appendix A and Appendix B.


Case Study: Matrix Transposition

This section illustrates some of the advantages of one-sided communication with a particular example: matrix transposition. While one-sided communication is probably best suited for dynamic, unstructured computations, matrix transposition offers a relatively simple illustration.

Matrix transposition refers to the exchange of the axes of a rectangular array, flipping the array about the main diagonal. For example, in FIGURE 5-1, after a transposition the array shown on the left side becomes the array shown on the right:

FIGURE 5-1 Matrix Transposition
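
In a single address space, this flip is just a pair of nested loops. The following serial sketch (illustration only) forms the transpose bt of a 4x4 array b:

real(8) b(4,4), bt(4,4)

! fill b with the values 1 through 16, column by column
do j = 1, 4
do i = 1, 4
  b(i,j) = i + 4 * ( j - 1 )
enddo
enddo

! transpose: element (i,j) of b becomes element (j,i) of bt
do j = 1, 4
do i = 1, 4
  bt(j,i) = b(i,j)
enddo
enddo

! the first row of bt is now the first column of b
write(6,*) bt(1,:)
end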

There are many ways of distributing the elements of a matrix over the address spaces of multiple MPI processes. Perhaps the simplest is to distribute one axis in a block fashion. For example, our example transposition might be distributed over two MPI processes, process 0 (p 0) and process 1 (p 1), as in FIGURE 5-2:

FIGURE 5-2 Matrix Transposition, Distributed Over Two Processes

The ordering of multidimensional arrays within a linear address space varies from one programming language to another. A Fortran programmer, though, might think of the preceding matrix transposition as looking like FIGURE 5-3:

FIGURE 5-3 Matrix Transposition, Distributed Over Two Processes (Fortran Perspective)

In the final matrix, as shown in FIGURE 5-3, elements that have stayed on the same process appear in bold font, while elements that have moved between processes are underlined.

These transpositions move data between MPI processes. Note that no two matrix elements that start out contiguous end up contiguous. There are several strategies for effecting the interprocess data movement, ranging from moving each element individually to aggregating the data into the largest possible messages; the test programs in this section illustrate several points along this range.

FIGURE 5-4 shows maximal aggregation for our 4x4 transposition:

FIGURE 5-4 Matrix Transposition, Maximal Aggregation for 4x4 Transposition

Test Program A

Program A, shown in CODE EXAMPLE 5-1:

  • Aggregates data before the interprocess communication.
  • Establishes interprocess communication using two-sided MPI calls.
  • Rearranges data locally using a DTRANS() call.
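
As a hypothetical sketch of such a two-sided approach (an illustration only; the actual calls in CODE EXAMPLE 5-1 may differ), the aggregated interprocess step could be a single collective exchange in which every process both sends and receives:

include "mpif.h"
real(8), allocatable, dimension(:) :: b, brecv

call MPI_Init(ier)
call MPI_Comm_rank(MPI_COMM_WORLD, me, ier)
call MPI_Comm_size(MPI_COMM_WORLD, np, ier)

nb = 4
allocate(b(nb*nb*np), brecv(nb*nb*np))
b = me

! two-sided, aggregated exchange: each process contributes one nb*nb block
! to every process, and every process must make the call explicitly
call MPI_Alltoall(b, nb*nb, MPI_REAL8, brecv, nb*nb, MPI_REAL8, MPI_COMM_WORLD, ier)

deallocate(b, brecv)
call MPI_Finalize(ier)
end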

Test Program B

Program B, shown in CODE EXAMPLE 5-2:

  • Aggregates data before the interprocess communication (the first local transpose).
  • Establishes interprocess communication using one MPI_Put() call per destination process.
  • Rearranges data locally using a DTRANS() call.

Test program B should outperform test program A because one-sided interprocess communication can write more directly into a remote process's address space.

CODE EXAMPLE 5-2 Test Program B
include "mpif.h"
 
integer(kind=MPI_ADDRESS_KIND) nbytes
integer win
real(8) c(*)
pointer (cptr,c)
 
real(8), allocatable, dimension(:) :: a, b, d
real(8) t0, t1, t2, t3
 
! initialize parameters
call init(me,np,n,nb)
! allocate matrices
allocate(a(nb*np*nb))
allocate(b(nb*nb*np))
allocate(d(nb*np*nb))
 
nbytes = 8 * nb * nb * np
call MPI_Alloc_mem(nbytes, MPI_INFO_NULL, cptr, ier)
if ( ier .eq. MPI_ERR_NO_MEM  ) stop
 
! create window
call MPI_Win_create(c, nbytes, 1, MPI_INFO_NULL, MPI_COMM_WORLD, win, ier)
 
! initialize matrix
call initialize_matrix(me,np,nb,a)
 
! timing
do itime = 1, 10
  call MPI_Barrier(MPI_COMM_WORLD,ier)
  t0 = MPI_Wtime()
 
  ! first local transpose
  do k = 1, nb
  do j = 0, np - 1
    ioffa = nb * (   j   + np * (k-1) )
    ioffb = nb * ( (k-1) + nb *   j   )
    do i = 1, nb
      b(i+ioffb) = a(i+ioffa)
    enddo
  enddo
  enddo
  t1 = MPI_Wtime()
 
  ! global all-to-all
  call MPI_Win_fence(0, win, ier)
  do ip = 0, np - 1
    nbytes = 8 * nb * nb * me
    call MPI_Put(b(1+nb*nb*ip), nb*nb, MPI_REAL8, ip, nbytes, & 
                                  nb*nb, MPI_REAL8, win, ier)
  enddo
  call MPI_Win_fence(0, win, ier)
  t2 = MPI_Wtime()
 
  ! second local transpose
  call dtrans('o', 1.d0, c, nb, nb*np, d)
 
  call MPI_Barrier(MPI_COMM_WORLD,ier)
  t3 = MPI_Wtime()
if ( me .eq. 0 ) &
  write(6,'(f8.3," seconds; breakdown on proc 0 = ",3f10.3)') &
  t3 - t0, t1 - t0, t2 - t1, t3 - t2
 
enddo
 
! check
call check_matrix(me,np,nb,d)
 
! deallocate matrices and stuff
call MPI_Win_free(win, ier)
deallocate(a)
deallocate(b)
deallocate(d)
call MPI_Free_mem(c, ier)
 
call MPI_Finalize(ier)
end

Test Program C

Program C, shown in CODE EXAMPLE 5-3:

  • Performs no data aggregation before the interprocess communication.
  • Establishes interprocess communication using numerous small MPI_Put() calls.
  • Rearranges data locally using a DTRANS() call.

Test program C should outperform test program B because program C eliminates aggregation before the interprocess communication. Such a strategy would be more difficult to implement with two-sided communication, which would have to make trade-offs between programming complexity and increased interprocess synchronization.

CODE EXAMPLE 5-3 Test Program C
include "mpif.h"
 
integer(kind=MPI_ADDRESS_KIND) nbytes
integer win
real(8) c(*)
pointer (cptr,c)
 
real(8), allocatable, dimension(:) :: a, b, d
real(8) t0, t1, t2, t3
 
! initialize parameters
call init(me,np,n,nb)
 
! allocate matrices
allocate(a(nb*np*nb))
allocate(b(nb*nb*np))
allocate(d(nb*np*nb))
 
nbytes = 8 * nb * nb * np
call MPI_Alloc_mem(nbytes, MPI_INFO_NULL, cptr, ier)
if ( ier .eq. MPI_ERR_NO_MEM  ) stop
 
! create window
call MPI_Win_create(c, nbytes, 1, MPI_INFO_NULL, MPI_COMM_WORLD, win, ier)
 
! initialize matrix
call initialize_matrix(me,np,nb,a)
 
! timing
do itime = 1, 10
  call MPI_Barrier(MPI_COMM_WORLD,ier)
  t0 = MPI_Wtime()
  t1 = t0
 
  ! combined local transpose with global all-to-all
  call MPI_Win_fence(0, win, ier)
  do ip = 0, np - 1
  do ib = 0, nb - 1
    nbytes = 8 * nb * ( ib + nb * me )
    call MPI_Put(a(1+nb*ip+nb*np*ib), nb, MPI_REAL8, ip, nbytes, &
                                        nb, MPI_REAL8, win, ier)
  enddo
  enddo
  call MPI_Win_fence(0, win, ier)
  t2 = MPI_Wtime()
 
  ! second local transpose
  call dtrans('o', 1.d0, c, nb, nb*np, d)
 
  call MPI_Barrier(MPI_COMM_WORLD,ier)
  t3 = MPI_Wtime()
 
  if ( me .eq. 0 ) &
    write(6,'(f8.3," seconds; breakdown on proc 0 = ",3f10.3)') &
    t3 - t0, t1 - t0, t2 - t1, t3 - t2
enddo
 
! check
call check_matrix(me,np,nb,d)
 
! deallocate matrices and stuff
call MPI_Win_free(win, ier)
deallocate(a)
deallocate(b)
deallocate(d)
call MPI_Free_mem(c, ier)
 
call MPI_Finalize(ier)
end

Test Program D

Program D, shown in CODE EXAMPLE 5-4:

  • Performs no data aggregation before the interprocess communication.
  • Establishes interprocess communication using a very large number of single-element MPI_Put() calls.
  • Rearranges no data after the interprocess communication call.

Test program D eliminates all local data movement before and after the interprocess step, but it is slow because it moves all the data one matrix element at a time.

CODE EXAMPLE 5-4 Test Program D
include "mpif.h"
 
integer(kind=MPI_ADDRESS_KIND) nbytes
integer win
real(8) c(*)
pointer (cptr,c)
 
real(8), allocatable, dimension(:) :: a, b, d
real(8) t0, t1, t2, t3
 
! initialize parameters
call init(me,np,n,nb)
 
! allocate matrices
allocate(a(nb*np*nb))
allocate(b(nb*nb*np))
allocate(d(nb*np*nb))
 
nbytes = 8 * nb * nb * np
call MPI_Alloc_mem(nbytes, MPI_INFO_NULL, cptr, ier)
if ( ier .eq. MPI_ERR_NO_MEM  ) stop
 
! create window
call MPI_Win_create(c, nbytes, 1, MPI_INFO_NULL, MPI_COMM_WORLD, win, ier)
 
! initialize matrix
call initialize_matrix(me,np,nb,a)
 
! timing
do itime = 1, 10
  call MPI_Barrier(MPI_COMM_WORLD,ier)
  t0 = MPI_Wtime()
  t1 = t0
 
  ! combined local transpose with global all-to-all
  call MPI_Win_fence(0, win, ier)
  do ip = 0, np - 1
  do ib = 0, nb - 1
  do jb = 0, nb - 1
    nbytes = 8 * ( ib + nb * ( me + np * jb ) )
    call MPI_Put(a(1+jb+nb*(ip+np*ib)), 1, MPI_REAL8, ip,nbytes, &
                                          1, MPI_REAL8, win, ier)
  enddo
  enddo
  enddo
  call MPI_Win_fence(0, win, ier)
  call MPI_Barrier(MPI_COMM_WORLD,ier)
  t2 = MPI_Wtime()
  t3 = t2
 
  if ( me .eq. 0 ) &
    write(6,'(f8.3," seconds; breakdown on proc 0 = ",3f10.3)') &
    t3 - t0, t1 - t0, t2 - t1, t3 - t2
enddo
 
! check
call check_matrix(me,np,nb,c)
 
! deallocate matrices and stuff
call MPI_Win_free(win, ier)
deallocate(a)
deallocate(b)
deallocate(d)
call MPI_Free_mem(c, ier)
 
call MPI_Finalize(ier)
end

Utility Routines

Test programs A, B, C, and D use the utility routines shown in CODE EXAMPLE 5-5, CODE EXAMPLE 5-6, and CODE EXAMPLE 5-7 to initialize parameters, initialize the test matrix, and check the transposition.

 

CODE EXAMPLE 5-5 The init Subroutine
subroutine init(me,np,n,nb)
 
include "mpif.h"
 
! usual MPI preamble
call MPI_Init(ier)
call MPI_Comm_rank(MPI_COMM_WORLD, me, ier)
call MPI_Comm_size(MPI_COMM_WORLD, np, ier)
 
! get matrix rank n
if ( me .eq. 0 ) then
  write(6,*) "matrix rank n?"
  read(5,*) n
  if ( mod(n,np) .ne. 0 ) then
    n = np * ( n / np )
    write(6,*) "specified matrix rank not a power of np =", np
    write(6,*) "using n =", n
  endif
endif
call MPI_Bcast(n,1,MPI_INTEGER,0,MPI_COMM_WORLD,ier)
nb = n / np
 
end

 

CODE EXAMPLE 5-6 The initialize_matrix Subroutine
subroutine initialize_matrix(me,np,nb,x)
 
real(8) x(*)
 
n = nb * np
 
do k = 1, nb
do j = 0, np - 1
do i = 1, nb
  l2 = k + nb * me
  l1 = i + nb * j
  x(i+nb*(j+np*(k-1))) = l1 + n * ( l2 - 1 )
enddo
enddo
enddo
 
end

 

CODE EXAMPLE 5-7 The check_matrix Subroutine
subroutine check_matrix(me,np,nb,x)
 
include "mpif.h"
 
real(8) x(*), error_local, error_global
 
n = nb * np
 
error_local = 0
do k = 1, nb
do j = 0, np - 1
do i = 1, nb
  l2 = k + nb * me
  l1 = i + nb * j
  error_local = error_local + &
    abs( x(i+nb*(j+np*(k-1))) - ( l2 + n * ( l1 - 1 ) ) )
enddo
enddo
enddo
call MPI_Allreduce(error_local,error_global,1, &
  MPI_REAL8,MPI_SUM,MPI_COMM_WORLD,ier)
if ( me .eq. 0 ) write(6,*) "error:", error_global
 
end

Timing

Here are sample timings for the four test programs on an (older-generation) Sun Enterprise 6000 server. An 8192x8192 matrix of 8-byte (64-bit) elements is transposed using 16 CPUs (that is, distributed over 16 MPI processes). The total time for each program is broken down into its three component steps; all times are in seconds.

Program    Total    Local Aggregation    Interprocess Communication    Local DTRANS() Call
A          2.109    0.585                0.852                         0.673
B          1.797    0.587                0.466                         0.744
C          1.177    0.000                0.430                         0.747
D          4.908    0.000                4.908                         0.000


Note that test program B is nearly twice as fast in interprocess communication as test program A. This increased speed is because two-sided communication on a shared-memory server, while efficient, still moves data into a shared-memory area and then out again. In contrast, one-sided communication moves data directly from the address space of one process into that of the other. The aggregate bandwidth of the interprocess communication step for the one-sided case is:

8192 x 8192 x 8 byte / 0.466 seconds = 1.1 Gbyte/second

which is very close to the maximum theoretical value supported by this server (half of 2.6 Gbyte/second).

Further, the cost of aggregating data before the interprocess communication step can be eliminated, as demonstrated in test program C. Despite the much larger number of MPI_Put() calls, this change adds at most a modest amount of time to the interprocess communication step; in the timings above it adds none at all. Most importantly, this strategy eliminates the need to balance extra programming complexity against increased interprocess synchronization, a trade-off that a fine-grained two-sided implementation would have to make.

Test program D allows complete elimination of the two local steps, but at the cost of moving data between processes one element at a time. The huge increase in runtime is evident in the timings. The issue here is not just overheads in interprocess data movement, but also the prohibitive cost of accessing single elements in memory rather than entire cache lines.