Organic Computing as a concept to control parallel computations in P2P Grids

Process Migration and Checkpointing

Process migration within a distributed computing environment is the transfer of a running process from one node to another so that it can continue executing there. In a Java-based system this means transferring a running Java thread, which consists of a set of class files and its execution state. Process migration enables dynamic scheduling for adaptive load balancing, fault tolerance through checkpointing, and data access locality. Moreover, decoupling job execution from job initiation also requires process migration. During the execution of a parallel computation certain process migrations are necessary, e.g. to transmit a job to the Primary Scheduler, to distribute processes among the Workers, and to pass results back to the Initiator.

Dynamic scheduling: the scheduling of communicating processes that are executed over the Internet has to adapt to temporal variations in connectivity. To support such dynamic scheduling based on disposition values, the system provides a transparent process migration mechanism, which uses JavaGo. Since a Java Message Passing Interface (JMPI) application cannot migrate during its execution, a preprocessor (jgoc, abbr. for JavaGo compiler) generates mobile code, i.e. a JMPI application whose processes are able to migrate during their execution. Usually, a migration is induced by a go() statement in the application. In keeping with the fundamental requirement of transparency, this statement is hidden in the communication methods of the MPI package. Possible migration points are within the COMM.send() method or within the COMM.recv() method, after a message has been passed. A migration server, called migServer, is integrated into the communication layer of the system and listens on a dedicated port for the directive to migrate a process. This enables a scheduling instance to initiate the migration of a process from any node in the system. Correspondingly, a method migrationCall() is available to the scheduler for sending a migration request to a Worker's migServer. The migServer can be regarded as an interface to the JavaGo migration server, which actually carries out the process migration. The preprocessor and the migration server together constitute JavaGo.
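The idea of hiding migration points inside the communication methods can be illustrated with a minimal sketch. It is not the JavaGo or JMPI implementation: the class MigrationPointSketch, the method requestMigration() and the log entries are hypothetical names chosen for illustration, and the "go:" log entry merely stands in for the point where the hidden go() statement would be executed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical communicator that checks for a pending migration after each message. */
final class MigrationPointSketch {
    // Destination requested by a scheduler, or null if no migration is pending.
    private final AtomicReference<String> pendingDestination = new AtomicReference<>();

    // Records what happened, so the order of events can be inspected.
    final List<String> log = new ArrayList<>();

    /** Assumed to be called by the migration server when a migration directive arrives. */
    void requestMigration(String destination) {
        pendingDestination.set(destination);
    }

    /** Stand-in for a send operation: pass the message, then reach a migration point. */
    void send(String message) {
        log.add("sent:" + message);
        migrationPoint();
    }

    /** Stand-in for a receive operation: accept the message, then reach a migration point. */
    String recv(String incoming) {
        log.add("received:" + incoming);
        migrationPoint();
        return incoming;
    }

    // The application never calls this directly, which keeps migration transparent.
    private void migrationPoint() {
        String destination = pendingDestination.getAndSet(null);
        if (destination != null) {
            // In the real system the hidden go() statement would run here.
            log.add("go:" + destination);
        }
    }
}
```

In this sketch a request that arrives between two communication calls takes effect only at the next call, which mirrors the statement that migration happens after a message has been passed.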

Transmission of jobs: if a distributed computation is started by the MPI_Exec application, the parameter n, which specifies the number of processes, as well as the path and name of the JMPI class file have to be transmitted to the local peer. For this purpose, a simple socket connection is used to contact the local peer and pass these parameters. The received parameters are stored in an object of the class JobDesrcriptor that is instantiated with a unique job ID. Then, the job is executed locally until the MPI.init() method is called inside the application. After that, a request REQ_SCHEDULE_PROCS including the number of required Workers is sent to peers in the knownPeers hash map until the acknowledgment ACCEPT_SCHEDULE_PROCS is received from some peer, which need not itself be a known peer. This peer becomes the Primary Scheduler of the job and receives the JobDesrcriptor object. In the next step, the Initiator has to send the running job to the Primary Scheduler. Since the execution state has to be restored at each Worker, it first has to be transmitted to the Primary Scheduler. In contrast to a standard process migration, the process execution does not have to be continued at the destination node. Instead, the Primary Scheduler should write the process data into a file. However, JavaGo does not support a corresponding service. To meet this requirement we adapted JavaGo as explained further below.
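The search for a Primary Scheduler described above might look roughly as follows. This is a sketch under assumptions, not the system's code: the Peer interface, the method requestScheduleProcs() and the use of a peer ID string are hypothetical. A known peer is assumed to answer either with the ID of the peer that accepted (possibly a peer it forwarded the request to) or with nothing.

```java
import java.util.Map;
import java.util.Optional;

final class SchedulerDiscoverySketch {
    /**
     * Hypothetical peer abstraction. The result models ACCEPT_SCHEDULE_PROCS:
     * it holds the ID of the accepting peer, or is empty if nobody accepted.
     */
    interface Peer {
        Optional<String> requestScheduleProcs(int requiredWorkers);
    }

    /** Sends the request to known peers one after another until some peer accepts. */
    static Optional<String> findPrimaryScheduler(Map<String, Peer> knownPeers,
                                                 int requiredWorkers) {
        for (Peer peer : knownPeers.values()) {
            Optional<String> accepted = peer.requestScheduleProcs(requiredWorkers);
            if (accepted.isPresent()) {
                // The accepting peer becomes the Primary Scheduler of the job.
                return accepted;
            }
        }
        return Optional.empty(); // no peer was willing to schedule the job
    }
}
```

Returning the ID of the accepting peer rather than the contacted peer reflects that the acknowledgment may come from a peer outside the knownPeers map.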

Initializing distributed computations: when the Primary Scheduler receives the running job from the Initiator, this job consists of a single process. The Primary Scheduler then replicates this process n times. To this end, it first chooses n peers as Workers from its availablePeers hash map according to their disposition values. A peer is assigned only if it replies to the participation request REQ_COMPUTE_PROC with an ACCEPT_COMPUTE_PROC acknowledgment. Then, the IP address, the port and the disposition value of each participating peer are stored in the process descriptor table (PDT). In order to avoid a single point of failure, the Primary Scheduler chooses at least one other peer with a high disposition value as Secondary Scheduler, which keeps a duplicate of the PDT and the JobDesrcriptor object. After that, n duplicates of the process received from the Initiator have to be distributed among the chosen Workers. This requires multiple migrations of a single process that was written into a file.
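The Worker selection step can be sketched as follows. The class and method names are hypothetical, and two assumptions go beyond the text: peers with a higher disposition value are preferred, and a Worker's position in the resulting table is taken as the rank of its process. The predicate stands in for the REQ_COMPUTE_PROC / ACCEPT_COMPUTE_PROC exchange.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

final class WorkerSelectionSketch {
    /** One row of a simplified process descriptor table (PDT). */
    static final class PeerInfo {
        final String ip;
        final int port;
        final double disposition;

        PeerInfo(String ip, int port, double disposition) {
            this.ip = ip;
            this.port = port;
            this.disposition = disposition;
        }
    }

    /**
     * Chooses n Workers among the available peers.
     * acceptsComputeProc models whether a peer answers the participation
     * request with an acknowledgment.
     */
    static List<PeerInfo> selectWorkers(List<PeerInfo> availablePeers, int n,
                                        Predicate<PeerInfo> acceptsComputeProc) {
        List<PeerInfo> candidates = new ArrayList<>(availablePeers);
        // Assumption: a higher disposition value means a more suitable peer.
        candidates.sort(
            Comparator.comparingDouble((PeerInfo p) -> p.disposition).reversed());

        List<PeerInfo> pdt = new ArrayList<>();
        for (PeerInfo peer : candidates) {
            if (pdt.size() == n) {
                break; // enough Workers found, do not contact further peers
            }
            if (acceptsComputeProc.test(peer)) {
                pdt.add(peer); // list index is used as the process rank here
            }
        }
        if (pdt.size() < n) {
            throw new IllegalStateException("only " + pdt.size() + " of " + n
                + " required Workers accepted");
        }
        return pdt;
    }
}
```

A Secondary Scheduler would receive a copy of the returned table; that step is omitted here.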

Finalizing distributed computations: the computation of a single JMPI process is completed on a Worker when the MPI.finish() method is called. For reasons of simplicity and contrary to the MPI specification, our convention is that the result of a distributed computation must reside at the process with rank zero. The corresponding Worker sends this process to the Primary Scheduler using its migServer. After having received the process with rank zero, the Primary Scheduler writes this process into a file on its local file system, from which the Initiator can fetch it later. Then, the Initiator has to receive the process with rank zero in its current execution state in order to continue the execution after the MPI.finish() method.

Checkpointing: in a P2P Grid environment peers frequently connect to and disconnect from the system. If a single Worker disconnects during a computation, this should not necessitate restarting the entire computation. Therefore, during a computation every Worker frequently sets a checkpoint by saving the execution state as a local file and sending a copy to one of the schedulers. In contrast to a process migration, which includes the transfer of all involved classes, in this case only the execution state needs to be transferred to a scheduler. The latest checkpoint of each Worker is saved to a file at each scheduler. Setting a checkpoint is initiated by the Primary Scheduler via the migServer. A challenge is the synchronization of all checkpoints.
If a Worker disconnects, all processes have to be restored to a checkpoint. One way to detect that a Worker has disconnected from the system is to implement a timeout exception in the COMM.recv() method that sends a rescheduling request with the corresponding rank to the Primary Scheduler. The scheduler selects a new Worker to execute the process identified by that rank. Using the corresponding checkpoint file, the Primary Scheduler is able to restart the process on the selected Worker via the migServer. In addition, it sends requests to all Workers to restore their execution states by loading the checkpoint files from their local file systems.
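The timeout-based detection of a disconnected Worker can be sketched with a blocking queue as a stand-in for the incoming message channel. All names are hypothetical, and the callback requestReschedule merely represents sending a rescheduling request with the rank of the silent process to the Primary Scheduler; the sketch returns null instead of raising a timeout exception to stay short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

final class RecvTimeoutSketch {
    /**
     * Waits for a message from the process with the given rank.
     * If nothing arrives within the timeout, the sender is assumed to have
     * disconnected and a rescheduling request for its rank is issued.
     *
     * @return the received message, or null if the timeout expired
     */
    static String recvOrReschedule(BlockingQueue<String> inbox, int senderRank,
                                   long timeoutMillis,
                                   IntConsumer requestReschedule) {
        try {
            String message = inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                // Timeout: report the rank so a new Worker can be selected.
                requestReschedule.accept(senderRank);
            }
            return message;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Choosing the timeout is a trade-off: a short value detects failures quickly but may misclassify a slow Internet connection as a disconnect.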

Storing the execution state in a file: as mentioned above, a running Java thread consists of a set of class files and its execution state. A detailed explanation of how JavaGo preserves and restores the execution state of a Java thread is given in a PDF on the homepage of the author. While classes are transmitted in a so-called ClassCache object between instances of the class BasicClassLoader, the execution state is transmitted in a Message object called Query. This object contains several fields and is depicted in the figure below. The execution state is stored in the byte array Continuation; the names and paths of all involved class files are stored in a hash set object ActiveClass.






The Query object is initialized by the method sendContinuation() of the class StackFrame and transmitted via the migration server. On the receiver's side a thread listens at a server socket, waiting for an incoming Query object. The received object stream is written into a Message object by an instance of the class SocketHandler.

In order to set a checkpoint, the received object stream has to be written into a file. For this purpose a class Checkpoint was added, which provides a method setCP(Message object). The checkpoint file can be sent to the Primary Scheduler using object serialization. Furthermore, a method sendCheckpoint(int node) was added to the class StackFrame. This enables a scheduling instance to restore a checkpoint on a selected Worker node remotely.
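Writing a checkpoint with standard Java object serialization might look like the following sketch. It does not reproduce the adapted JavaGo classes: QuerySketch is a hypothetical, reduced stand-in for the Query message with only the two fields named above, and loadCP() and newTempFile() are helper methods assumed here for illustration. Only setCP() corresponds to a method name mentioned in the text, and its signature is extended by a target file.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;

/** Hypothetical, reduced stand-in for the Query message object. */
final class QuerySketch implements Serializable {
    private static final long serialVersionUID = 1L;
    final byte[] continuation;         // serialized execution state
    final HashSet<String> activeClass; // names and paths of involved class files

    QuerySketch(byte[] continuation, HashSet<String> activeClass) {
        this.continuation = continuation;
        this.activeClass = activeClass;
    }
}

final class CheckpointSketch {
    /** Writes the message object to a checkpoint file. */
    static void setCP(QuerySketch query, Path file) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(query);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads a message object back from a checkpoint file (assumed helper). */
    static QuerySketch loadCP(Path file) {
        try (ObjectInputStream in =
                 new ObjectInputStream(Files.newInputStream(file))) {
            return (QuerySketch) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("checkpoint class not found", e);
        }
    }

    /** Creates a temporary file to hold a checkpoint (assumed helper). */
    static Path newTempFile() {
        try {
            return Files.createTempFile("checkpoint", ".ser");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the checkpoint is an ordinary serialized object, the same bytes can be stored locally and sent to a scheduler without a separate file format.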



Last modified: Thu Feb 05 20:33:00 CET 2009