Process Migration and Checkpointing
Process migration within a distributed computing environment is the
transfer of a running process from one node to another, where its
execution continues. In a Java-based system this means transferring a
running Java thread, which consists of a set of class files and its
execution state. Process migration enables dynamic scheduling for
adaptive load balancing, fault tolerance through checkpointing, and
data access locality. Moreover, decoupling job execution from job
initiation also requires process migration. During the execution of a
parallel computation, certain process migrations are necessary, e.g.
to transmit a job to the Primary Scheduler, to distribute processes
among the Workers, and to pass results back to the Initiator.
Dynamic scheduling: the scheduling of communicating processes that are
executed over the Internet has to adapt to temporal variations in
connectivity. To support such dynamic scheduling based on disposition
values, the system provides a transparent process migration mechanism.
This migration mechanism uses JavaGo. Since a Java Message Passing
Interface (JMPI) application cannot migrate during its execution, a
preprocessor (jgoc, short for JavaGo compiler) generates mobile code,
i.e. a JMPI application whose processes are able to migrate during
their execution. Usually, a migration is induced by a go() statement
in the application. To meet the fundamental requirement of
transparency, this statement is hidden in the communication methods of
the MPI package. Possible migration points lie within the COMM.send()
and COMM.recv() methods, after the message passing has completed. A
migration server, called migServer, is integrated into the
communication layer of the system and listens at a dedicated port for
directives to migrate a process. This enables a scheduling instance to
initiate the migration of a process from any node in the system.
Correspondingly, the scheduler is provided with a method
migrationCall() that sends a migration request to a Worker's
migServer. The migServer can be regarded as an interface to the JavaGo
migration server, which actually carries out the process migration.
The preprocessor and the migration server together constitute JavaGo.
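The following Java sketch illustrates how a migration point can be hidden inside the communication methods, so that application code never calls go() itself. It rests on several assumptions. JavaGo's go() is a language extension translated by jgoc, so here it is replaced by a hypothetical MigrationHook callback. The class names Comm and MigServer, the in-memory channel and the text directive "MIGRATE <destination>" are also hypothetical. None of them is the system's actual API or wire format.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for JavaGo's go() statement, which jgoc would translate. */
interface MigrationHook {
    void go(String destination);
}

/** Sketch of a communicator whose send/recv contain the hidden migration point. */
class Comm {
    // Set by the migServer thread, consumed by the application thread; null = nothing pending.
    private final AtomicReference<String> pendingDestination = new AtomicReference<>();
    // In-memory stand-in for the network channel (single application thread assumed).
    private final Deque<Object> channel = new ArrayDeque<>();
    private final MigrationHook hook;

    Comm(MigrationHook hook) {
        this.hook = hook;
    }

    /** Called by the migServer when a migration directive arrives. */
    void requestMigration(String destination) {
        pendingDestination.set(destination);
    }

    void send(Object message) {
        channel.addLast(message); // message passing completes first ...
        migrationPoint();         // ... then the process may migrate
    }

    Object recv() {
        Object message = channel.pollFirst(); // null if nothing has been sent
        migrationPoint();
        return message;
    }

    // The migration point: the application never sees go(); it is triggered here.
    private void migrationPoint() {
        String destination = pendingDestination.getAndSet(null);
        if (destination != null) {
            hook.go(destination);
        }
    }
}

/** Hypothetical migServer handler; in the system the directive arrives on a dedicated port. */
class MigServer {
    static final String PREFIX = "MIGRATE ";
    private final Comm comm;

    MigServer(Comm comm) {
        this.comm = comm;
    }

    /** Scheduler side: the directive a migrationCall() could send (format assumed). */
    static String migrationCall(String destination) {
        return PREFIX + destination;
    }

    /** Worker side: returns true if the directive was understood and queued. */
    boolean handle(String directive) {
        if (!directive.startsWith(PREFIX)) {
            return false;
        }
        comm.requestMigration(directive.substring(PREFIX.length()).trim());
        return true;
    }
}
```

The destination is read and cleared atomically. A directive that arrives while a message is in flight is therefore acted on at the next send() or recv(), never in the middle of message passing.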
Transmission of jobs: if a distributed computation is started by the
MPI_Exec application, the parameter n, which specifies the number of
processes, as well as the path and name of the JMPI class file have to
be transmitted to the local peer. For this purpose, a simple socket
connection is used to contact the local peer and pass these
parameters. The received parameters are stored in an object of the
class JobDesrcriptor, which is instantiated with a unique job ID. The
job is then executed locally until the MPI.init() method is called
inside the application. After that, a request REQ_SCHEDULE_PROCS
including the number of required Workers is sent to the peers in the
knownPeers hash map until the acknowledgment ACCEPT_SCHEDULE_PROCS is
received from some peer, which need not itself be a known peer. This
peer becomes the Primary Scheduler of the job and receives the
JobDesrcriptor object. In the next step, the Initiator has to send the
running job to the Primary Scheduler. Since the execution state has to
be restored at each Worker, it first has to be transmitted to the
Primary Scheduler. Unlike in a standard process migration, the process
execution is not continued at the destination node. Instead, the
Primary Scheduler is supposed to write the process data into a file.
JavaGo does not support such a service, so we adapted JavaGo to meet
this requirement, as explained further below.
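The Initiator's search for a Primary Scheduler can be sketched in Java as follows. Only the message names REQ_SCHEDULE_PROCS and ACCEPT_SCHEDULE_PROCS come from the text. Everything else is hypothetical:

- the JobDescriptor record, a stand-in for the JobDesrcriptor class
- the Reply record
- the PeerTransport interface
- the assumption that knownPeers maps peer IDs to addresses

The reply carries the ID of the accepting peer, which need not be one of the known peers.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical stand-in for JobDesrcriptor: the parameters passed by MPI_Exec. */
record JobDescriptor(String jobId, int n, String classPath, String className) {
    static JobDescriptor create(int n, String classPath, String className) {
        // Each job gets a unique ID when the descriptor is instantiated.
        return new JobDescriptor(UUID.randomUUID().toString(), n, classPath, className);
    }
}

/** Hypothetical reply: message type plus the ID of the peer that answered. */
record Reply(String type, String peerId) {}

/** Hypothetical transport used to contact one peer. */
interface PeerTransport {
    Reply send(String peerAddress, String type, int requiredWorkers);
}

final class SchedulerSearch {
    static final String REQ_SCHEDULE_PROCS = "REQ_SCHEDULE_PROCS";
    static final String ACCEPT_SCHEDULE_PROCS = "ACCEPT_SCHEDULE_PROCS";

    private SchedulerSearch() {}

    /** Contacts known peers one by one until some peer accepts; returns its ID. */
    static Optional<String> findPrimaryScheduler(Map<String, String> knownPeers,
                                                 JobDescriptor job,
                                                 PeerTransport transport) {
        for (String address : knownPeers.values()) {
            Reply reply = transport.send(address, REQ_SCHEDULE_PROCS, job.n());
            if (reply != null && ACCEPT_SCHEDULE_PROCS.equals(reply.type())) {
                return Optional.of(reply.peerId()); // this peer becomes the Primary Scheduler
            }
        }
        return Optional.empty(); // nobody accepted
    }
}
```

With a plain HashMap the order in which peers are contacted is unspecified. A LinkedHashMap makes it deterministic, which is convenient for testing.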
Initializing distributed computations: when the Primary Scheduler
receives the running job from the Initiator, this job consists of a
single process. The Primary Scheduler now replicates this process n
times. To this end, it first chooses n peers from its availablePeers
hash map as Workers according to their disposition values. A peer can
only be assigned to participate if it replies to the participation
request REQ_COMPUTE_PROC with an ACCEPT_COMPUTE_PROC acknowledgment.
Then the IP address, port, and disposition value of each participating
peer are stored in the process descriptor table (PDT). To avoid a
single point of failure, the Primary Scheduler chooses at least one
other peer with a high disposition value as Secondary Scheduler, which
keeps a duplicate of the PDT and the JobDesrcriptor object. After
that, n duplicates of the process received from the Initiator have to
be distributed among the chosen Workers. This requires multiple
migrations of a single process that was written into a file.
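The Worker selection can be sketched as follows. The assumptions are all hypothetical:

- peers are represented by a Peer record
- the REQ_COMPUTE_PROC/ACCEPT_COMPUTE_PROC exchange is collapsed into a predicate
- ranks are assigned in selection order
- the Secondary Scheduler is the best remaining peer that was not needed as a Worker

The text itself only requires the Secondary Scheduler to have a high disposition value.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Hypothetical entry of availablePeers / the PDT: IP address, port, disposition value. */
record Peer(String ip, int port, double disposition) {}

/** Sketch of the Primary Scheduler's Worker selection (names and policy assumed). */
class WorkerSelection {
    // Process descriptor table: rank -> participating peer.
    final Map<Integer, Peer> pdt = new TreeMap<>();
    Peer secondaryScheduler; // stays null if no peer is left over

    /**
     * Asks peers in descending order of disposition value until n have accepted.
     * acceptsComputeProc models REQ_COMPUTE_PROC answered by ACCEPT_COMPUTE_PROC.
     * Returns false if fewer than n peers accepted.
     */
    boolean select(List<Peer> availablePeers, int n, Predicate<Peer> acceptsComputeProc) {
        Comparator<Peer> byDisposition = Comparator.comparingDouble(Peer::disposition);
        List<Peer> candidates = new ArrayList<>(availablePeers);
        candidates.sort(byDisposition.reversed());

        List<Peer> notAsked = new ArrayList<>();
        int rank = 0;
        for (Peer peer : candidates) {
            if (rank == n) {
                notAsked.add(peer);          // enough Workers already
            } else if (acceptsComputeProc.test(peer)) {
                pdt.put(rank++, peer);       // ranks assigned in selection order
            }                                // refusing peers are skipped
        }
        if (rank < n) {
            return false;
        }
        // Assumption: the Secondary Scheduler is the best remaining peer that is not a Worker.
        secondaryScheduler = notAsked.isEmpty() ? null : notAsked.get(0);
        return true;
    }
}
```

If select() returns false, the peers that already accepted remain in the PDT. A real implementation would presumably release them before retrying.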
Finalizing distributed computations: the computation of a single JMPI
process on a Worker is completed when the MPI.finish() method is
called. For simplicity, and contrary to the MPI specification, our
convention is that the result of a distributed computation has to
reside in the process with rank zero. The corresponding Worker sends
this process to the Primary Scheduler using its migServer. After
receiving the process with rank zero, the Primary Scheduler writes it
into a file on its local file system, which the Initiator can fetch at
a later date. The Initiator then has to receive the process with rank
zero in its current execution state in order to continue the execution
after the MPI.finish() method.
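The handling of the rank-zero process at the Primary Scheduler can be sketched as a small file store. The class ResultStore, its method names and the file naming scheme <jobId>.rank0 are hypothetical. The serialized process is treated as an opaque byte array, because its actual format is defined by JavaGo.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Sketch of how a Primary Scheduler could keep the rank-zero process for the Initiator. */
class ResultStore {
    private final Path directory;

    ResultStore(Path directory) {
        this.directory = directory;
    }

    private Path fileFor(String jobId) {
        return directory.resolve(jobId + ".rank0");
    }

    /** Primary Scheduler: persist the process received from the rank-zero Worker. */
    void storeResult(String jobId, int rank, byte[] serializedProcess) {
        if (rank != 0) {
            // By convention only rank zero carries the result of the computation.
            throw new IllegalArgumentException("expected rank 0, got " + rank);
        }
        try {
            Files.write(fileFor(jobId), serializedProcess);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Initiator: fetch the stored process at a later date, if it is already there. */
    Optional<byte[]> fetch(String jobId) {
        Path file = fileFor(jobId);
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        try {
            return Optional.of(Files.readAllBytes(file));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Returning an empty Optional lets an Initiator that reconnects early simply poll again later, instead of treating a missing result as an error.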
Checkpointing: in a P2P Grid environment, peers frequently connect to
and disconnect from the system. If a single Worker disconnects during a
computation, this should not necessitate restarting the entire
computation. Therefore, during a computation every Worker regularly
sets a checkpoint by saving its execution state to a local file and
sending a copy to one of the schedulers. In contrast to a process
migration, which includes the transfer of all involved classes, only
the execution state needs to be transferred to a scheduler in this
case. The latest checkpoint of each Worker is saved to a file at each
scheduler. Setting a checkpoint is initiated by the Primary Scheduler
via the migServer. A challenge is the synchronization of all
checkpoints, because if a Worker disconnects, all processes have to be
restored to a checkpoint. One way to detect that a Worker has
disconnected from the system is to implement a timeout exception in
the MPI_recv() method that sends a rescheduling request with the
corresponding rank to the Primary Scheduler. The scheduler selects a
new Worker to execute the process identified by that rank. Using the
corresponding checkpoint file, the Primary Scheduler is able to
restart the process on the selected Worker via the migServer. In
addition, it sends requests to all Workers to restore their execution
states by loading the checkpoint files from their local file systems.
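The timeout-driven recovery can be sketched in Java as shown below. It rests on several hypothetical assumptions:

- a BlockingQueue per source rank stands in for the network receive
- the rescheduling request is a direct method call rather than a message
- replacement Workers come from a spare list
- the actions that would go out via the migServer are only recorded as strings

The names TimedReceiver and RecoveryScheduler are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical Primary Scheduler side of the recovery protocol. */
class RecoveryScheduler {
    final Map<Integer, String> pdt;                                // rank -> Worker address
    final Map<Integer, byte[]> latestCheckpoint = new HashMap<>(); // rank -> checkpoint data
    final List<String> spareWorkers;
    final List<String> actions = new ArrayList<>(); // what would be sent via the migServer

    RecoveryScheduler(Map<Integer, String> pdt, List<String> spareWorkers) {
        this.pdt = pdt;
        this.spareWorkers = spareWorkers;
    }

    synchronized void storeCheckpoint(int rank, byte[] state) {
        latestCheckpoint.put(rank, state); // only the latest checkpoint is kept
    }

    /** Handles a rescheduling request for the process with the given rank. */
    synchronized void reschedule(int rank) {
        if (!latestCheckpoint.containsKey(rank) || spareWorkers.isEmpty()) {
            throw new IllegalStateException("cannot reschedule rank " + rank);
        }
        String replacement = spareWorkers.remove(0);
        pdt.put(rank, replacement);
        actions.add("restart rank " + rank + " on " + replacement);
        // Every other Worker rolls back using its own local checkpoint file.
        for (Map.Entry<Integer, String> e : pdt.entrySet()) {
            if (e.getKey() != rank) {
                actions.add("restore rank " + e.getKey() + " on " + e.getValue());
            }
        }
    }
}

/** Sketch of a receive with a timeout that triggers rescheduling (one inbox per source assumed). */
class TimedReceiver {
    private final BlockingQueue<byte[]> inbox = new LinkedBlockingQueue<>();
    private final RecoveryScheduler scheduler;

    TimedReceiver(RecoveryScheduler scheduler) {
        this.scheduler = scheduler;
    }

    void deliver(byte[] message) {
        inbox.add(message);
    }

    /** Waits for a message from sourceRank; on timeout, asks for rescheduling. */
    Optional<byte[]> recv(int sourceRank, long timeoutMillis) throws InterruptedException {
        byte[] message = inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (message == null) {
            scheduler.reschedule(sourceRank); // sender presumed disconnected
            return Optional.empty();
        }
        return Optional.of(message);
    }
}
```

reschedule() is synchronized because several Workers may time out on the same rank concurrently. A fuller version would also ignore duplicate requests for a rank that has already been moved.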
Storing the execution state in a file: as mentioned above, a running
Java thread consists of a set of class files and its execution state.
A detailed explanation of how JavaGo preserves and restores the
execution state of a Java thread is given in a PDF on the author's
homepage. While classes are transmitted between instances of the class
BasicClassLoader in a so-called ClassCache object, the execution state
is transmitted in a Message object called Query. This object contains
several fields and is depicted in the figure below. The execution
state is stored in the byte array Continuation, and the names and
paths of all involved class files are stored in the hash set object
ActiveClass.

The Query object is initialized by the method sendContinuation() of
the class StackFrame and transmitted via the migration server. On the
receiver's side, a thread listens at a server socket, waiting for an
incoming Query object. An instance of the class SocketHandler writes
the received object stream into a Message object. In order to set a
checkpoint, the received object stream has to be written into a file.
For this purpose, a class Checkpoint providing a method
setCP(Message object) was added. The checkpoint file can be sent to
the Primary Scheduler using object serialization. Furthermore, a
method sendCheckpoint(int node) was added to the class StackFrame.
This enables a scheduling instance to restore a checkpoint on a
selected Worker node remotely.
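The Checkpoint class is described here only through its method setCP(Message object). The following Java sketch shows one way such a class could persist a received message with standard object serialization. The Message class is a simplified, hypothetical stand-in for JavaGo's Message, and its field names merely mirror Continuation and ActiveClass from the text. load() is a hypothetical counterpart added for illustration.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

/** Simplified, hypothetical stand-in for JavaGo's Message object (e.g. the Query). */
class Message implements Serializable {
    private static final long serialVersionUID = 1L;

    final String name;                 // e.g. "Query"
    final byte[] continuation;         // the execution state (Continuation in the text)
    final HashSet<String> activeClass; // names/paths of involved class files (ActiveClass)

    Message(String name, byte[] continuation, Set<String> activeClass) {
        this.name = name;
        this.continuation = continuation.clone();
        this.activeClass = new HashSet<>(activeClass);
    }
}

/** Sketch of the Checkpoint class: writes a received Message to a file and reads it back. */
class Checkpoint {
    private final Path file;

    Checkpoint(Path file) {
        this.file = file;
    }

    /** Serializes the message into the checkpoint file, replacing any older checkpoint. */
    void setCP(Message object) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(object);
        }
    }

    /** Restores the message from the checkpoint file (hypothetical counterpart to setCP). */
    Message load() throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return (Message) in.readObject();
        }
    }
}
```

Files.newOutputStream truncates an existing file by default, so each call to setCP() leaves only the latest checkpoint on disk. This matches the text's statement that only the latest checkpoint of each Worker is kept.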