add max-keys=2 optimization for spark workloads (#18154)

comment in the code provides more detailed explanation
on what this PR entails and its assumptions.

this PR reduces the amount of listing() by an order
of magnitude, however there are other such calls that
still needs further optimization that shall be done
in subsequent PRs.
This commit is contained in:
Harshavardhana 2023-10-02 07:52:59 -06:00 committed by GitHub
parent 603437e70f
commit a2ab21e91c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -25,6 +25,7 @@ import (
"io"
"math/rand"
"net/http"
"path"
"sort"
"strconv"
"strings"
@ -1323,6 +1324,85 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
}
opts.setBucketMeta(ctx)
ri := logger.GetReqInfo(ctx)
hadoop := ri != nil && strings.Contains(ri.UserAgent, `Hadoop `) && strings.Contains(ri.UserAgent, "scala/")
matches := func() bool {
if prefix == "" {
return false
}
// List of standard files supported by s3a
// that involves a List() on a directory
// where directory is actually an object on
// namespace.
for _, k := range []string{
"_SUCCESS/",
".parquet/",
".csv/",
".json/",
".avro/",
".orc/",
".txt/",
// Add any other files in future
} {
if strings.HasSuffix(prefix, k) {
return true
}
}
return false
}
if hadoop && matches() && delimiter == SlashSeparator && maxKeys == 2 && marker == "" {
// Optimization for Spark/Hadoop workload where spark sends a garbage
// request of this kind
//
// GET /testbucket/?list-type=2&delimiter=%2F&max-keys=2&prefix=parquet%2F_SUCCESS%2F&fetch-owner=false
//
// Here spark is expecting that the List() return empty instead, so from MinIO's point
// of view if we simply do a GetObjectInfo() on this prefix by treating it as an object
// We save a lot of calls over the network.
//
// This happens repeatedly for all objects that are created concurrently() avoiding this
// as a List() call is an important performance improvement.
//
// Spark based s3a committers are a big enough use-case to have this optimization.
//
// A sample code to see the improvements is as follows, this sample code is
// simply a read on JSON from MinIO and write it back as "parquet".
//
// import org.apache.spark.sql.SparkSession
// import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
// object SparkJSONRead {
// def main(args: Array[String]): Unit = {
// val spark:SparkSession = SparkSession.builder()
// .appName("SparkByExample")
// .master("local[1]").getOrCreate()
//
// spark.sparkContext.setLogLevel("ERROR")
// spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://minio-lb:9000")
// spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
// spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "minioadmin")
// spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "minioadmin")
//
// val df = spark.read.json("s3a://testbucket/s3.json")
//
// df.write.parquet("s3a://testbucket/parquet/")
// }
// }
objInfo, err := z.GetObjectInfo(ctx, bucket, path.Dir(prefix), ObjectOptions{NoLock: true})
if err == nil {
if opts.Lifecycle != nil {
evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, objInfo)
if evt.Action.Delete() {
globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
if !evt.Action.DeleteRestored() {
// Skip entry if ILM action was DeleteVersionAction or DeleteAction
return loi, nil
}
}
}
return loi, nil
}
}
if len(prefix) > 0 && maxKeys == 1 && marker == "" {
// Optimization for certain applications like
// - Cohesity