お客様は、数十のKubernetesネームスペースにまたがる数百のWebLogicドメインを管理することで、オペレータのタスクを実行することを期待しています。 したがって、効率的なユーザー・レベルのスレッド・パターンを使用してオペレータを設計しました。 このパターンを使用して、Kubernetes APIリクエストの非同期コール・モデルを実装しました。 このコール・モデルには、タイムアウトのサポート、指数バックオフによる再試行、および継続機能を使用してリクエストされた最大サイズを超えるリストが組み込まれています。
ユーザー・レベルのスレッド・パターンは、oracle.kubernetes.operator.work
パッケージのクラスによって実装されます。
Engine
: Fibers
のエグゼキュータ・サービスおよびファクトリ。Fiber
: ユーザー・レベルのスレッド。 Fibers
は、一連のSteps
を介した単一の処理フローの実行を表します。 Fibers
は一時停止され、後で再開される可能性があり、一時停止中にThread
を使用しません。 Step
: 処理フロー内の個々のCPUバウンド・アクティビティ。Packet
: 処理フローのコンテキスト。NextAction
: 次に何が起こるかを示すためにStep
がFiber
に制御を返すときに使用されます。 一般的な次のアクションは、別のStep
の実行またはFiber
の一時停止です。 Component
: 処理フローに役立つSPIのプロバイダ。Container
: 含まれている環境を表し、Component
です。各Step
には、処理フローの次のStep
への参照があります。ただし、Step
がFiber
にNextAction
を返すときに、Steps
がFiber
によって次のStep
を起動することを示す必要はありません。 これにより、Fibers
が一連のSteps
を起動する一般的なユースケースが発生します。このユースケースは、is-next関係によってリンクされていますが、一般的には、Fiber
が通常のフローに戻る前に、Steps
のセットを詳細に沿って起動します。
このサンプルでは、コール元はEngine
、Fiber
、リンクされたStep
インスタンスのセットおよびPacket
を作成します。 その後、Fiber
が起動されます。 Engine
はScheduledExecutorService
によってバックアップされるため、通常はシングルトンになります。 Packet
には、通常、Steps
がapply()
メソッドで使用する値も事前にロードされています。
static class SomeClass {
public static void main(String[] args) {
Engine engine = new Engine("worker-pool");
Fiber fiber = engine.createFiber();
Step step = new StepOne(new StepTwo(new StepThree(null)));
Packet packet = new Packet();
fiber.start(
step,
packet,
new CompletionCallback() {
@Override
public void onCompletion(Packet packet) {
// Fiber has completed successfully
}
@Override
public void onThrowable(Packet packet, Throwable throwable) {
// Fiber processing was terminated with an exception
}
});
}
}
Steps
は、apply()
内からスリープまたはブロッキング・コールを呼び出すことはできません。 これにより、ワーカー・スレッドが他のFibers
にサービスを提供できなくなります。 かわりに、非同期コールおよびFiber
の一時停止/再開パターンを使用してください。 Step
には、Fiber
の一時停止/再開を実行するためのNextAction
を作成するメソッドdoDelay()
が用意されています。これは、遅延中にワーカー・スレッドが他のFibers
にサービスを提供できるため、スリープよりも正確に優れたオプションです。 非同期IOまたは同様のパターンの場合は、Fiber
を一時停止します。 Fiber
が一時停止したときのコールバックで、非同期コールを開始します。 最後に、コールが完了したら、Fiber
を再開します。 一時停止/再開機能は、一時停止コールバックが完了する前に再開されたケースを処理します。
このサンプルでは、ステップで非同期ファイルIOおよび一時停止/再開Fiber
パターンを使用します。
static class StepTwo extends Step {
public StepTwo(Step next) {
super(next);
}
@Override
public NextAction apply(Packet packet) {
return doSuspend((fiber) -> {
// The Fiber is now suspended
// Start the asynchronous call
try {
Path path = Paths.get(URI.create(this.getClass().getResource("/somefile.dat").toString()));
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
void completed(Integer result, ByteBuffer attachment) {
// Store data in Packet and resume Fiber
packet.put("DATA_SIZE_READ", result);
packet.put("DATA_FROM_SOMEFILE", attachment);
fiber.resume(packet);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
// log exc
completed(0, null);
}
});
} catch (IOException e) {
// log exception
// If not resumed here, Fiber will never be resumed
}
});
}
}
非同期コール・モデルは、CallBuilder
やResponseStep
などのoracle.kubernetes.operator.helpers
パッケージのクラスによって実装されます。 モデルは、前述のFiber
中断/再開パターンに基づいています。 CallBuilder
には、listPodAsync()
やdeleteServiceAsync()
など、名前がAsyncで終わる多くのメソッドが用意されています。 これらのメソッドは、NextAction
の一部として返すことができるStep
を返します。 これらのSteps
を作成する場合、開発者はResponseStep
を提供する必要があります。 ResponseStep.onSuccess()
のみを実装する必要がありますが、Kubernetesは404 (Not Found)
を失敗として扱うため、onFailure()
をオーバーライドすると便利な場合があります。
このサンプルでは、開発者はパターンを使用して、cluster-1
の一部としてラベル付けされているデフォルトのネームスペースからポッドを一覧表示します。
static class StepOne extends Step {
public StepOne(Step next) {
super(next);
}
@Override
public NextAction apply(Packet packet) {
String namespace = "default";
Step step = CallBuilder.create().with($ -> {
$.labelSelector = "weblogic.clusterName=cluster-1";
$.limit = 50;
$.timeoutSeconds = 30;
}).listPodAsync(namespace, new ResponseStep<V1PodList>(next) {
@Override
public NextAction onFailure(Packet packet, ApiException e, int statusCode,
Map<String, List<String>> responseHeaders) {
if (statusCode == CallBuilder.NOT_FOUND) {
return onSuccess(packet, null, statusCode, responseHeaders);
}
return super.onFailure(packet, e, statusCode, responseHeaders);
}
@Override
NextAction onSuccess(Packet packet, V1PodList result, int statusCode,
Map<String, List<String>> responseHeaders) {
// do something with the result Pod, if not null
return doNext(packet);
}
});
return doNext(step, packet);
}
}
namespace
などの必須パラメータはメソッド引数ですが、オプションのパラメータは、with()
およびラムダを使用して簡易ビルダー・パターンを使用して指定されます。
onFailure()
のデフォルトの動作では、ステータス・コード429 (TooManyRequests)
, 500 (InternalServerError)
, 503 (ServiceUnavailable)
, 504 (ServerTimeout)
でのリクエストの指数関数的バックオフ、またはサーバーからのレスポンスなしの単純なタイムアウトを使用して再試行します。
サーバーがステータス・コード409 (Conflict)
で応答する場合、これはオプティミスティック・ロックの失敗を示します。 一般的なユースケースは、コードがある非同期ステップでKubernetesオブジェクトを読み取り、オブジェクトを変更し、別の非同期ステップでそのオブジェクトを置換しようとしたが、暫定的に別のアクティビティがその同じオブジェクトを置換したことです。 この場合、リクエストを再試行しても同じ結果になります。 したがって、開発者は、super.onFailure()
をコールするときに競合時のステップを提供できます。 競合ステップは、指数関数的バックオフ遅延の後に呼び出されます。 この例では、競合ステップは既存のKubernetesオブジェクトを読み取るステップである必要があります。